天天看点

Python多线程和多进程Python多线程python多进程

本文是笔者学习python多线程和多进程的笔记。

Python多线程

单线程执行sleep

onethr.py

文件中创建两个时间循环:一个睡眠4s(loop0);一个睡眠2s(loop1)(这里使用"loop0"和"loop1"作为函数名)。以下多线程的讲解示例将以此为基础。代码如下:

# onethy.py

from time import sleep, ctime

def loop0():
    print("Start loop 0 at:", ctime())
    sleep(4)
    print("Loop 0 done at:", ctime())

def loop1():
    print("Start loop 1 at:", ctime())
    sleep(2)
    print("Loop 1 done at:", ctime())

def main():
    print("Start at:", ctime())
    loop0()
    loop1()
    print("All done at:", ctime())

if __name__ == '__main__':
    main()
           

执行结果如下:

Start at: Tue Oct 29 09:39:25 2019
Start loop 0 at: Tue Oct 29 09:39:25 2019
Loop 0 done at: Tue Oct 29 09:39:29 2019
Start loop 1 at: Tue Oct 29 09:39:29 2019
Loop 1 done at: Tue Oct 29 09:39:31 2019
All done at: Tue Oct 29 09:39:31 2019
           

多线程编程

Python支持多线程编程的模块:

thread模块:提供了基本的线程和锁定支持;

threading模块:提供了更高级别、功能更全面的线程管理;

Queue模块:用于创建一个队列数据结构,在多线程之间进行共享。

建议:推荐使用更先进的threading模块,而不是thread模块。

注意:在python3中,thread模块被重命名为_thread模块。

thread模块

下表是一些thread模块常用的线程函数和LockType锁对象的方法。

thread模块的函数 功能
start_new_thread(function, args, kwargs=None) 派生一个新的线程,使用给定的args和可选的kwargs来执行funciton
allocate_lock() 分配LockType所对象
exit() 给线程退出指令
LockType锁对象的方法 功能
acquire(wait=None) 尝试获取锁对象
locked() 如果获取了锁对象则返回True,否则,返回False
release() 释放锁
  1. 使用thread模块的多线程机制将

    onethr.py

    改写为

    mtsleepA.py

    ,代码如下:
# mtsleepA.py

import _thread
from time import sleep, ctime

def loop0():
    print("Start loop 0 at:", ctime())
    sleep(4)
    print("Loop 0 done at:", ctime())

def loop1():
    print("Start loop 1 at:", ctime())
    sleep(2)
    print("Loop 1 done at:", ctime())

def main():
    print("Start at:", ctime())
    _thread.start_new_thread(loop0, ())
    _thread.start_new_thread(loop1, ()) #  第二个参数args是一个tuple,没有默认值,所以即使function没有参数,也要传入一个空tuple
    sleep(6)
    print("All done at:", ctime())

if __name__ == '__main__':
    main()
           

执行结果如下:

Start at: Tue Oct 29 10:16:05 2019
Start loop 0 at: Tue Oct 29 10:16:05 2019
Start loop 1 at: Tue Oct 29 10:16:05 2019
Loop 1 done at: Tue Oct 29 10:16:07 2019
Loop 0 done at: Tue Oct 29 10:16:09 2019
All done at: Tue Oct 29 10:16:11 2019
           

可以看出loop0和loop1几乎同时开始执行,且两者是并行执行的,整体的运行时间是4s。

代码中第20行的

sleep(6)

的调用是非常有用的。如果没有20行,执行了18和19行后,直接显示"All done",然后主线程结束退出,那么子线程loop0()和loop1()也会直接终止。

这里我们知道这个程序最多执行6s,但是实际上只需要4s,并没有比单线程节省时间,而且大部分情况我们是不知道需要执行多久,那么就没办法使用sleep()来进行线程的同步机制了。这是就需要锁了。

2. 使用thread模块的锁对象,将

mtsleepA.py

改写为

mtsleepB.py

,代码如下:

# mtsleepB.py

import _thread
from time import sleep, ctime

loops = [4, 2]

def loop(nloop, nsec, lock):
    print("Start loop %d at:" % nloop, ctime())
    sleep(nsec)
    print("Loop %d done at:" % nloop, ctime())
    lock.release()

def main():
    print("Start at:", ctime())
    locks = []
    nloops = range(len(loops))

    for i in nloops:
        lock = _thread.allocate_lock()
        lock.acquire()
        locks.append(lock)

    for i in nloops:
        _thread.start_new_thread(loop, (i, loops[i], locks[i]))


    for i in nloops:
        while locks[i].locked():
            pass
    print("All done at:", ctime())

if __name__ == '__main__':
    main()
           

下面逐行对代码进行解释:

