Python多线程
Threading模块用于提供线程相关的操作,线程是应用程序中工作的最小单元。
threading模块提供的类:
Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。
Thread类
Thread是线程类,有两种使用方法,直接传入要运行的方法或从Thread继承并覆盖run():

# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
def run(arg):
time.sleep(2)
print("Thread-%s" % arg, )
if __name__ == '__main__':
for i in range(10):
# 将要执行的方法作为参数传递给Thread方法
thread = threading.Thread(target=run, args=(i,))
thread.start()
方式1:创建线程

Thread-3
Thread-2
Thread-1
Thread-0
Thread-4
Thread-7
Thread-6
Thread-5
Thread-9
Thread-8
Process finished with exit code 0
执行结果

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
# 从Thread继承,并重写run()
class myThread(threading.Thread):
def __init__(self, num):
# #注意:一定要显式的调用父类的初始化函数。
threading.Thread.__init__(self)
self.num = num
# 定义每个线程要运行的函数
def run(self):
time.sleep(1)
print("Thread-%s" % self.num)
if __name__ == '__main__':
for i in range(10):
thread = myThread(i)
thread.start()
方式2:自定义线程类,继承Thread类,并重写run方法

Thread-4
Thread-2
Thread-1
Thread-3
Thread-0
Thread-6
Thread-5
Thread-8
Thread-9
Thread-7
Process finished with exit code 0
运行结果
Thread类的构造方法:
Thread(group=None, target=None, name=None,args=(), kwargs=None, *, daemon=None)
class Thread:
def __init__(self, group=None, target=None, name=None,args=(), kwargs=None, *, daemon=None):
• group: 线程组,目前还没有实现,库引用中提示必须是None;
• target: 要执行的方法run();
• name: 线程名;
• args/kwargs: 要传入方法的参数args元组,kwargs字典。
• daemon=None:如果子类重写构造函数,则必须确保在对线程执行其他操作之前调用基类构造函数
Thread实例方法:
• is_alive(): 返回线程是否在运行[bool]。正在运行指启动后、终止前。
• getName(): 获取线程名。
• setName(str):设置线程名
• start(): 线程准备就绪,启动线程,等待CPU调度
• isDaemon():返回线程前台线程还是后台线程[bool]
• setDaemon(bool):设置线程前台线程还是后台线程[bool],必须在start前设置
如果是后台线程[True],主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程和后台线程均停止
如果是前台线程[False]默认,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
• join([timeout]): 阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数),逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

#!/usr/bin/env python
# -*- coding: utf-8 -*
# Created by YangYongming at 2018/11/24 14:07
# FileName: 5.py
import threading
def run(arg):
print(arg)
thread = threading.Thread(name="yangym", target=run(1), )
print(thread.getName()) # 输出 yangym
thread.setDaemon(False)
thread.setName("YANGYM")
print(thread.getName()) # 输出 YANGYM
thread.start()
print(thread.is_alive()) # 输出 False
print(thread.isDaemon()) # 输出 False
实例演示
setDaemon 使用介绍
setDaemon为True时,主线程不等待其他线程的执行;setDaemon为False时(默认),主线程等待所有其他线程执行完成后再继续往下执行。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
# 从Thread继承,并重写run()
class myThread(threading.Thread):
def __init__(self, num):
# #注意:一定要显式的调用父类的初始化函数。
threading.Thread.__init__(self)
self.num = num
# 定义每个线程要运行的函数
def run(self):
time.sleep(1)
print("Thread-%s" % self.num)
if __name__ == '__main__':
for i in range(10):
thread = myThread(i)
thread.setDaemon(True)
thread.start()
setDaemon为True

Process finished with exit code 0
运行结果:主程序执行完毕后立即退出

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
# 从Thread继承,并重写run()
class myThread(threading.Thread):
def __init__(self, num):
# #注意:一定要显式的调用父类的初始化函数。
threading.Thread.__init__(self)
self.num = num
# 定义每个线程要运行的函数
def run(self):
time.sleep(1)
print("Thread-%s" % self.num)
if __name__ == '__main__':
for i in range(10):
thread = myThread(i)
thread.setDaemon(False) #默认
thread.start()
setDaemon为False

