本文是笔者学习python多线程和多进程的笔记。
Python多线程
单线程执行sleep
在
onethr.py
文件中创建两个时间循环:一个睡眠4s(loop0);一个睡眠2s(loop1)(这里使用"loop0"和"loop1"作为函数名)。以下多线程的讲解示例将以此为基础。代码如下:
# onethy.py
from time import sleep, ctime
def loop0():
print("Start loop 0 at:", ctime())
sleep(4)
print("Loop 0 done at:", ctime())
def loop1():
print("Start loop 1 at:", ctime())
sleep(2)
print("Loop 1 done at:", ctime())
def main():
print("Start at:", ctime())
loop0()
loop1()
print("All done at:", ctime())
if __name__ == '__main__':
main()
执行结果如下:
Start at: Tue Oct 29 09:39:25 2019
Start loop 0 at: Tue Oct 29 09:39:25 2019
Loop 0 done at: Tue Oct 29 09:39:29 2019
Start loop 1 at: Tue Oct 29 09:39:29 2019
Loop 1 done at: Tue Oct 29 09:39:31 2019
All done at: Tue Oct 29 09:39:31 2019
多线程编程
Python支持多线程编程的模块:
thread模块:提供了基本的线程和锁定支持;
threading模块:提供了更高级别、功能更全面的线程管理;
Queue模块:用于创建一个队列数据结构,在多线程之间进行共享。
建议:推荐使用更先进的threading模块,而不是thread模块。
注意:在python3中,thread模块被重命名为_thread模块。
thread模块
下表是一些thread模块常用的线程函数和LockType锁对象的方法。
thread模块的函数 | 功能 |
---|---|
start_new_thread(function, args, kwargs=None) | 派生一个新的线程,使用给定的args和可选的kwargs来执行funciton |
allocate_lock() | 分配LockType所对象 |
exit() | 给线程退出指令 |
LockType锁对象的方法 | 功能 |
---|---|
acquire(wait=None) | 尝试获取锁对象 |
locked() | 如果获取了锁对象则返回True,否则,返回False |
release() | 释放锁 |
- 使用thread模块的多线程机制将
改写为onethr.py
,代码如下:mtsleepA.py
# mtsleepA.py
import _thread
from time import sleep, ctime
def loop0():
print("Start loop 0 at:", ctime())
sleep(4)
print("Loop 0 done at:", ctime())
def loop1():
print("Start loop 1 at:", ctime())
sleep(2)
print("Loop 1 done at:", ctime())
def main():
print("Start at:", ctime())
_thread.start_new_thread(loop0, ())
_thread.start_new_thread(loop1, ()) # 第二个参数args是一个tuple,没有默认值,所以即使function没有参数,也要传入一个空tuple
sleep(6)
print("All done at:", ctime())
if __name__ == '__main__':
main()
执行结果如下:
Start at: Tue Oct 29 10:16:05 2019
Start loop 0 at: Tue Oct 29 10:16:05 2019
Start loop 1 at: Tue Oct 29 10:16:05 2019
Loop 1 done at: Tue Oct 29 10:16:07 2019
Loop 0 done at: Tue Oct 29 10:16:09 2019
All done at: Tue Oct 29 10:16:11 2019
可以看出loop0和loop1几乎同时开始执行,且两者是并行执行的,整体的运行时间是4s。
代码中第20行的
sleep(6)
的调用是非常有用的。如果没有20行,执行了18和19行后,直接显示"All done",然后主线程结束退出,那么子线程loop0()和loop1()也会直接终止。
这里我们知道这个程序最多执行6s,但是实际上只需要4s,并没有比单线程节省时间,而且大部分情况我们是不知道需要执行多久,那么就没办法使用sleep()来进行线程的同步机制了。这是就需要锁了。
2. 使用thread模块的锁对象,将
mtsleepA.py
改写为
mtsleepB.py
,代码如下:
# mtsleepB.py
import _thread
from time import sleep, ctime
loops = [4, 2]
def loop(nloop, nsec, lock):
print("Start loop %d at:" % nloop, ctime())
sleep(nsec)
print("Loop %d done at:" % nloop, ctime())
lock.release()
def main():
print("Start at:", ctime())
locks = []
nloops = range(len(loops))
for i in nloops:
lock = _thread.allocate_lock()
lock.acquire()
locks.append(lock)
for i in nloops:
_thread.start_new_thread(loop, (i, loops[i], locks[i]))
for i in nloops:
while locks[i].locked():
pass
print("All done at:", ctime())
if __name__ == '__main__':
main()
下面逐行对代码进行解释:
第1~6行
- 导入模块
- 不再对4s和2s硬编码到不同函数中,而是使用了唯一的loop函数,并把这些常量放入列表loops列表中。
第8~12行
- loop()函数代替了之前的loop*()函数。
- 给loop()分配一个已获得的锁,当执行完后释放锁,向主线程表明该子线程已完成。
第14~34行
3. 第一个for循环创建锁的列表,开启多少个子线程就分配多少个锁对象(20行),然后通过
acquire()
方法取得(21行)(取得锁可以理解为“把锁锁上”),并把锁放入列表。
4. 第二个for循环用于派生线程,每个线程都调用loop()函数,并传递号码、睡眠时间以及用于该线程的锁。
5. 第三个for循环用于检查锁列表,看列表里面的锁是否全部释放。只有在释放后才会继续向后执行。
执行结果如下:
Start at: Tue Oct 29 14:32:03 2019
Start loop 0 at: Tue Oct 29 14:32:03 2019
Start loop 1 at: Tue Oct 29 14:32:03 2019
Loop 1 done at: Tue Oct 29 14:32:05 2019
Loop 0 done at: Tue Oct 29 14:32:07 2019
All done at: Tue Oct 29 14:32:07 2019
threading模块
下表是threading模块中的所有可用对象:
对象 | 描述 |
---|---|
Thread | 表示一个执行线程的对象 |
Lock | 锁原语对象(和thread模块中的锁一样) |
RLock | 可重入锁对象,使得单一线程可以(再次)获得已持有的锁(递归锁) |
Condition | 条件变量对象,使得一个线程等待另一个线程满足特定的‘条件’,比如改变状态或某个数据值 |
Event | 条件变量的通用版本,任意数量的线程等待某个事件的发生,在该事件发生后所有线程将被激活 |
Semaphore | 为线程间共享的有限资源提供了一个“计数器”,如果没有可用资源时会被阻塞 |
BoundedSemaphore | 与Semaphore相似,不过它不允许超过初始值 |
Timer | 与Thread相似,不过他要在运行前等待一段时间 |
Barrier | 创建一个“障碍”,必须达到指定数量的线程后才可以继续 |
守护线程:守护线程一般是等待客户端请求服务的服务器,无请求时什么都不做,所以默认守护线程是不重要的,在关闭主线程时默认直接关闭守护线程。所以如果主线程在准备退出时,不需要等待一些子线程完成,就可以为这些子线程设置守护线程标记,该标记为真,表示该线程不重要。设置方法是:
thread.daemon=True
(调用
thread.setDamon(True)
的旧方法已经弃用)。检查一个线程是否的守护状态也是检查这个属性(调用
thread.isDaemon()
的旧方法已经弃用)。
整个python程序(可以解读为:主线程)将在所有非守护线程退出后才退出。
-
Thread类
Thread类是threading模块的主要执行对象。她有thread模块中没有的很多函数。
下表给出了Thread对象的属性和方法:
属性 | 描述 |
---|---|
name | 线程名 |
ident | 线程的标识符 |
daemon | 守护线程标记 |
方法 | 描述 |
---|---|
init(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) | 初始化方法,实例化一个线程对象,需要有一个可调用的target,以及其参数args或/和kwargs。还可以传递name和group参数(后者还未实现)。此外。而daemon的值将会设定`thread.daemon属性/标志 |
start() | 开始执行该线程 |
run() | 定义该线程功能的方法(通常在子类中被重写) |
join(timeout=None) | 直至启动的线程终止之前一直被挂起;除非给出了timeout(秒),否则会一直阻塞 |
getName() | 返回线程名(已弃用,可直接调用thread.name属性) |
setName(name) | 设定线程名(已弃用,可直接修改thread.name属性 |
is_alive() | 返回布尔值,表示这个进程是否还存活 |
isDaemon() | 如果是守护进程,返回True;否则,返回False(已弃用) |
setDaemon(daemonic) | 把线程的守护标志设置为布尔值daemonic(必须在线程start()之前调用)(已弃用) |
下面从三个方面说明如何使用Thread类:
- 创建Thread的实例,传给它一个函数。
- 创建Thread的实例,传给它一个可调用的类实例。
-
派生Thread的子类,并创建子类的实例。
最推荐最后一种方案。最不推荐第二种。
创建Thread的实例,传给它一个函数
把
mtsleepB.py
改写为
mtsleepC.py
,添加使用Thread类,代码如下:
# mtsleepC.py
import threading
from time import sleep, ctime
loops = [4, 2]
def loop(nloop, nsec):
print("Start loop %d at:" % nloop, ctime())
sleep(nsec)
print("Loop %d done at:" % nloop, ctime())
def main():
print("Start at:", ctime())
threads = []
nloops = range(len(loops))
for i in nloops:
t = threading.Thread(target=loop, args=(i, loops[i]))
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print("All done at:", ctime())
if __name__ == '__main__':
main()
执行结果如下:
Start at: Tue Oct 29 15:50:11 2019
Start loop 0 at: Tue Oct 29 15:50:11 2019
Start loop 1 at: Tue Oct 29 15:50:11 2019
Loop 1 done at: Tue Oct 29 15:50:13 2019
Loop 0 done at: Tue Oct 29 15:50:15 2019
All done at: Tue Oct 29 15:50:15 2019
- 使用thread模块时实现的锁没有了,取而代之的是一组Thread对象。
- 实例化Thread对象(调用
)和调用Thread()
的最大区别是新县城不会立即开始执行,便于线程的同步。当所有线程分配完成后,再调用每个线程的thread.start_new_thread()
方法让线程开始执行。start()
- 相对于管理一组锁(分配、获取、释放、检查锁状态等),这里只需要为每个线程调用
方法即可。join()
方法将等待线程结束,或者在提供了超时时间的情况下,大道超时时间。join()
- 实际上
方法根本不需要调用,一旦线程启动,它们就会一直执行,直到给定的函数完成后后退出。如果主线程还有其他工作要做,就可以不调用join()
。join()
方法只有在你需要等待线程完成的时候才有用。如果把第26~27行注释掉,执行结果如下:join()
Start at: Tue Oct 29 15:52:42 2019
Start loop 0 at: Tue Oct 29 15:52:42 2019
Start loop 1 at: Tue Oct 29 15:52:42 2019
All done at: Tue Oct 29 15:52:42 2019
Loop 1 done at: Tue Oct 29 15:52:44 2019
Loop 0 done at: Tue Oct 29 15:52:46 2019
从结果可看出
join()
的用法。
创建Thread的实例,传给它一个可调用的类实例
在
mtsleepC.py
的基础上新增加一个新类ThreadFunc,并进行一些其他的轻微改动,得到
mtsleepD.py
,代码如下:
# mtsleepD.py
import threading
from time import sleep, ctime
loops = [4, 2]
class ThreadFunc():
'''可调用类'''
def __init__(self, func, args, name=''):
self.name = name
self.func = func
self.args = args
def __call__(self):
self.func(*self.args)
def loop(nloop, nsec):
print("Start loop %d at:" % nloop, ctime())
sleep(nsec)
print("Loop %d done at:" % nloop, ctime())
def main():
print("Start at:", ctime())
threads = []
nloops = range(len(loops))
for i in nloops:
t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print("All done at:", ctime())
if __name__ == '__main__':
main()
执行结果如下:
Start at: Tue Oct 29 16:11:40 2019
Start loop 0 at: Tue Oct 29 16:11:40 2019
Start loop 1 at: Tue Oct 29 16:11:40 2019
Loop 1 done at: Tue Oct 29 16:11:42 2019
Loop 0 done at: Tue Oct 29 16:11:44 2019
All done at: Tue Oct 29 16:11:44 2019
派生Thread的子类,并创建子类的实例
相对于创建可调用类的例子,当创建线程时使用子类要相对更容易阅读。下面是
mtsleepE.py
的代码:
# mtsleepE.py
import threading
from time import sleep, ctime
loops = [4, 2]
class MyThread(threading.Thread):
def __init__(self, func, args, name=''):
super().__init__()
self.name = name
self.func = func
self.args = args
def run(self):
self.func(*self.args)
def loop(nloop, nsec):
print("Start loop %d at:" % nloop, ctime())
sleep(nsec)
print("Loop %d done at:" % nloop, ctime())
def main():
print("Start at:", ctime())
threads = []
nloops = range(len(loops))
for i in nloops:
t = MyThread(loop, (i, loops[i]), loop.__name__)
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print("All done at:", ctime())
if __name__ == '__main__':
main()
执行结果:
Start at: Tue Oct 29 16:21:04 2019
Start loop 0 at: Tue Oct 29 16:21:04 2019
Start loop 1 at: Tue Oct 29 16:21:04 2019
Loop 1 done at: Tue Oct 29 16:21:06 2019
Loop 0 done at: Tue Oct 29 16:21:08 2019
All done at: Tue Oct 29 16:21:08 2019
如果执行对象有返回值,可在MyThread类中增加
get_result()
方法来取得返回值,这是MyThread类的代码如下:
import threading
class MyThread(threading.Thread):
def __init__(self, func, args, name='')
super().__init__()
self.name = name
self.func = func
self.args = args
def run(self):
self.result = self.func(*self.args)
def get_result(self):
try:
return self.result
except Exception as e:
return None
-
threading模块的其他函数
除了各种同步和线程对象外,threading模块还提供了一些函数,如下表:
函数 | 描述 |
---|---|
active_count() | 当前活动的Thread对象个数 |
current_count() | 返回当前的Thread对象 |
enumerate() | 返回当前活动的Thread对象列表 |
settrace(func) | 为所有线程设置一个trace函数 |
setprofile(func) | 为所有线程设置一个profile函数 |
stack_size(size=0) | 返回新创建线程的栈大小;或为后续创建的线程设定栈的大小为size |
同步原语
一般在多线程代码中,总会有一些特定的函数或代码块不希望(或不应该)被多个线程同时执行,通常包括修改数据库、更新文件或其他会产生竞态条件的类似情况。这就是需要使用同步的情况。这里介绍两种类型的同步原语:锁/互斥;信号量。
-
锁示例
锁有两种状态:锁定和未锁定。
只支持两种函数:获得锁和释放锁。
下面是
没有使用锁的代码:mtsleepF.py
# mtsleepF.py
from atexit import register
from random import randrange
from threading import Thread, currentThread
from time import sleep, ctime
class CleanOutputSet(set):
def __str__(self):
return ','.join(x for x in self)
loops = (randrange(2, 5) for x in range(randrange(3, 20))) # 将列表生成式的[]改为(),就创建了一个generator。 randrange(3, 7)表示创建3~6个进程,randrange(2,5)表示为每一个进程选择一个睡眠时间
remaining = CleanOutputSet()
def loop(nsec):
myname = currentThread().name
remaining.add(myname)
print('[%s] started %s' % (ctime(), myname))
sleep(nsec)
remaining.remove(myname)
print('[%s] completed %s (%d secs)' % (ctime(), myname, nsec))
print(' (remaining: %s)' % (remaining or 'None'))
def main():
for pause in loops:
Thread(target=loop, args=(pause, )).start() # args=(pause, )中的','必不可少,如果没有,那么args就会被当做int
@register
def _atexit():
print('all Done at:', ctime())
if __name__ == '__main__':
main()
我们来解释上述代码:
第1~6行
- atexit模块及其
函数用法参考https://blog.csdn.net/qyhaill/article/details/102807732register()
- random模块的
函数用法参考https://www.runoob.com/python/func-number-randrange.htmlrandrange()
- threading.currentThread()返回当前线程变量,这是旧的写法,新的写法是threading.current_thread()
第8~10行
- 派生python的set类(set类介绍百度很多),实现了它的
方法__str__()
-
函数用法参考https://www.runoob.com/python/att-string-join.htmljoin()
第12~22行
- loops是一个generator。将列表生成式的[]改为(),就创建了一个generator
- remaining是set类的派生类的对象,用来记录剩下的还在运行的线程
- 对
函数进行一些修改,在sleep完之后将remaining中剩下的还在执行的线程显示出来loop()
第24~30行
- main()函数创建并开始执行线程
- 28~30行的意思参见atexit模块的用法
幸运的话,执行结果会按适当的顺序给出:
[Tue Oct 29 22:53:28 2019] started Thread-1
[Tue Oct 29 22:53:28 2019] started Thread-2
[Tue Oct 29 22:53:28 2019] started Thread-3
[Tue Oct 29 22:53:28 2019] started Thread-4
[Tue Oct 29 22:53:28 2019] started Thread-5
[Tue Oct 29 22:53:28 2019] started Thread-6
[Tue Oct 29 22:53:28 2019] started Thread-7
[Tue Oct 29 22:53:30 2019] completed Thread-2 (2 secs)
(remaining: Thread-7,Thread-6,Thread-4,Thread-5,Thread-3,Thread-1)
[Tue Oct 29 22:53:30 2019] completed Thread-4 (2 secs)
(remaining: Thread-7,Thread-6,Thread-5,Thread-3,Thread-1)
[Tue Oct 29 22:53:30 2019] completed Thread-7 (2 secs)
(remaining: Thread-6,Thread-5,Thread-3,Thread-1)
[Tue Oct 29 22:53:31 2019] completed Thread-3 (3 secs)
(remaining: Thread-6,Thread-5,Thread-1)
[Tue Oct 29 22:53:32 2019] completed Thread-6 (4 secs)
(remaining: Thread-5,Thread-1)
[Tue Oct 29 22:53:32 2019] completed Thread-5 (4 secs)
(remaining: Thread-1)
[Tue Oct 29 22:53:32 2019] completed Thread-1 (4 secs)
(remaining: None)
all Done at: Tue Oct 29 22:53:32 2019
不幸运的话,会得到下面这样奇怪的输出:
[Tue Oct 29 22:53:22 2019] started Thread-1
[Tue Oct 29 22:53:22 2019] started Thread-2
[Tue Oct 29 22:53:22 2019] started Thread-3
[Tue Oct 29 22:53:22 2019] started Thread-4
[Tue Oct 29 22:53:22 2019] started Thread-5
[Tue Oct 29 22:53:22 2019] started Thread-6
[Tue Oct 29 22:53:22 2019] started Thread-7
[Tue Oct 29 22:53:22 2019] started Thread-8
[Tue Oct 29 22:53:22 2019] started Thread-9
[Tue Oct 29 22:53:22 2019] started Thread-10
[Tue Oct 29 22:53:22 2019] started Thread-11
[Tue Oct 29 22:53:22 2019] started Thread-12
[Tue Oct 29 22:53:22 2019] started Thread-13
[Tue Oct 29 22:53:22 2019] started Thread-14
[Tue Oct 29 22:53:22 2019] started Thread-15
[Tue Oct 29 22:53:22 2019] started Thread-16
[Tue Oct 29 22:53:24 2019] completed Thread-3 (2 secs)
(remaining: Thread-8,Thread-6,Thread-9,Thread-14,Thread-10,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7,Thread-2)
[Tue Oct 29 22:53:24 2019] completed Thread-2 (2 secs)
(remaining: Thread-8,Thread-6,Thread-9,Thread-14,Thread-10,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:24 2019] completed Thread-8 (2 secs)
(remaining: Thread-6,Thread-9,Thread-14,Thread-10,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:24 2019] completed Thread-10 (2 secs)
(remaining: Thread-6,Thread-9,Thread-14,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:25 2019] completed Thread-11 (3 secs)
(remaining: Thread-6,Thread-9,Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:26 2019] completed Thread-6 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-7 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-4 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-9 (4 secs)
(remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
(remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
(remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
(remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
[Tue Oct 29 22:53:26 2019] completed Thread-1 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-5 (4 secs)
(remaining: Thread-14,Thread-12,Thread-13,Thread-15,Thread-16)
(remaining: Thread-14,Thread-12,Thread-13,Thread-15,Thread-16)
[Tue Oct 29 22:53:26 2019] completed Thread-13 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-12 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-15 (4 secs)
(remaining: None)
(remaining: None)
[Tue Oct 29 22:53:26 2019] completed Thread-16 (4 secs)
(remaining: None)
[Tue Oct 29 22:53:26 2019] completed Thread-14 (4 secs)
(remaining: None)
(remaining: None)
all Done at: Tue Oct 29 22:53:26 2019
存在两个问题:
- 由于多个线程执行进度的不同,可能会出现交替输出的问题(I/O)
- 可能出现多个线程同时改变同一个变量的情况
将
mtsleepF.py
改写为
mtsleepG.py
,增加锁机制,代码如下:
# mtsleepG.py
from atexit import register
from random import randrange
from threading import Thread, current_thread, Lock
from time import sleep, ctime
class CleanOutputSet(set):
def __str__(self):
return ','.join(x for x in self)
lock = Lock()
loops = (randrange(2, 5) for x in range(randrange(3, 20))) # 将列表生成式的[]改为(),就创建了一个generator。 randrange(3, 7)表示创建3~6个进程,randrange(2,5)表示为每一个进程选择一个睡眠时间
remaining = CleanOutputSet()
def loop(nsec):
myname = current_thread().name
lock.acquire()
remaining.add(myname)
print('[%s] started %s' % (ctime(), myname))
lock.release()
sleep(nsec)
lock.acquire()
remaining.remove(myname)
print('[%s] completed %s (%d secs)' % (ctime(), myname, nsec))
print(' (remaining: %s)' % (remaining or 'None'))
lock.release()
def main():
for pause in loops:
Thread(target=loop, args=(pause, )).start() # args=(pause, )中的','必不可少,如果没有,那么args就会被当做int
@register
def _atexit():
print('all Done at:', ctime())
if __name__ == '__main__':
main()
代码容易理解,不再过多解释。不过使用with语句可以代替acquire()和release(),使代码更简洁优化,可将loop()函数改为:
from threading import Thread, current_thread, Lock
from time import sleep, ctime
def loop(nsec):
myname = current_thread().name
with lock:
remaining.add(myname)
print('[%s] started %s' % (ctime(), myname))
sleep(nsec)
with lock:
remaining.remove(myname)
print('[%s] completed %s (%d secs)' % (ctime(), myname, nsec))
print(' (remaining: %s)' % (remaining or 'None'))
-
信号量示例
pass
python多进程
这里只讲述multiprocessing模块。os模块中的
fork()
函数也能实现多进程,功能太弱。
Process类
multiprocessing模块和threading模块非常相似,其中最主要的类是Process类,和Thread类的API相同:
属性 | 描述 |
---|---|
name | 进程名,仅用于识别目的,没有语义 |
pid | 进程ID。在生成进程前为 |
daemon | 守护进程标记(必须在start()被调用前设置) |
exitcode | 子进程的退出代码。如果子进程尚未终止则为 ;负值 − N -N −N表示子进程被信号 N N N终止 |
authkey | 进程的身份验证密钥(bytes) |
sentinel | 系统对象的数字句柄,当进程结束时将变为"ready" |
方法 | 描述 |
---|---|
init(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) | 初始化方法,实例化一个进程对象,需要有一个可调用的target,以及其参数args或/和kwargs。还可以传递name和group参数(后者仅用于兼容threading.Thread)。此外。而daemon的值将会设定`process.daemon属性/标志 |
start() | 启动进程活动 |
run() | 定义该进程活动的方法(通常在子类中被重写) |
join(timeout=None) | 直至启动的进程终止之前一直被挂起;除非给出了timeout(秒),否则会一直阻塞(进程无法join自身,否则会死锁 |
is_alive() | 返回布尔值,表示这个进程是否还存活 |
terminate() | 强制终止进程且不做清理。进程的后代进程将不会被终止,变成僵尸进程;如果有锁没有释放,将变成死锁 |
kill() | 与terminate()功能相同 |
close() | 关闭Process对象,释放与之关联的所有资源。如果底层进程仍在运行,会引起ValueError |
对应于Thread类,下面从三个方面说明如何使用Process类:
- 创建Process的实例,传给它一个函数。
- 创建Process的实例,传给它一个可调用的类实例。
-
派生Process的子类,并创建子类的实例。
最推荐最后一种方案。最不推荐第二种。
创建Process的实例,传给它一个函数
将
mtsleepC.py
改写为
mpsleepC.py
,将Thread类换为Process类,代码如下:
# mpsleepC.py
from multiprocessing import Process
from time import sleep, ctime
import os
loops = [4, 2]
def loop(nsec):
print("[%s] %s start." % (ctime(), os.getpid()))
sleep(nsec)
print("[%s] %s end." % (ctime(), os.getpid()))
def main():
print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
processes = []
nloops = range(len(loops))
for i in nloops:
t = Process(target=loop, args=(loops[i], ))
processes.append(t)
for i in nloops:
processes[i].start()
for i in nloops:
processes[i].join()
print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))
if __name__ == '__main__':
main()
执行结果:
[Wed Oct 30 09:45:57 2019] 15513 main process start ... ...
[Wed Oct 30 09:45:57 2019] 15514 start.
[Wed Oct 30 09:45:57 2019] 15515 start.
[Wed Oct 30 09:45:59 2019] 15515 end.
[Wed Oct 30 09:46:01 2019] 15514 end.
[Wed Oct 30 09:46:01 2019] 15513 main process end ... ...
其中,
os.getpid()
函数用于获取当前进程号。
创建Process的实例,传给它一个可调用的类实例
将
mtsleepD.py
改写为
mpsleepD.py
,代码如下:
# mpsleepD.py
import multiprocessing
import os
from time import sleep, ctime
loops = [4, 2]
class ProcessFunc():
'''可调用类'''
def __init__(self, func, args, name=''):
self.name = name
self.func = func
self.args = args
def __call__(self):
self.func(*self.args)
def loop(nsec):
print("[%s] %s start." % (ctime(), os.getpid()))
sleep(nsec)
print("[%s] %s end." % (ctime(), os.getpid()))
def main():
print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
processes = []
nloops = range(len(loops))
for i in nloops:
t = multiprocessing.Process(target=ProcessFunc(loop, (loops[i], ), loop.__name__))
processes.append(t)
for i in nloops:
processes[i].start()
for i in nloops:
processes[i].join()
print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))
if __name__ == '__main__':
main()
执行结果如下:
[Wed Oct 30 09:50:58 2019] 16023 main process start ... ...
[Wed Oct 30 09:50:58 2019] 16024 start.
[Wed Oct 30 09:50:58 2019] 16026 start.
[Wed Oct 30 09:51:00 2019] 16026 end.
[Wed Oct 30 09:51:02 2019] 16024 end.
[Wed Oct 30 09:51:02 2019] 16023 main process end ... ...
派生Process的子类,并创建子类的实例
将
mtsleepE.py
改写为
mpsleepE.py
,代码如下:
# mpsleepE.py
import multiprocessing
import os
from time import sleep, ctime
loops = [4, 2]
class MyProcess(multiprocessing.Process):
def __init__(self, func, args, name=''):
super().__init__()
self.name = name
self.func = func
self.args = args
def run(self):
self.func(*self.args)
def loop(nsec):
print("[%s] %s start." % (ctime(), os.getpid()))
sleep(nsec)
print("[%s] %s end." % (ctime(), os.getpid()))
def main():
print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
processes = []
nloops = range(len(loops))
for i in nloops:
t = MyProcess(loop, (loops[i], ), loop.__name__)
processes.append(t)
for i in nloops:
processes[i].start()
for i in nloops:
processes[i].join()
print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))
if __name__ == '__main__':
main()
执行结果如下:
[Wed Oct 30 09:57:00 2019] 16683 main process start ... ...
[Wed Oct 30 09:57:00 2019] 16684 start.
[Wed Oct 30 09:57:00 2019] 16685 start.
[Wed Oct 30 09:57:02 2019] 16685 end.
[Wed Oct 30 09:57:04 2019] 16684 end.
[Wed Oct 30 09:57:04 2019] 16683 main process end ... ...
孤儿进程和僵尸进程
-
孤儿进程
如果父进程终止,而由父进程创建的一个或多个子进程还在执行的话,这一个或多个子进程就会成为孤儿进程,成为孤儿进程后,善后工作(wait()或者waitpid()等)就会由init接管,init进程是内核启动的第一个进程,pid=1,由0号进程idle创建。应为有人善后,所以孤儿进程是无害的。
-
僵尸进程
由于父进程和子进程是异步的,所以父进程不会知道子进程会在什么时候结束,因此,为了在子进程结束后让父进程知道,子进程结束后会保留一部分系统资源如pid,运行时间,退出状态等。等父进程通过wait()或者waitpid()系统调用取得这些信息时,这部分资源才会被释放。但如果父进程一直未调用wait()或者waitpid(),那这些资源就一直不会被释放,比如pid,pid的数量是有限的,如果僵尸进程过多,就会导致pid不足而无法创建新进程,所以僵尸进程是有害的。
下面是演示僵尸进程的
的代码:zombie_process.py
# zombie_process.py
import multiprocessing
from time import ctime, sleep
import os
def demo():
print('[%s] %s start.' % (ctime(), os.getpid()))
sleep(1)
print('[%s] %s end.' % (ctime(), os.getpid()))
def main():
print('[%s] %s father process start ... ...' % (ctime(), os.getpid()))
multiprocessing.Process(target=demo).start()
sleep(10000)
print('[%s] %s father process end ... ...' % (ctime(), os.getpid()))
if __name__ == '__main__':
main()
父进程sleep 10000s,而子进程sleep 1s,所以子进程远早于父进程结束,子进程结束而父进程还未结束时的输出结果为:
[Wed Oct 30 10:19:23 2019] 18262 father process start ... ...
[Wed Oct 30 10:19:23 2019] 18263 start.
[Wed Oct 30 10:19:24 2019] 18263 end.
这是再开一个终端执行:
ps aux | grep zombie_process.py
这条命令的作用是查看所有与
zombie_process.py
有关的系统进程。输出为:
qyh 18262 0.1 0.0 38332 11124 pts/1 S+ 10:19 0:00 python zombie_process.py
qyh 18321 0.0 0.0 21536 1008 pts/3 S+ 10:19 0:00 grep --color=auto zombie_process.py
可以看到第一个为父进程,占用cpu 0.1%,第二个是子进程,没有占用cpu和内存,就是所谓的僵尸进程。这是在终端执行:
kill 18262
杀死父进程,父进程结束后僵尸进程就会变成孤儿进程,由init收养后释放资源。
Pool类
如果需要创建大量进程,使用Process类管理就会比较麻烦,就可以使用进程池Pool类。Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。
将
mpsleepC.py
改写为
mpsleepF.py
用来说明Pool类的用法,代码如下:
# mpsleepF.py
from multiprocessing import Pool
from time import sleep, ctime
import os
loops = [4, 2, 3, 4, 5, 2, 1]
def loop(nsec):
print("[%s] %s start." % (ctime(), os.getpid()))
sleep(nsec)
print("[%s] %s end." % (ctime(), os.getpid()))
def main():
print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
p = Pool(processes=3)
rl = p.map(loop, loops)
p.close()
p.join()
print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))
if __name__ == '__main__':
main()
第16行
实例化一个进程池对象,传入的参数processes表示进程池中进程的个数,这里为3。
第18~20行
- pool.map()函数原型如下:
和内置的map()函数用法行为基本一致,它会使进程阻塞直至返回结果。
rl表示result_list,是存储返回结果的列表。
2. pool.close()函数用于关闭进程池,使其不再接受新的任务。
3. pool.join()主进程阻塞等待进程池中的子进程退出。必须在close()或terminate()之后使用。
执行结果如下:
[Wed Oct 30 15:12:58 2019] 21744 main process start ... ...
[Wed Oct 30 15:12:58 2019] 21745 start.
[Wed Oct 30 15:12:58 2019] 21746 start.
[Wed Oct 30 15:12:58 2019] 21747 start.
[Wed Oct 30 15:13:00 2019] 21746 end.
[Wed Oct 30 15:13:00 2019] 21746 start.
[Wed Oct 30 15:13:01 2019] 21747 end.
[Wed Oct 30 15:13:01 2019] 21747 start.
[Wed Oct 30 15:13:02 2019] 21745 end.
[Wed Oct 30 15:13:02 2019] 21745 start.
[Wed Oct 30 15:13:04 2019] 21746 end.
[Wed Oct 30 15:13:04 2019] 21746 start.
[Wed Oct 30 15:13:04 2019] 21745 end.
[Wed Oct 30 15:13:05 2019] 21746 end.
[Wed Oct 30 15:13:06 2019] 21747 end.
[Wed Oct 30 15:13:06 2019] 21744 main process end ... ...
Pool类还有两个重要的函数,函数原型如下:
apply(func, args=(), kwds={})
apply_async(func, args=(), kwds={}, callback=None)
apply()使用args参数极易kwds命名参数调用func,它会在返回结果前阻塞主进程。这使得它和单进程没什么区别。python3.x之后很少用。
apply_async()和apply()用法一样,但它是非阻塞的,并且支持结果返回进行回调。
将
mpsleepF.py
改写为
mpsleepG.py
,用来比较apply()和apply_async()的用法,代码如下:
# mpsleepG.py
from multiprocessing import Pool
from time import sleep, ctime
import os
loops = [4, 2, 3, 4, 5, 2, 1]
def loop(nsec):
print("[%s] %s start." % (ctime(), os.getpid()))
sleep(nsec)
print("[%s] %s end." % (ctime(), os.getpid()))
def main1():
print("[%s] %s main process_1 start ... ..." % (ctime(), os.getpid()))
p = Pool(processes=3)
for i in range(len(loops)):
p.apply(loop, (loops[i], ))
p.close()
p.join()
print("[%s] %s main process_1 end ... ..." % (ctime(), os.getpid()))
def main2():
print("[%s] %s main process_2 start ... ..." % (ctime(), os.getpid()))
p = Pool(processes=3)
result_list = []
for i in range(len(loops)):
result_list.append(p.apply_async(loop, (loops[i], )))
p.close()
p.join()
print("[%s] %s main process_2 end ... ..." % (ctime(), os.getpid()))
if __name__ == '__main__':
main1()
main2()
执行结果如下:
[Wed Oct 30 15:26:28 2019] 22799 main process_1 start ... ...
[Wed Oct 30 15:26:28 2019] 22800 start.
[Wed Oct 30 15:26:32 2019] 22800 end.
[Wed Oct 30 15:26:32 2019] 22801 start.
[Wed Oct 30 15:26:34 2019] 22801 end.
[Wed Oct 30 15:26:34 2019] 22802 start.
[Wed Oct 30 15:26:37 2019] 22802 end.
[Wed Oct 30 15:26:37 2019] 22800 start.
[Wed Oct 30 15:26:41 2019] 22800 end.
[Wed Oct 30 15:26:41 2019] 22801 start.
[Wed Oct 30 15:26:46 2019] 22801 end.
[Wed Oct 30 15:26:46 2019] 22802 start.
[Wed Oct 30 15:26:48 2019] 22802 end.
[Wed Oct 30 15:26:48 2019] 22800 start.
[Wed Oct 30 15:26:49 2019] 22800 end.
[Wed Oct 30 15:26:49 2019] 22799 main process_1 end ... ...
[Wed Oct 30 15:26:49 2019] 22799 main process_2 start ... ...
[Wed Oct 30 15:26:49 2019] 22808 start.
[Wed Oct 30 15:26:49 2019] 22809 start.
[Wed Oct 30 15:26:49 2019] 22810 start.
[Wed Oct 30 15:26:51 2019] 22809 end.
[Wed Oct 30 15:26:51 2019] 22809 start.
[Wed Oct 30 15:26:52 2019] 22810 end.
[Wed Oct 30 15:26:52 2019] 22810 start.
[Wed Oct 30 15:26:53 2019] 22808 end.
[Wed Oct 30 15:26:53 2019] 22808 start.
[Wed Oct 30 15:26:55 2019] 22809 end.
[Wed Oct 30 15:26:55 2019] 22809 start.
[Wed Oct 30 15:26:55 2019] 22808 end.
[Wed Oct 30 15:26:56 2019] 22809 end.
[Wed Oct 30 15:26:57 2019] 22810 end.
[Wed Oct 30 15:26:57 2019] 22799 main process_2 end ... ...
main2()中的result_list列表用来存储返回值(当然这个例子中的func并没有返回值)。
multiprocessing模块中锁的用法和threading模块完全相同,不再重复讲解。
进程间通信
pass