第1~6行

  1. 导入模块
  2. 不再对4s和2s硬编码到不同函数中,而是使用了唯一的loop函数,并把这些常量放入列表loops列表中。

第8~12行

  1. loop()函数代替了之前的loop*()函数。
  2. 给loop()分配一个已获得的锁,当执行完后释放锁,向主线程表明该子线程已完成。

第14~34行

3. 第一个for循环创建锁的列表,开启多少个子线程就分配多少个锁对象(20行),然后通过

acquire()

方法取得(21行)(取得锁可以理解为“把锁锁上”),并把锁放入列表。

4. 第二个for循环用于派生线程,每个线程都调用loop()函数,并传递号码、睡眠时间以及用于该线程的锁。

5. 第三个for循环用于检查锁列表,看列表里面的锁是否全部释放。只有在释放后才会继续向后执行。

执行结果如下:

Start at: Tue Oct 29 14:32:03 2019
Start loop 0 at: Tue Oct 29 14:32:03 2019
Start loop 1 at: Tue Oct 29 14:32:03 2019
Loop 1 done at: Tue Oct 29 14:32:05 2019
Loop 0 done at: Tue Oct 29 14:32:07 2019
All done at: Tue Oct 29 14:32:07 2019
           

threading模块

下表是threading模块中的所有可用对象:

对象 描述
Thread 表示一个执行线程的对象
Lock 锁原语对象(和thread模块中的锁一样)
RLock 可重入锁对象,使得单一线程可以(再次)获得已持有的锁(递归锁)
Condition 条件变量对象,使得一个线程等待另一个线程满足特定的‘条件’,比如改变状态或某个数据值
Event 条件变量的通用版本,任意数量的线程等待某个事件的发生,在该事件发生后所有线程将被激活
Semaphore 为线程间共享的有限资源提供了一个“计数器”,如果没有可用资源时会被阻塞
BoundedSemaphore 与Semaphore相似,不过它不允许超过初始值
Timer 与Thread相似,不过他要在运行前等待一段时间
Barrier 创建一个“障碍”,必须达到指定数量的线程后才可以继续

守护线程:守护线程一般是等待客户端请求服务的服务器,无请求时什么都不做,所以默认守护线程是不重要的,在关闭主线程时默认直接关闭守护线程。所以如果主线程在准备退出时,不需要等待一些子线程完成,就可以为这些子线程设置守护线程标记,该标记为真,表示该线程不重要。设置方法是:

thread.daemon=True

(调用

thread.setDamon(True)

的旧方法已经弃用)。检查一个线程是否的守护状态也是检查这个属性(调用

thread.isDaemon()

的旧方法已经弃用)。

整个python程序(可以解读为:主线程)将在所有非守护线程退出后才退出。

  1. Thread类

    Thread类是threading模块的主要执行对象。她有thread模块中没有的很多函数。

    下表给出了Thread对象的属性和方法:

属性 描述
name 线程名
ident 线程的标识符
daemon 守护线程标记
方法 描述
init(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) 初始化方法,实例化一个线程对象,需要有一个可调用的target,以及其参数args或/和kwargs。还可以传递name和group参数(后者还未实现)。此外。而daemon的值将会设定`thread.daemon属性/标志
start() 开始执行该线程
run() 定义该线程功能的方法(通常在子类中被重写)
join(timeout=None) 直至启动的线程终止之前一直被挂起;除非给出了timeout(秒),否则会一直阻塞
getName() 返回线程名(已弃用,可直接调用thread.name属性)
setName(name) 设定线程名(已弃用,可直接修改thread.name属性
is_alive() 返回布尔值,表示这个进程是否还存活
isDaemon() 如果是守护进程,返回True;否则,返回False(已弃用)
setDaemon(daemonic) 把线程的守护标志设置为布尔值daemonic(必须在线程start()之前调用)(已弃用)

下面从三个方面说明如何使用Thread类:

  • 创建Thread的实例,传给它一个函数。
  • 创建Thread的实例,传给它一个可调用的类实例。
  • 派生Thread的子类,并创建子类的实例。

    最推荐最后一种方案。最不推荐第二种。

创建Thread的实例,传给它一个函数

mtsleepB.py

改写为

mtsleepC.py

,添加使用Thread类,代码如下:

# mtsleepC.py

import threading
from time import sleep, ctime

loops = [4, 2]

def loop(nloop, nsec):
    print("Start loop %d at:" % nloop, ctime())
    sleep(nsec)
    print("Loop %d done at:" % nloop, ctime())

def main():
    print("Start at:", ctime())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=loop, args=(i, loops[i]))
        threads.append(t)


    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print("All done at:", ctime())

if __name__ == '__main__':
    main()
           

执行结果如下:

Start at: Tue Oct 29 15:50:11 2019
Start loop 0 at: Tue Oct 29 15:50:11 2019
Start loop 1 at: Tue Oct 29 15:50:11 2019
Loop 1 done at: Tue Oct 29 15:50:13 2019
Loop 0 done at: Tue Oct 29 15:50:15 2019
All done at: Tue Oct 29 15:50:15 2019
           
  • 使用thread模块时实现的锁没有了,取而代之的是一组Thread对象。
  • 实例化Thread对象(调用

    Thread()

    )和调用

    thread.start_new_thread()

    的最大区别是新县城不会立即开始执行,便于线程的同步。当所有线程分配完成后,再调用每个线程的

    start()

    方法让线程开始执行。
  • 相对于管理一组锁(分配、获取、释放、检查锁状态等),这里只需要为每个线程调用

    join()

    方法即可。

    join()

    方法将等待线程结束,或者在提供了超时时间的情况下,大道超时时间。
  • 实际上

    join()

    方法根本不需要调用,一旦线程启动,它们就会一直执行,直到给定的函数完成后后退出。如果主线程还有其他工作要做,就可以不调用

    join()

    join()

    方法只有在你需要等待线程完成的时候才有用。如果把第26~27行注释掉,执行结果如下:
Start at: Tue Oct 29 15:52:42 2019
Start loop 0 at: Tue Oct 29 15:52:42 2019
Start loop 1 at: Tue Oct 29 15:52:42 2019
All done at: Tue Oct 29 15:52:42 2019
Loop 1 done at: Tue Oct 29 15:52:44 2019
Loop 0 done at: Tue Oct 29 15:52:46 2019
           

从结果可看出

join()

的用法。

创建Thread的实例,传给它一个可调用的类实例

mtsleepC.py

的基础上新增加一个新类ThreadFunc,并进行一些其他的轻微改动,得到

mtsleepD.py

,代码如下:

# mtsleepD.py

import threading
from time import sleep, ctime

loops = [4, 2]

class ThreadFunc():
    '''可调用类'''
    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self):
        self.func(*self.args)

def loop(nloop, nsec):
    print("Start loop %d at:" % nloop, ctime())
    sleep(nsec)
    print("Loop %d done at:" % nloop, ctime())

def main():
    print("Start at:", ctime())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
        threads.append(t)


    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print("All done at:", ctime())

if __name__ == '__main__':
    main()
           

执行结果如下:

Start at: Tue Oct 29 16:11:40 2019
Start loop 0 at: Tue Oct 29 16:11:40 2019
Start loop 1 at: Tue Oct 29 16:11:40 2019
Loop 1 done at: Tue Oct 29 16:11:42 2019
Loop 0 done at: Tue Oct 29 16:11:44 2019
All done at: Tue Oct 29 16:11:44 2019
           

派生Thread的子类,并创建子类的实例

相对于创建可调用类的例子,当创建线程时使用子类要相对更容易阅读。下面是

mtsleepE.py

的代码:

# mtsleepE.py

import threading
from time import sleep, ctime

loops = [4, 2]

class MyThread(threading.Thread):
    def __init__(self, func, args, name=''):
        super().__init__()
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        self.func(*self.args)

def loop(nloop, nsec):
    print("Start loop %d at:" % nloop, ctime())
    sleep(nsec)
    print("Loop %d done at:" % nloop, ctime())

def main():
    print("Start at:", ctime())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = MyThread(loop, (i, loops[i]), loop.__name__)
        threads.append(t)

    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print("All done at:", ctime())

if __name__ == '__main__':
    main()
           

执行结果:

Start at: Tue Oct 29 16:21:04 2019
Start loop 0 at: Tue Oct 29 16:21:04 2019
Start loop 1 at: Tue Oct 29 16:21:04 2019
Loop 1 done at: Tue Oct 29 16:21:06 2019
Loop 0 done at: Tue Oct 29 16:21:08 2019
All done at: Tue Oct 29 16:21:08 2019
           

如果执行对象有返回值,可在MyThread类中增加

get_result()

方法来取得返回值,这是MyThread类的代码如下:

import threading

class MyThread(threading.Thread):
	def __init__(self, func, args, name='')
		super().__init__()
		self.name = name
		self.func = func
		self.args = args

	def run(self):
		self.result = self.func(*self.args)

	def get_result(self):
		try:
			return self.result
		except Exception as e:
			return None
           
  1. threading模块的其他函数

    除了各种同步和线程对象外,threading模块还提供了一些函数,如下表:

函数 描述
active_count() 当前活动的Thread对象个数
current_count() 返回当前的Thread对象
enumerate() 返回当前活动的Thread对象列表
settrace(func) 为所有线程设置一个trace函数
setprofile(func) 为所有线程设置一个profile函数
stack_size(size=0) 返回新创建线程的栈大小;或为后续创建的线程设定栈的大小为size

同步原语

一般在多线程代码中,总会有一些特定的函数或代码块不希望(或不应该)被多个线程同时执行,通常包括修改数据库、更新文件或其他会产生竞态条件的类似情况。这就是需要使用同步的情况。这里介绍两种类型的同步原语:锁/互斥;信号量。

  1. 锁示例

    锁有两种状态:锁定和未锁定。

    只支持两种函数:获得锁和释放锁。

    下面是

    mtsleepF.py

    没有使用锁的代码:
# mtsleepF.py

from atexit import register
from random import randrange
from threading import Thread, currentThread
from time import sleep, ctime

class CleanOutputSet(set):
    def __str__(self):
        return ','.join(x for x in self)

loops = (randrange(2, 5) for x in range(randrange(3, 20))) #  将列表生成式的[]改为(),就创建了一个generator。   randrange(3, 7)表示创建3~6个进程,randrange(2,5)表示为每一个进程选择一个睡眠时间
remaining = CleanOutputSet()

def loop(nsec):
    myname = currentThread().name
    remaining.add(myname)
    print('[%s] started %s' % (ctime(), myname))
    sleep(nsec)
    remaining.remove(myname)
    print('[%s] completed %s (%d secs)' % (ctime(), myname, nsec))
    print('  (remaining: %s)' % (remaining or 'None'))

def main():
    for pause in loops:
        Thread(target=loop, args=(pause, )).start() #  args=(pause, )中的','必不可少,如果没有,那么args就会被当做int

@register
def _atexit():
    print('all Done at:', ctime())

if __name__ == '__main__':
    main()
           

我们来解释上述代码:

第1~6行

  1. atexit模块及其

    register()

    函数用法参考https://blog.csdn.net/qyhaill/article/details/102807732
  2. random模块的

    randrange()

    函数用法参考https://www.runoob.com/python/func-number-randrange.html
  3. threading.currentThread()返回当前线程变量,这是旧的写法,新的写法是threading.current_thread()

第8~10行

  1. 派生python的set类(set类介绍百度很多),实现了它的

    __str__()

    方法
  2. join()

    函数用法参考https://www.runoob.com/python/att-string-join.html

第12~22行

  1. loops是一个generator。将列表生成式的[]改为(),就创建了一个generator
  2. remaining是set类的派生类的对象,用来记录剩下的还在运行的线程
  3. loop()

    函数进行一些修改,在sleep完之后将remaining中剩下的还在执行的线程显示出来

第24~30行

  1. main()函数创建并开始执行线程
  2. 28~30行的意思参见atexit模块的用法

幸运的话,执行结果会按适当的顺序给出:

[Tue Oct 29 22:53:28 2019] started Thread-1
[Tue Oct 29 22:53:28 2019] started Thread-2
[Tue Oct 29 22:53:28 2019] started Thread-3
[Tue Oct 29 22:53:28 2019] started Thread-4
[Tue Oct 29 22:53:28 2019] started Thread-5
[Tue Oct 29 22:53:28 2019] started Thread-6
[Tue Oct 29 22:53:28 2019] started Thread-7
[Tue Oct 29 22:53:30 2019] completed Thread-2 (2 secs)
  (remaining: Thread-7,Thread-6,Thread-4,Thread-5,Thread-3,Thread-1)
[Tue Oct 29 22:53:30 2019] completed Thread-4 (2 secs)
  (remaining: Thread-7,Thread-6,Thread-5,Thread-3,Thread-1)
[Tue Oct 29 22:53:30 2019] completed Thread-7 (2 secs)
  (remaining: Thread-6,Thread-5,Thread-3,Thread-1)
[Tue Oct 29 22:53:31 2019] completed Thread-3 (3 secs)
  (remaining: Thread-6,Thread-5,Thread-1)
[Tue Oct 29 22:53:32 2019] completed Thread-6 (4 secs)
  (remaining: Thread-5,Thread-1)
[Tue Oct 29 22:53:32 2019] completed Thread-5 (4 secs)
  (remaining: Thread-1)
[Tue Oct 29 22:53:32 2019] completed Thread-1 (4 secs)
  (remaining: None)
all Done at: Tue Oct 29 22:53:32 2019
           

不幸运的话,会得到下面这样奇怪的输出:

[Tue Oct 29 22:53:22 2019] started Thread-1
[Tue Oct 29 22:53:22 2019] started Thread-2
[Tue Oct 29 22:53:22 2019] started Thread-3
[Tue Oct 29 22:53:22 2019] started Thread-4
[Tue Oct 29 22:53:22 2019] started Thread-5
[Tue Oct 29 22:53:22 2019] started Thread-6
[Tue Oct 29 22:53:22 2019] started Thread-7
[Tue Oct 29 22:53:22 2019] started Thread-8
[Tue Oct 29 22:53:22 2019] started Thread-9
[Tue Oct 29 22:53:22 2019] started Thread-10
[Tue Oct 29 22:53:22 2019] started Thread-11
[Tue Oct 29 22:53:22 2019] started Thread-12
[Tue Oct 29 22:53:22 2019] started Thread-13
[Tue Oct 29 22:53:22 2019] started Thread-14
[Tue Oct 29 22:53:22 2019] started Thread-15
[Tue Oct 29 22:53:22 2019] started Thread-16
[Tue Oct 29 22:53:24 2019] completed Thread-3 (2 secs)
  (remaining: Thread-8,Thread-6,Thread-9,Thread-14,Thread-10,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7,Thread-2)
[Tue Oct 29 22:53:24 2019] completed Thread-2 (2 secs)
  (remaining: Thread-8,Thread-6,Thread-9,Thread-14,Thread-10,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:24 2019] completed Thread-8 (2 secs)
  (remaining: Thread-6,Thread-9,Thread-14,Thread-10,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:24 2019] completed Thread-10 (2 secs)
  (remaining: Thread-6,Thread-9,Thread-14,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:25 2019] completed Thread-11 (3 secs)
  (remaining: Thread-6,Thread-9,Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:26 2019] completed Thread-6 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-7 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-4 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-9 (4 secs)
  (remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
  (remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
  (remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
  (remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
[Tue Oct 29 22:53:26 2019] completed Thread-1 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-5 (4 secs)
  (remaining: Thread-14,Thread-12,Thread-13,Thread-15,Thread-16)
  (remaining: Thread-14,Thread-12,Thread-13,Thread-15,Thread-16)
[Tue Oct 29 22:53:26 2019] completed Thread-13 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-12 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-15 (4 secs)
  (remaining: None)
  (remaining: None)
[Tue Oct 29 22:53:26 2019] completed Thread-16 (4 secs)
  (remaining: None)
[Tue Oct 29 22:53:26 2019] completed Thread-14 (4 secs)
  (remaining: None)
  (remaining: None)
all Done at: Tue Oct 29 22:53:26 2019
           

存在两个问题:

  1. 由于多个线程执行进度的不同,可能会出现交替输出的问题(I/O)
  2. 可能出现多个线程同时改变同一个变量的情况

mtsleepF.py

改写为

mtsleepG.py

,增加锁机制,代码如下:

# mtsleepG.py

from atexit import register
from random import randrange
from threading import Thread, current_thread, Lock
from time import sleep, ctime

class CleanOutputSet(set):
    def __str__(self):
        return ','.join(x for x in self)

lock = Lock()
loops = (randrange(2, 5) for x in range(randrange(3, 20))) #  将列表生成式的[]改为(),就创建了一个generator。   randrange(3, 7)表示创建3~6个进程,randrange(2,5)表示为每一个进程选择一个睡眠时间
remaining = CleanOutputSet()

def loop(nsec):
    myname = current_thread().name
    lock.acquire()
    remaining.add(myname)
    print('[%s] started %s' % (ctime(), myname))
    lock.release()
    sleep(nsec)
    lock.acquire()
    remaining.remove(myname)
    print('[%s] completed %s (%d secs)' % (ctime(), myname, nsec))
    print('  (remaining: %s)' % (remaining or 'None'))
    lock.release()

def main():
    for pause in loops:
        Thread(target=loop, args=(pause, )).start() #  args=(pause, )中的','必不可少,如果没有,那么args就会被当做int

@register
def _atexit():
    print('all Done at:', ctime())

if __name__ == '__main__':
    main()
           

代码容易理解,不再过多解释。不过使用with语句可以代替acquire()和release(),使代码更简洁优化,可将loop()函数改为:

from threading import Thread, current_thread, Lock
from time import sleep, ctime

def loop(nsec):
    myname = current_thread().name
    with lock:
        remaining.add(myname)
        print('[%s] started %s' % (ctime(), myname))
    sleep(nsec)
    with lock:
        remaining.remove(myname)
        print('[%s] completed %s (%d secs)' % (ctime(), myname, nsec))
        print('  (remaining: %s)' % (remaining or 'None'))
           
  1. 信号量示例

    pass

python多进程

这里只讲述multiprocessing模块。os模块中的

fork()

函数也能实现多进程,功能太弱。

Process类

multiprocessing模块和threading模块非常相似,其中最主要的类是Process类,和Thread类的API相同:

属性 描述
name 进程名,仅用于识别目的,没有语义
pid 进程ID。在生成进程前为

None

daemon 守护进程标记(必须在start()被调用前设置)
exitcode 子进程的退出代码。如果子进程尚未终止则为

None

;负值 − N -N −N表示子进程被信号 N N N终止
authkey 进程的身份验证密钥(bytes)
sentinel 系统对象的数字句柄,当进程结束时将变为"ready"
方法 描述
init(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) 初始化方法,实例化一个进程对象,需要有一个可调用的target,以及其参数args或/和kwargs。还可以传递name和group参数(后者仅用于兼容threading.Thread)。此外。而daemon的值将会设定`process.daemon属性/标志
start() 启动进程活动
run() 定义该进程活动的方法(通常在子类中被重写)
join(timeout=None) 直至启动的进程终止之前一直被挂起;除非给出了timeout(秒),否则会一直阻塞(进程无法join自身,否则会死锁
is_alive() 返回布尔值,表示这个进程是否还存活
terminate() 强制终止进程且不做清理。进程的后代进程将不会被终止,变成僵尸进程;如果有锁没有释放,将变成死锁
kill() 与terminate()功能相同
close() 关闭Process对象,释放与之关联的所有资源。如果底层进程仍在运行,会引起ValueError

对应于Thread类,下面从三个方面说明如何使用Process类:

  • 创建Process的实例,传给它一个函数。
  • 创建Process的实例,传给它一个可调用的类实例。
  • 派生Process的子类,并创建子类的实例。

    最推荐最后一种方案。最不推荐第二种。

创建Process的实例,传给它一个函数

mtsleepC.py

改写为

mpsleepC.py

,将Thread类换为Process类,代码如下:

# mpsleepC.py

from multiprocessing import Process
from time import sleep, ctime
import os

loops = [4, 2]

def loop(nsec):
    print("[%s] %s start." % (ctime(), os.getpid()))
    sleep(nsec)
    print("[%s] %s end." % (ctime(), os.getpid()))

def main():
    print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
    processes = []
    nloops = range(len(loops))

    for i in nloops:
        t = Process(target=loop, args=(loops[i], ))
        processes.append(t)


    for i in nloops:
        processes[i].start()

    for i in nloops:
        processes[i].join()

    print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))