Thread-2
Thread-3
Thread-1
Thread-0
Thread-9
Thread-6
Thread-4
Thread-7
Thread-8
Thread-5
Process finished with exit code 0
运行结果:主程序执行完毕后会等待其他所有线程全部结束后在退出程序
join 使用介绍
阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数),逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
def run(arg):
time.sleep(2)
print("Thread-%s Time is: %s" % (arg,time.ctime()) )
if __name__ == '__main__':
for i in range(10):
# 将要执行的方法作为参数传递给Thread方法
thread = threading.Thread(target=run, args=(i,))
thread.start()
thread.join()
使用join,使用join的线程会阻塞其他线程的运行,指导线程结束或者超时,其他线程才可以继续运行

Thread-0 Time is: Sat Mar 10 13:33:24 2018
Thread-1 Time is: Sat Mar 10 13:33:26 2018
Thread-2 Time is: Sat Mar 10 13:33:28 2018
Thread-3 Time is: Sat Mar 10 13:33:30 2018
Thread-4 Time is: Sat Mar 10 13:33:32 2018
Thread-5 Time is: Sat Mar 10 13:33:34 2018
Thread-6 Time is: Sat Mar 10 13:33:36 2018
Thread-7 Time is: Sat Mar 10 13:33:38 2018
Thread-8 Time is: Sat Mar 10 13:33:40 2018
Thread-9 Time is: Sat Mar 10 13:33:42 2018
Process finished with exit code 0
运行结果:每个线程都会等待使用join的线程执行完毕后才继续运行,等待2秒,这使得多线程变得没有意义

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
def run(arg):
time.sleep(2)
print("Thread-%s Time is: %s" % (arg,time.ctime()) )
if __name__ == '__main__':
for i in range(10):
# 将要执行的方法作为参数传递给Thread方法
thread = threading.Thread(target=run, args=(i,))
thread.start()
不使用join,线程之前运行互不影响

Thread-4 Time is: Sat Mar 10 13:34:22 2018
Thread-2 Time is: Sat Mar 10 13:34:22 2018
Thread-1 Time is: Sat Mar 10 13:34:22 2018
Thread-0 Time is: Sat Mar 10 13:34:22 2018
Thread-3 Time is: Sat Mar 10 13:34:22 2018
Thread-6 Time is: Sat Mar 10 13:34:22 2018
Thread-5 Time is: Sat Mar 10 13:34:22 2018
Thread-9 Time is: Sat Mar 10 13:34:22 2018
Thread-8 Time is: Sat Mar 10 13:34:22 2018
Thread-7 Time is: Sat Mar 10 13:34:22 2018
Process finished with exit code 0
线程锁(Lock、RLock)
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。
Lock属于全局,Rlock属于线程。Lock(),Rlock(),推荐使用Rlock()
一把锁头,一把钥匙,钥匙用完给下一个人用

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
num = 0
def run(arg):
global num
time.sleep(1)
num = num + arg
print(num)
if __name__ == '__main__':
for i in range(10):
# 将要执行的方法作为参数传递给Thread方法
thread = threading.Thread(target=run, args=(i,))
thread.start()
未使用锁,所有线程随机运行,数据混乱

5
8
9
11
15
15
24
32
39
45
Process finished with exit code 0
执行结果,未按照理想的顺序运行

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
lock = threading.RLock()
num = 0
def run(arg):
lock.acquire()
global num
time.sleep(1)
num = num + arg
print(num)
lock.release()
if __name__ == '__main__':
for i in range(10):
# 将要执行的方法作为参数传递给Thread方法
thread = threading.Thread(target=run, args=(i,))
thread.start()
使用锁,设置一条数据在同一时间只可以被一个线程修改