if __name__ == '__main__':
    main()
           

执行结果:

[Wed Oct 30 09:45:57 2019] 15513 main process start ... ...
[Wed Oct 30 09:45:57 2019] 15514 start.
[Wed Oct 30 09:45:57 2019] 15515 start.
[Wed Oct 30 09:45:59 2019] 15515 end.
[Wed Oct 30 09:46:01 2019] 15514 end.
[Wed Oct 30 09:46:01 2019] 15513 main process end ... ...
           

其中,

os.getpid()

函数用于获取当前进程号。

创建Process的实例,传给它一个可调用的类实例

mtsleepD.py

改写为

mpsleepD.py

,代码如下:

# mpsleepD.py

import multiprocessing
import os
from time import sleep, ctime

loops = [4, 2]

class ProcessFunc():
    '''可调用类'''
    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self):
        self.func(*self.args)

def loop(nsec):
    print("[%s] %s start." % (ctime(), os.getpid()))
    sleep(nsec)
    print("[%s] %s end." % (ctime(), os.getpid()))

def main():
    print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
    processes = []
    nloops = range(len(loops))

    for i in nloops:
        t = multiprocessing.Process(target=ProcessFunc(loop, (loops[i], ), loop.__name__))
        processes.append(t)


    for i in nloops:
        processes[i].start()

    for i in nloops:
        processes[i].join()

    print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))

if __name__ == '__main__':
    main()
           

执行结果如下:

[Wed Oct 30 09:50:58 2019] 16023 main process start ... ...
[Wed Oct 30 09:50:58 2019] 16024 start.
[Wed Oct 30 09:50:58 2019] 16026 start.
[Wed Oct 30 09:51:00 2019] 16026 end.
[Wed Oct 30 09:51:02 2019] 16024 end.
[Wed Oct 30 09:51:02 2019] 16023 main process end ... ...
           

派生Process的子类,并创建子类的实例

mtsleepE.py

改写为

mpsleepE.py

,代码如下:

# mpsleepE.py

import multiprocessing
import os
from time import sleep, ctime

loops = [4, 2]

class MyProcess(multiprocessing.Process):
    def __init__(self, func, args, name=''):
        super().__init__()
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        self.func(*self.args)

def loop(nsec):
    print("[%s] %s start." % (ctime(), os.getpid()))
    sleep(nsec)
    print("[%s] %s end." % (ctime(), os.getpid()))

def main():
    print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
    processes = []
    nloops = range(len(loops))

    for i in nloops:
        t = MyProcess(loop, (loops[i], ), loop.__name__)
        processes.append(t)

    for i in nloops:
        processes[i].start()

    for i in nloops:
        processes[i].join()

    print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))