0
1
3
6
10
15
21
28
36
45
Process finished with exit code 0
信号量(Semaphore)
互斥锁Rlock 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
一把锁头,n把钥匙,你把钥匙全部用完,再给下n个人使用

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
class myThread(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num
def run(self):
semaphore.acquire()
time.sleep(5)
print("Thread-%s Time is: %s" % (self.num,time.ctime()))
semaphore.release()
if __name__ == '__main__':
semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程被同事执行
for i in range(10):
thread = myThread(i)
thread.start()
实例:某条语句设置最多可以5个线程同时执行

Thread-4 Time is: Sat Mar 10 14:07:00 2018
Thread-3 Time is: Sat Mar 10 14:07:00 2018
Thread-2 Time is: Sat Mar 10 14:07:00 2018
Thread-1 Time is: Sat Mar 10 14:07:00 2018
Thread-0 Time is: Sat Mar 10 14:07:00 2018
Thread-9 Time is: Sat Mar 10 14:07:05 2018
Thread-5 Time is: Sat Mar 10 14:07:05 2018
Thread-7 Time is: Sat Mar 10 14:07:05 2018
Thread-6 Time is: Sat Mar 10 14:07:05 2018
Thread-8 Time is: Sat Mar 10 14:07:05 2018
Process finished with exit code 0
# 每次执行5个线程,中间间隔5秒
运行结果:看时间
事件(event)
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
- clear:将“Flag”设置为False 默认的,当Flag为False,无论线程执行到哪里,遇到wait后全部阻塞
- set:将“Flag”设置为True 无论在哪里,遇到set后所有线程正常执行

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
class myThread(threading.Thread):
def __init__(self, num, event):
threading.Thread.__init__(self)
self.num = num
self.event = event
def run(self):
print("Thread-%s" % self.num)
self.event.wait() # 所有线程到此将被阻塞
print(self.num)
if __name__ == '__main__':
event_obj = threading.Event()
for i in range(10):
thread = myThread(i, event_obj)
thread.start()
inp = input("please input:") # 输入true时,所有被阻塞线程恢复执行
if inp == "true":
event_obj.set()
实例

Thread-0
Thread-1
Thread-2
Thread-3
Thread-4
Thread-5
Thread-6
Thread-7
Thread-8
Thread-9
please input:true
0
2
3
1
5
7
8
9
4
6
Process finished with exit code 0
Queue类
Python 的 Queue 模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。
这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步。
Queue 模块中的常用方法:
- Queue.qsize() 返回队列的大小
- Queue.empty() 如果队列为空,返回True,反之False
- Queue.full() 如果队列满了,返回True,反之False
- Queue.full 与 maxsize 大小对应
- Queue.get([block[, timeout]])获取队列,timeout等待时间
- Queue.get_nowait() 相当Queue.get(False)
- Queue.put(item) 写入队列,timeout等待时间
- Queue.put_nowait(item) 相当Queue.put(item, False)
- Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
- Queue.join() 实际上意味着等到队列为空,再执行别的操作

#!/usr/bin/python3
import queue
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print ("开启线程:" + self.name)
process_data(self.name, self.q)
print ("退出线程:" + self.name)
def process_data(threadName, q):
while not exitFlag:
queueLock.acquire()
if not workQueue.empty():
data = q.get()
queueLock.release()
print ("%s processing %s" % (threadName, data))
else:
queueLock.release()
time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1
# 创建新线程
for tName in threadList:
thread = myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID += 1
# 填充队列
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()
# 等待队列清空
while not workQueue.empty():
pass
# 通知线程是时候退出
exitFlag = 1
# 等待所有线程完成
for t in threads:
t.join()
print ("退出主线程")

开启线程:Thread-1
开启线程:Thread-2
开启线程:Thread-3
Thread-3 processing One
Thread-3 processing Two
Thread-1 processing Three
Thread-2 processing Four
Thread-1 processing Five
退出线程:Thread-2
退出线程:Thread-1
退出线程:Thread-3
退出主线程
简单应用
多线程ping操作,
ping.py:主程序
sfile:IP地址段列表文件
dfile:ping通的地址保存到该文件中
sfile 文件:存放IP网段列表,每行一个,内容如下

#!/usr/bin/env python
# -*- coding: utf-8 -*
# Created by YangYongming at 2018/11/25 12:26
# FileName: ping.py
import threading
import IPy
import os
import queue
class MyThread(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self): # 定义每个线程要运行的函数
global aliveip
while True:
host = self.queue.get(timeout=2) # 从队列中取Ip
host = str(host)
res = os.popen("ping -n 1 %s" % host)
os.popen("exit\n")
if "TTL" in res.read():
print("%s is Alive" % host)
lock.acquire()
with open("dfile", "a", encoding="utf-8") as f2:
f2.write(host + '\n')
aliveip.append(host)
lock.release()
self.queue.task_done()
if self.queue.empty(): # 当队列为空时,终止该线程
break
if __name__ == '__main__':
threadlist = []
lock = threading.RLock()
queue = queue.Queue(50) # 初始化一个队列, 容量50
aliveip = []
for i in range(40): # 建立40个线程
t = MyThread(queue)
t.setDaemon(False) # 主线程执行完成,等待所有的前台线程执行完毕,默认[等待]False
t.start() # 线程准备就绪,等待CPU调度
with open("sfile", "r", encoding="utf-8") as f1:
for line in f1.readlines():
ip = IPy.IP(line)
for x in ip:
queue.put(x) # 向队列里面放IP
queue.join()
主程序
输出结果:
将可以ping通的地址保存到dfile文件中,内容如下:
作者:杨永明
出处:https://www.cnblogs.com/ming5218/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接。