if __name__ == '__main__':
    main()
           

执行结果如下:

[Wed Oct 30 09:57:00 2019] 16683 main process start ... ...
[Wed Oct 30 09:57:00 2019] 16684 start.
[Wed Oct 30 09:57:00 2019] 16685 start.
[Wed Oct 30 09:57:02 2019] 16685 end.
[Wed Oct 30 09:57:04 2019] 16684 end.
[Wed Oct 30 09:57:04 2019] 16683 main process end ... ...
           

孤儿进程和僵尸进程

  1. 孤儿进程

    如果父进程终止,而由父进程创建的一个或多个子进程还在执行的话,这一个或多个子进程就会成为孤儿进程,成为孤儿进程后,善后工作(wait()或者waitpid()等)就会由init接管,init进程是内核启动的第一个进程,pid=1,由0号进程idle创建。应为有人善后,所以孤儿进程是无害的。

  2. 僵尸进程

    由于父进程和子进程是异步的,所以父进程不会知道子进程会在什么时候结束,因此,为了在子进程结束后让父进程知道,子进程结束后会保留一部分系统资源如pid,运行时间,退出状态等。等父进程通过wait()或者waitpid()系统调用取得这些信息时,这部分资源才会被释放。但如果父进程一直未调用wait()或者waitpid(),那这些资源就一直不会被释放,比如pid,pid的数量是有限的,如果僵尸进程过多,就会导致pid不足而无法创建新进程,所以僵尸进程是有害的。

    下面是演示僵尸进程的

    zombie_process.py

    的代码:
# zombie_process.py

import multiprocessing
from time import ctime, sleep
import os

def demo():
    print('[%s] %s start.' % (ctime(), os.getpid()))
    sleep(1)
    print('[%s] %s end.' % (ctime(), os.getpid()))

def main():
    print('[%s] %s father process start ... ...' % (ctime(), os.getpid()))
    multiprocessing.Process(target=demo).start()
    sleep(10000)
    print('[%s] %s father process end ... ...' % (ctime(), os.getpid()))

if __name__ == '__main__':
    main()
           

父进程sleep 10000s,而子进程sleep 1s,所以子进程远早于父进程结束,子进程结束而父进程还未结束时的输出结果为:

[Wed Oct 30 10:19:23 2019] 18262 father process start ... ...
[Wed Oct 30 10:19:23 2019] 18263 start.
[Wed Oct 30 10:19:24 2019] 18263 end.
           

这是再开一个终端执行:

ps aux | grep zombie_process.py
           

这条命令的作用是查看所有与

zombie_process.py

有关的系统进程。输出为:

qyh      18262  0.1  0.0  38332 11124 pts/1    S+   10:19   0:00 python zombie_process.py
qyh      18321  0.0  0.0  21536  1008 pts/3    S+   10:19   0:00 grep --color=auto zombie_process.py
           

可以看到第一个为父进程,占用cpu 0.1%,第二个是子进程,没有占用cpu和内存,就是所谓的僵尸进程。这是在终端执行:

kill 18262
           

杀死父进程,父进程结束后僵尸进程就会变成孤儿进程,由init收养后释放资源。

Pool类

如果需要创建大量进程,使用Process类管理就会比较麻烦,就可以使用进程池Pool类。Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

mpsleepC.py

改写为

mpsleepF.py

用来说明Pool类的用法,代码如下:

# mpsleepF.py

from multiprocessing import Pool
from time import sleep, ctime
import os

loops = [4, 2, 3, 4, 5, 2, 1]

def loop(nsec):
    print("[%s] %s start." % (ctime(), os.getpid()))
    sleep(nsec)
    print("[%s] %s end." % (ctime(), os.getpid()))

def main():
    print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
    p = Pool(processes=3)

    rl = p.map(loop, loops)
    p.close()
    p.join()

    print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))

if __name__ == '__main__':
    main()
           

第16行

实例化一个进程池对象,传入的参数processes表示进程池中进程的个数,这里为3。

第18~20行

  1. pool.map()函数原型如下:

和内置的map()函数用法行为基本一致,它会使进程阻塞直至返回结果。

rl表示result_list,是存储返回结果的列表。

2. pool.close()函数用于关闭进程池,使其不再接受新的任务。

3. pool.join()主进程阻塞等待进程池中的子进程退出。必须在close()或terminate()之后使用。

执行结果如下:

[Wed Oct 30 15:12:58 2019] 21744 main process start ... ...
[Wed Oct 30 15:12:58 2019] 21745 start.
[Wed Oct 30 15:12:58 2019] 21746 start.
[Wed Oct 30 15:12:58 2019] 21747 start.
[Wed Oct 30 15:13:00 2019] 21746 end.
[Wed Oct 30 15:13:00 2019] 21746 start.
[Wed Oct 30 15:13:01 2019] 21747 end.
[Wed Oct 30 15:13:01 2019] 21747 start.
[Wed Oct 30 15:13:02 2019] 21745 end.
[Wed Oct 30 15:13:02 2019] 21745 start.
[Wed Oct 30 15:13:04 2019] 21746 end.
[Wed Oct 30 15:13:04 2019] 21746 start.
[Wed Oct 30 15:13:04 2019] 21745 end.
[Wed Oct 30 15:13:05 2019] 21746 end.
[Wed Oct 30 15:13:06 2019] 21747 end.
[Wed Oct 30 15:13:06 2019] 21744 main process end ... ...
           

Pool类还有两个重要的函数,函数原型如下:

apply(func, args=(), kwds={})
apply_async(func, args=(), kwds={}, callback=None)
           

apply()使用args参数极易kwds命名参数调用func,它会在返回结果前阻塞主进程。这使得它和单进程没什么区别。python3.x之后很少用。

apply_async()和apply()用法一样,但它是非阻塞的,并且支持结果返回进行回调。

mpsleepF.py

改写为

mpsleepG.py

,用来比较apply()和apply_async()的用法,代码如下:

# mpsleepG.py

from multiprocessing import Pool
from time import sleep, ctime
import os

loops = [4, 2, 3, 4, 5, 2, 1]

def loop(nsec):
    print("[%s] %s start." % (ctime(), os.getpid()))
    sleep(nsec)
    print("[%s] %s end." % (ctime(), os.getpid()))

def main1():
    print("[%s] %s main process_1 start ... ..." % (ctime(), os.getpid()))
    p = Pool(processes=3)

    for i in range(len(loops)):
        p.apply(loop, (loops[i], ))

    p.close()
    p.join()

    print("[%s] %s main process_1 end ... ..." % (ctime(), os.getpid()))

def main2():
    print("[%s] %s main process_2 start ... ..." % (ctime(), os.getpid()))
    p = Pool(processes=3)
    result_list = []

    for i in range(len(loops)):
        result_list.append(p.apply_async(loop, (loops[i], )))

    p.close()
    p.join()

    print("[%s] %s main process_2 end ... ..." % (ctime(), os.getpid()))

if __name__ == '__main__':
    main1()
    main2()
           

执行结果如下:

[Wed Oct 30 15:26:28 2019] 22799 main process_1 start ... ...
[Wed Oct 30 15:26:28 2019] 22800 start.
[Wed Oct 30 15:26:32 2019] 22800 end.
[Wed Oct 30 15:26:32 2019] 22801 start.
[Wed Oct 30 15:26:34 2019] 22801 end.
[Wed Oct 30 15:26:34 2019] 22802 start.
[Wed Oct 30 15:26:37 2019] 22802 end.
[Wed Oct 30 15:26:37 2019] 22800 start.
[Wed Oct 30 15:26:41 2019] 22800 end.
[Wed Oct 30 15:26:41 2019] 22801 start.
[Wed Oct 30 15:26:46 2019] 22801 end.
[Wed Oct 30 15:26:46 2019] 22802 start.
[Wed Oct 30 15:26:48 2019] 22802 end.
[Wed Oct 30 15:26:48 2019] 22800 start.
[Wed Oct 30 15:26:49 2019] 22800 end.
[Wed Oct 30 15:26:49 2019] 22799 main process_1 end ... ...
[Wed Oct 30 15:26:49 2019] 22799 main process_2 start ... ...
[Wed Oct 30 15:26:49 2019] 22808 start.
[Wed Oct 30 15:26:49 2019] 22809 start.
[Wed Oct 30 15:26:49 2019] 22810 start.
[Wed Oct 30 15:26:51 2019] 22809 end.
[Wed Oct 30 15:26:51 2019] 22809 start.
[Wed Oct 30 15:26:52 2019] 22810 end.
[Wed Oct 30 15:26:52 2019] 22810 start.
[Wed Oct 30 15:26:53 2019] 22808 end.
[Wed Oct 30 15:26:53 2019] 22808 start.
[Wed Oct 30 15:26:55 2019] 22809 end.
[Wed Oct 30 15:26:55 2019] 22809 start.
[Wed Oct 30 15:26:55 2019] 22808 end.
[Wed Oct 30 15:26:56 2019] 22809 end.
[Wed Oct 30 15:26:57 2019] 22810 end.
[Wed Oct 30 15:26:57 2019] 22799 main process_2 end ... ...
           

main2()中的result_list列表用来存储返回值(当然这个例子中的func并没有返回值)。

multiprocessing模块中锁的用法和threading模块完全相同,不再重复讲解。

进程间通信

pass