天天看點

Python多線程和多程序Python多線程python多程序

本文是筆者學習python多線程和多程序的筆記。

Python多線程

單線程執行sleep

onethr.py

檔案中建立兩個時間循環:一個睡眠4s(loop0);一個睡眠2s(loop1)(這裡使用"loop0"和"loop1"作為函數名)。以下多線程的講解示例将以此為基礎。代碼如下:

# onethy.py

from time import sleep, ctime

def loop0():
    print("Start loop 0 at:", ctime())
    sleep(4)
    print("Loop 0 done at:", ctime())

def loop1():
    print("Start loop 1 at:", ctime())
    sleep(2)
    print("Loop 1 done at:", ctime())

def main():
    print("Start at:", ctime())
    loop0()
    loop1()
    print("All done at:", ctime())

if __name__ == '__main__':
    main()
           

執行結果如下:

Start at: Tue Oct 29 09:39:25 2019
Start loop 0 at: Tue Oct 29 09:39:25 2019
Loop 0 done at: Tue Oct 29 09:39:29 2019
Start loop 1 at: Tue Oct 29 09:39:29 2019
Loop 1 done at: Tue Oct 29 09:39:31 2019
All done at: Tue Oct 29 09:39:31 2019
           

多線程程式設計

Python支援多線程程式設計的子產品:

thread子產品:提供了基本的線程和鎖定支援;

threading子產品:提供了更進階别、功能更全面的線程管理;

Queue子產品:用于建立一個隊列資料結構,在多線程之間進行共享。

建議:推薦使用更先進的threading子產品,而不是thread子產品。

注意:在python3中,thread子產品被重命名為_thread子產品。

thread子產品

下表是一些thread子產品常用的線程函數和LockType鎖對象的方法。

thread子產品的函數 功能
start_new_thread(function, args, kwargs=None) 派生一個新的線程,使用給定的args和可選的kwargs來執行funciton
allocate_lock() 配置設定LockType所對象
exit() 給線程退出指令
LockType鎖對象的方法 功能
acquire(wait=None) 嘗試擷取鎖對象
locked() 如果擷取了鎖對象則傳回True,否則,傳回False
release() 釋放鎖
  1. 使用thread子產品的多線程機制将

    onethr.py

    改寫為

    mtsleepA.py

    ,代碼如下:
# mtsleepA.py

import _thread
from time import sleep, ctime

def loop0():
    print("Start loop 0 at:", ctime())
    sleep(4)
    print("Loop 0 done at:", ctime())

def loop1():
    print("Start loop 1 at:", ctime())
    sleep(2)
    print("Loop 1 done at:", ctime())

def main():
    print("Start at:", ctime())
    _thread.start_new_thread(loop0, ())
    _thread.start_new_thread(loop1, ()) #  第二個參數args是一個tuple,沒有預設值,是以即使function沒有參數,也要傳入一個空tuple
    sleep(6)
    print("All done at:", ctime())

if __name__ == '__main__':
    main()
           

執行結果如下:

Start at: Tue Oct 29 10:16:05 2019
Start loop 0 at: Tue Oct 29 10:16:05 2019
Start loop 1 at: Tue Oct 29 10:16:05 2019
Loop 1 done at: Tue Oct 29 10:16:07 2019
Loop 0 done at: Tue Oct 29 10:16:09 2019
All done at: Tue Oct 29 10:16:11 2019
           

可以看出loop0和loop1幾乎同時開始執行,且兩者是并行執行的,整體的運作時間是4s。

代碼中第20行的

sleep(6)

的調用是非常有用的。如果沒有20行,執行了18和19行後,直接顯示"All done",然後主線程結束退出,那麼子線程loop0()和loop1()也會直接終止。

這裡我們知道這個程式最多執行6s,但是實際上隻需要4s,并沒有比單線程節省時間,而且大部分情況我們是不知道需要執行多久,那麼就沒辦法使用sleep()來進行線程的同步機制了。這是就需要鎖了。

2. 使用thread子產品的鎖對象,将

mtsleepA.py

改寫為

mtsleepB.py

,代碼如下:

# mtsleepB.py

import _thread
from time import sleep, ctime

loops = [4, 2]

def loop(nloop, nsec, lock):
    print("Start loop %d at:" % nloop, ctime())
    sleep(nsec)
    print("Loop %d done at:" % nloop, ctime())
    lock.release()

def main():
    print("Start at:", ctime())
    locks = []
    nloops = range(len(loops))

    for i in nloops:
        lock = _thread.allocate_lock()
        lock.acquire()
        locks.append(lock)

    for i in nloops:
        _thread.start_new_thread(loop, (i, loops[i], locks[i]))


    for i in nloops:
        while locks[i].locked():
            pass
    print("All done at:", ctime())

if __name__ == '__main__':
    main()
           

下面逐行對代碼進行解釋:

第1~6行

  1. 導入子產品
  2. 不再對4s和2s寫死到不同函數中,而是使用了唯一的loop函數,并把這些常量放入清單loops清單中。

第8~12行

  1. loop()函數代替了之前的loop*()函數。
  2. 給loop()配置設定一個已獲得的鎖,當執行完後釋放鎖,向主線程表明該子線程已完成。

第14~34行

3. 第一個for循環建立鎖的清單,開啟多少個子線程就配置設定多少個鎖對象(20行),然後通過

acquire()

方法取得(21行)(取得鎖可以了解為“把鎖鎖上”),并把鎖放入清單。

4. 第二個for循環用于派生線程,每個線程都調用loop()函數,并傳遞号碼、睡眠時間以及用于該線程的鎖。

5. 第三個for循環用于檢查鎖清單,看清單裡面的鎖是否全部釋放。隻有在釋放後才會繼續向後執行。

執行結果如下:

Start at: Tue Oct 29 14:32:03 2019
Start loop 0 at: Tue Oct 29 14:32:03 2019
Start loop 1 at: Tue Oct 29 14:32:03 2019
Loop 1 done at: Tue Oct 29 14:32:05 2019
Loop 0 done at: Tue Oct 29 14:32:07 2019
All done at: Tue Oct 29 14:32:07 2019
           

threading子產品

下表是threading子產品中的所有可用對象:

對象 描述
Thread 表示一個執行線程的對象
Lock 鎖原語對象(和thread子產品中的鎖一樣)
RLock 可重入鎖對象,使得單一線程可以(再次)獲得已持有的鎖(遞歸鎖)
Condition 條件變量對象,使得一個線程等待另一個線程滿足特定的‘條件’,比如改變狀态或某個資料值
Event 條件變量的通用版本,任意數量的線程等待某個事件的發生,在該事件發生後所有線程将被激活
Semaphore 為線程間共享的有限資源提供了一個“計數器”,如果沒有可用資源時會被阻塞
BoundedSemaphore 與Semaphore相似,不過它不允許超過初始值
Timer 與Thread相似,不過他要在運作前等待一段時間
Barrier 建立一個“障礙”,必須達到指定數量的線程後才可以繼續

守護線程:守護線程一般是等待用戶端請求服務的伺服器,無請求時什麼都不做,是以預設守護線程是不重要的,在關閉主線程時預設直接關閉守護線程。是以如果主線程在準備退出時,不需要等待一些子線程完成,就可以為這些子線程設定守護線程标記,該标記為真,表示該線程不重要。設定方法是:

thread.daemon=True

(調用

thread.setDamon(True)

的舊方法已經棄用)。檢查一個線程是否的守護狀态也是檢查這個屬性(調用

thread.isDaemon()

的舊方法已經棄用)。

整個python程式(可以解讀為:主線程)将在所有非守護線程退出後才退出。

  1. Thread類

    Thread類是threading子產品的主要執行對象。她有thread子產品中沒有的很多函數。

    下表給出了Thread對象的屬性和方法:

屬性 描述
name 線程名
ident 線程的辨別符
daemon 守護線程标記
方法 描述
init(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) 初始化方法,執行個體化一個線程對象,需要有一個可調用的target,以及其參數args或/和kwargs。還可以傳遞name和group參數(後者還未實作)。此外。而daemon的值将會設定`thread.daemon屬性/标志
start() 開始執行該線程
run() 定義該線程功能的方法(通常在子類中被重寫)
join(timeout=None) 直至啟動的線程終止之前一直被挂起;除非給出了timeout(秒),否則會一直阻塞
getName() 傳回線程名(已棄用,可直接調用thread.name屬性)
setName(name) 設定線程名(已棄用,可直接修改thread.name屬性
is_alive() 傳回布爾值,表示這個程序是否還存活
isDaemon() 如果是守護程序,傳回True;否則,傳回False(已棄用)
setDaemon(daemonic) 把線程的守護标志設定為布爾值daemonic(必須線上程start()之前調用)(已棄用)

下面從三個方面說明如何使用Thread類:

  • 建立Thread的執行個體,傳給它一個函數。
  • 建立Thread的執行個體,傳給它一個可調用的類執行個體。
  • 派生Thread的子類,并建立子類的執行個體。

    最推薦最後一種方案。最不推薦第二種。

建立Thread的執行個體,傳給它一個函數

mtsleepB.py

改寫為

mtsleepC.py

,添加使用Thread類,代碼如下:

# mtsleepC.py

import threading
from time import sleep, ctime

loops = [4, 2]

def loop(nloop, nsec):
    print("Start loop %d at:" % nloop, ctime())
    sleep(nsec)
    print("Loop %d done at:" % nloop, ctime())

def main():
    print("Start at:", ctime())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=loop, args=(i, loops[i]))
        threads.append(t)


    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print("All done at:", ctime())

if __name__ == '__main__':
    main()
           

執行結果如下:

Start at: Tue Oct 29 15:50:11 2019
Start loop 0 at: Tue Oct 29 15:50:11 2019
Start loop 1 at: Tue Oct 29 15:50:11 2019
Loop 1 done at: Tue Oct 29 15:50:13 2019
Loop 0 done at: Tue Oct 29 15:50:15 2019
All done at: Tue Oct 29 15:50:15 2019
           
  • 使用thread子產品時實作的鎖沒有了,取而代之的是一組Thread對象。
  • 執行個體化Thread對象(調用

    Thread()

    )和調用

    thread.start_new_thread()

    的最大差別是新縣城不會立即開始執行,便于線程的同步。當所有線程配置設定完成後,再調用每個線程的

    start()

    方法讓線程開始執行。
  • 相對于管理一組鎖(配置設定、擷取、釋放、檢查鎖狀态等),這裡隻需要為每個線程調用

    join()

    方法即可。

    join()

    方法将等待線程結束,或者在提供了逾時時間的情況下,大道逾時時間。
  • 實際上

    join()

    方法根本不需要調用,一旦線程啟動,它們就會一直執行,直到給定的函數完成後後退出。如果主線程還有其他工作要做,就可以不調用

    join()

    join()

    方法隻有在你需要等待線程完成的時候才有用。如果把第26~27行注釋掉,執行結果如下:
Start at: Tue Oct 29 15:52:42 2019
Start loop 0 at: Tue Oct 29 15:52:42 2019
Start loop 1 at: Tue Oct 29 15:52:42 2019
All done at: Tue Oct 29 15:52:42 2019
Loop 1 done at: Tue Oct 29 15:52:44 2019
Loop 0 done at: Tue Oct 29 15:52:46 2019
           

從結果可看出

join()

的用法。

建立Thread的執行個體,傳給它一個可調用的類執行個體

mtsleepC.py

的基礎上新增加一個新類ThreadFunc,并進行一些其他的輕微改動,得到

mtsleepD.py

,代碼如下:

# mtsleepD.py

import threading
from time import sleep, ctime

loops = [4, 2]

class ThreadFunc():
    '''可調用類'''
    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self):
        self.func(*self.args)

def loop(nloop, nsec):
    print("Start loop %d at:" % nloop, ctime())
    sleep(nsec)
    print("Loop %d done at:" % nloop, ctime())

def main():
    print("Start at:", ctime())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
        threads.append(t)


    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print("All done at:", ctime())

if __name__ == '__main__':
    main()
           

執行結果如下:

Start at: Tue Oct 29 16:11:40 2019
Start loop 0 at: Tue Oct 29 16:11:40 2019
Start loop 1 at: Tue Oct 29 16:11:40 2019
Loop 1 done at: Tue Oct 29 16:11:42 2019
Loop 0 done at: Tue Oct 29 16:11:44 2019
All done at: Tue Oct 29 16:11:44 2019
           

派生Thread的子類,并建立子類的執行個體

相對于建立可調用類的例子,當建立線程時使用子類要相對更容易閱讀。下面是

mtsleepE.py

的代碼:

# mtsleepE.py

import threading
from time import sleep, ctime

loops = [4, 2]

class MyThread(threading.Thread):
    def __init__(self, func, args, name=''):
        super().__init__()
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        self.func(*self.args)

def loop(nloop, nsec):
    print("Start loop %d at:" % nloop, ctime())
    sleep(nsec)
    print("Loop %d done at:" % nloop, ctime())

def main():
    print("Start at:", ctime())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = MyThread(loop, (i, loops[i]), loop.__name__)
        threads.append(t)

    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print("All done at:", ctime())

if __name__ == '__main__':
    main()
           

執行結果:

Start at: Tue Oct 29 16:21:04 2019
Start loop 0 at: Tue Oct 29 16:21:04 2019
Start loop 1 at: Tue Oct 29 16:21:04 2019
Loop 1 done at: Tue Oct 29 16:21:06 2019
Loop 0 done at: Tue Oct 29 16:21:08 2019
All done at: Tue Oct 29 16:21:08 2019
           

如果執行對象有傳回值,可在MyThread類中增加

get_result()

方法來取得傳回值,這是MyThread類的代碼如下:

import threading

class MyThread(threading.Thread):
	def __init__(self, func, args, name='')
		super().__init__()
		self.name = name
		self.func = func
		self.args = args

	def run(self):
		self.result = self.func(*self.args)

	def get_result(self):
		try:
			return self.result
		except Exception as e:
			return None
           
  1. threading子產品的其他函數

    除了各種同步和線程對象外,threading子產品還提供了一些函數,如下表:

函數 描述
active_count() 目前活動的Thread對象個數
current_count() 傳回目前的Thread對象
enumerate() 傳回目前活動的Thread對象清單
settrace(func) 為所有線程設定一個trace函數
setprofile(func) 為所有線程設定一個profile函數
stack_size(size=0) 傳回新建立線程的棧大小;或為後續建立的線程設定棧的大小為size

同步原語

一般在多線程代碼中,總會有一些特定的函數或代碼塊不希望(或不應該)被多個線程同時執行,通常包括修改資料庫、更新檔案或其他會産生競态條件的類似情況。這就是需要使用同步的情況。這裡介紹兩種類型的同步原語:鎖/互斥;信号量。

  1. 鎖示例

    鎖有兩種狀态:鎖定和未鎖定。

    隻支援兩種函數:獲得鎖和釋放鎖。

    下面是

    mtsleepF.py

    沒有使用鎖的代碼:
# mtsleepF.py

from atexit import register
from random import randrange
from threading import Thread, currentThread
from time import sleep, ctime

class CleanOutputSet(set):
    def __str__(self):
        return ','.join(x for x in self)

loops = (randrange(2, 5) for x in range(randrange(3, 20))) #  将清單生成式的[]改為(),就建立了一個generator。   randrange(3, 7)表示建立3~6個程序,randrange(2,5)表示為每一個程序選擇一個睡眠時間
remaining = CleanOutputSet()

def loop(nsec):
    myname = currentThread().name
    remaining.add(myname)
    print('[%s] started %s' % (ctime(), myname))
    sleep(nsec)
    remaining.remove(myname)
    print('[%s] completed %s (%d secs)' % (ctime(), myname, nsec))
    print('  (remaining: %s)' % (remaining or 'None'))

def main():
    for pause in loops:
        Thread(target=loop, args=(pause, )).start() #  args=(pause, )中的','必不可少,如果沒有,那麼args就會被當做int

@register
def _atexit():
    print('all Done at:', ctime())

if __name__ == '__main__':
    main()
           

我們來解釋上述代碼:

第1~6行

  1. atexit子產品及其

    register()

    函數用法參考https://blog.csdn.net/qyhaill/article/details/102807732
  2. random子產品的

    randrange()

    函數用法參考https://www.runoob.com/python/func-number-randrange.html
  3. threading.currentThread()傳回目前線程變量,這是舊的寫法,新的寫法是threading.current_thread()

第8~10行

  1. 派生python的set類(set類介紹百度很多),實作了它的

    __str__()

    方法
  2. join()

    函數用法參考https://www.runoob.com/python/att-string-join.html

第12~22行

  1. loops是一個generator。将清單生成式的[]改為(),就建立了一個generator
  2. remaining是set類的派生類的對象,用來記錄剩下的還在運作的線程
  3. loop()

    函數進行一些修改,在sleep完之後将remaining中剩下的還在執行的線程顯示出來

第24~30行

  1. main()函數建立并開始執行線程
  2. 28~30行的意思參見atexit子產品的用法

幸運的話,執行結果會按适當的順序給出:

[Tue Oct 29 22:53:28 2019] started Thread-1
[Tue Oct 29 22:53:28 2019] started Thread-2
[Tue Oct 29 22:53:28 2019] started Thread-3
[Tue Oct 29 22:53:28 2019] started Thread-4
[Tue Oct 29 22:53:28 2019] started Thread-5
[Tue Oct 29 22:53:28 2019] started Thread-6
[Tue Oct 29 22:53:28 2019] started Thread-7
[Tue Oct 29 22:53:30 2019] completed Thread-2 (2 secs)
  (remaining: Thread-7,Thread-6,Thread-4,Thread-5,Thread-3,Thread-1)
[Tue Oct 29 22:53:30 2019] completed Thread-4 (2 secs)
  (remaining: Thread-7,Thread-6,Thread-5,Thread-3,Thread-1)
[Tue Oct 29 22:53:30 2019] completed Thread-7 (2 secs)
  (remaining: Thread-6,Thread-5,Thread-3,Thread-1)
[Tue Oct 29 22:53:31 2019] completed Thread-3 (3 secs)
  (remaining: Thread-6,Thread-5,Thread-1)
[Tue Oct 29 22:53:32 2019] completed Thread-6 (4 secs)
  (remaining: Thread-5,Thread-1)
[Tue Oct 29 22:53:32 2019] completed Thread-5 (4 secs)
  (remaining: Thread-1)
[Tue Oct 29 22:53:32 2019] completed Thread-1 (4 secs)
  (remaining: None)
all Done at: Tue Oct 29 22:53:32 2019
           

不幸運的話,會得到下面這樣奇怪的輸出:

[Tue Oct 29 22:53:22 2019] started Thread-1
[Tue Oct 29 22:53:22 2019] started Thread-2
[Tue Oct 29 22:53:22 2019] started Thread-3
[Tue Oct 29 22:53:22 2019] started Thread-4
[Tue Oct 29 22:53:22 2019] started Thread-5
[Tue Oct 29 22:53:22 2019] started Thread-6
[Tue Oct 29 22:53:22 2019] started Thread-7
[Tue Oct 29 22:53:22 2019] started Thread-8
[Tue Oct 29 22:53:22 2019] started Thread-9
[Tue Oct 29 22:53:22 2019] started Thread-10
[Tue Oct 29 22:53:22 2019] started Thread-11
[Tue Oct 29 22:53:22 2019] started Thread-12
[Tue Oct 29 22:53:22 2019] started Thread-13
[Tue Oct 29 22:53:22 2019] started Thread-14
[Tue Oct 29 22:53:22 2019] started Thread-15
[Tue Oct 29 22:53:22 2019] started Thread-16
[Tue Oct 29 22:53:24 2019] completed Thread-3 (2 secs)
  (remaining: Thread-8,Thread-6,Thread-9,Thread-14,Thread-10,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7,Thread-2)
[Tue Oct 29 22:53:24 2019] completed Thread-2 (2 secs)
  (remaining: Thread-8,Thread-6,Thread-9,Thread-14,Thread-10,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:24 2019] completed Thread-8 (2 secs)
  (remaining: Thread-6,Thread-9,Thread-14,Thread-10,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:24 2019] completed Thread-10 (2 secs)
  (remaining: Thread-6,Thread-9,Thread-14,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:25 2019] completed Thread-11 (3 secs)
  (remaining: Thread-6,Thread-9,Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:26 2019] completed Thread-6 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-7 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-4 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-9 (4 secs)
  (remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
  (remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
  (remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
  (remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
[Tue Oct 29 22:53:26 2019] completed Thread-1 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-5 (4 secs)
  (remaining: Thread-14,Thread-12,Thread-13,Thread-15,Thread-16)
  (remaining: Thread-14,Thread-12,Thread-13,Thread-15,Thread-16)
[Tue Oct 29 22:53:26 2019] completed Thread-13 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-12 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-15 (4 secs)
  (remaining: None)
  (remaining: None)
[Tue Oct 29 22:53:26 2019] completed Thread-16 (4 secs)
  (remaining: None)
[Tue Oct 29 22:53:26 2019] completed Thread-14 (4 secs)
  (remaining: None)
  (remaining: None)
all Done at: Tue Oct 29 22:53:26 2019
           

存在兩個問題:

  1. 由于多個線程執行進度的不同,可能會出現交替輸出的問題(I/O)
  2. 可能出現多個線程同時改變同一個變量的情況

mtsleepF.py

改寫為

mtsleepG.py

,增加鎖機制,代碼如下:

# mtsleepG.py

from atexit import register
from random import randrange
from threading import Thread, current_thread, Lock
from time import sleep, ctime

class CleanOutputSet(set):
    def __str__(self):
        return ','.join(x for x in self)

lock = Lock()
loops = (randrange(2, 5) for x in range(randrange(3, 20))) #  将清單生成式的[]改為(),就建立了一個generator。   randrange(3, 7)表示建立3~6個程序,randrange(2,5)表示為每一個程序選擇一個睡眠時間
remaining = CleanOutputSet()

def loop(nsec):
    myname = current_thread().name
    lock.acquire()
    remaining.add(myname)
    print('[%s] started %s' % (ctime(), myname))
    lock.release()
    sleep(nsec)
    lock.acquire()
    remaining.remove(myname)
    print('[%s] completed %s (%d secs)' % (ctime(), myname, nsec))
    print('  (remaining: %s)' % (remaining or 'None'))
    lock.release()

def main():
    for pause in loops:
        Thread(target=loop, args=(pause, )).start() #  args=(pause, )中的','必不可少,如果沒有,那麼args就會被當做int

@register
def _atexit():
    print('all Done at:', ctime())

if __name__ == '__main__':
    main()
           

代碼容易了解,不再過多解釋。不過使用with語句可以代替acquire()和release(),使代碼更簡潔優化,可将loop()函數改為:

from threading import Thread, current_thread, Lock
from time import sleep, ctime

def loop(nsec):
    myname = current_thread().name
    with lock:
        remaining.add(myname)
        print('[%s] started %s' % (ctime(), myname))
    sleep(nsec)
    with lock:
        remaining.remove(myname)
        print('[%s] completed %s (%d secs)' % (ctime(), myname, nsec))
        print('  (remaining: %s)' % (remaining or 'None'))
           
  1. 信号量示例

    pass

python多程序

這裡隻講述multiprocessing子產品。os子產品中的

fork()

函數也能實作多程序,功能太弱。

Process類

multiprocessing子產品和threading子產品非常相似,其中最主要的類是Process類,和Thread類的API相同:

屬性 描述
name 程序名,僅用于識别目的,沒有語義
pid 程序ID。在生成程序前為

None

daemon 守護程序标記(必須在start()被調用前設定)
exitcode 子程序的退出代碼。如果子程序尚未終止則為

None

;負值 − N -N −N表示子程序被信号 N N N終止
authkey 程序的身份驗證密鑰(bytes)
sentinel 系統對象的數字句柄,當程序結束時将變為"ready"
方法 描述
init(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) 初始化方法,執行個體化一個程序對象,需要有一個可調用的target,以及其參數args或/和kwargs。還可以傳遞name和group參數(後者僅用于相容threading.Thread)。此外。而daemon的值将會設定`process.daemon屬性/标志
start() 啟動程序活動
run() 定義該程序活動的方法(通常在子類中被重寫)
join(timeout=None) 直至啟動的程序終止之前一直被挂起;除非給出了timeout(秒),否則會一直阻塞(程序無法join自身,否則會死鎖
is_alive() 傳回布爾值,表示這個程序是否還存活
terminate() 強制終止程序且不做清理。程序的後代程序将不會被終止,變成僵屍程序;如果有鎖沒有釋放,将變成死鎖
kill() 與terminate()功能相同
close() 關閉Process對象,釋放與之關聯的所有資源。如果底層程序仍在運作,會引起ValueError

對應于Thread類,下面從三個方面說明如何使用Process類:

  • 建立Process的執行個體,傳給它一個函數。
  • 建立Process的執行個體,傳給它一個可調用的類執行個體。
  • 派生Process的子類,并建立子類的執行個體。

    最推薦最後一種方案。最不推薦第二種。

建立Process的執行個體,傳給它一個函數

mtsleepC.py

改寫為

mpsleepC.py

,将Thread類換為Process類,代碼如下:

# mpsleepC.py

from multiprocessing import Process
from time import sleep, ctime
import os

loops = [4, 2]

def loop(nsec):
    print("[%s] %s start." % (ctime(), os.getpid()))
    sleep(nsec)
    print("[%s] %s end." % (ctime(), os.getpid()))

def main():
    print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
    processes = []
    nloops = range(len(loops))

    for i in nloops:
        t = Process(target=loop, args=(loops[i], ))
        processes.append(t)


    for i in nloops:
        processes[i].start()

    for i in nloops:
        processes[i].join()

    print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))

if __name__ == '__main__':
    main()
           

執行結果:

[Wed Oct 30 09:45:57 2019] 15513 main process start ... ...
[Wed Oct 30 09:45:57 2019] 15514 start.
[Wed Oct 30 09:45:57 2019] 15515 start.
[Wed Oct 30 09:45:59 2019] 15515 end.
[Wed Oct 30 09:46:01 2019] 15514 end.
[Wed Oct 30 09:46:01 2019] 15513 main process end ... ...
           

其中,

os.getpid()

函數用于擷取目前程序号。

建立Process的執行個體,傳給它一個可調用的類執行個體

mtsleepD.py

改寫為

mpsleepD.py

,代碼如下:

# mpsleepD.py

import multiprocessing
import os
from time import sleep, ctime

loops = [4, 2]

class ProcessFunc():
    '''可調用類'''
    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self):
        self.func(*self.args)

def loop(nsec):
    print("[%s] %s start." % (ctime(), os.getpid()))
    sleep(nsec)
    print("[%s] %s end." % (ctime(), os.getpid()))

def main():
    print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
    processes = []
    nloops = range(len(loops))

    for i in nloops:
        t = multiprocessing.Process(target=ProcessFunc(loop, (loops[i], ), loop.__name__))
        processes.append(t)


    for i in nloops:
        processes[i].start()

    for i in nloops:
        processes[i].join()

    print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))

if __name__ == '__main__':
    main()
           

執行結果如下:

[Wed Oct 30 09:50:58 2019] 16023 main process start ... ...
[Wed Oct 30 09:50:58 2019] 16024 start.
[Wed Oct 30 09:50:58 2019] 16026 start.
[Wed Oct 30 09:51:00 2019] 16026 end.
[Wed Oct 30 09:51:02 2019] 16024 end.
[Wed Oct 30 09:51:02 2019] 16023 main process end ... ...
           

派生Process的子類,并建立子類的執行個體

mtsleepE.py

改寫為

mpsleepE.py

,代碼如下:

# mpsleepE.py

import multiprocessing
import os
from time import sleep, ctime

loops = [4, 2]

class MyProcess(multiprocessing.Process):
    def __init__(self, func, args, name=''):
        super().__init__()
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        self.func(*self.args)

def loop(nsec):
    print("[%s] %s start." % (ctime(), os.getpid()))
    sleep(nsec)
    print("[%s] %s end." % (ctime(), os.getpid()))

def main():
    print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
    processes = []
    nloops = range(len(loops))

    for i in nloops:
        t = MyProcess(loop, (loops[i], ), loop.__name__)
        processes.append(t)

    for i in nloops:
        processes[i].start()

    for i in nloops:
        processes[i].join()

    print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))

if __name__ == '__main__':
    main()
           

執行結果如下:

[Wed Oct 30 09:57:00 2019] 16683 main process start ... ...
[Wed Oct 30 09:57:00 2019] 16684 start.
[Wed Oct 30 09:57:00 2019] 16685 start.
[Wed Oct 30 09:57:02 2019] 16685 end.
[Wed Oct 30 09:57:04 2019] 16684 end.
[Wed Oct 30 09:57:04 2019] 16683 main process end ... ...
           

孤兒程序和僵屍程序

  1. 孤兒程序

    如果父程序終止,而由父程序建立的一個或多個子程序還在執行的話,這一個或多個子程序就會成為孤兒程序,成為孤兒程序後,善後工作(wait()或者waitpid()等)就會由init接管,init程序是核心啟動的第一個程序,pid=1,由0号程序idle建立。應為有人善後,是以孤兒程序是無害的。

  2. 僵屍程序

    由于父程序和子程序是異步的,是以父程序不會知道子程序會在什麼時候結束,是以,為了在子程序結束後讓父程序知道,子程序結束後會保留一部分系統資源如pid,運作時間,退出狀态等。等父程序通過wait()或者waitpid()系統調用取得這些資訊時,這部分資源才會被釋放。但如果父程序一直未調用wait()或者waitpid(),那這些資源就一直不會被釋放,比如pid,pid的數量是有限的,如果僵屍程序過多,就會導緻pid不足而無法建立新程序,是以僵屍程序是有害的。

    下面是示範僵屍程序的

    zombie_process.py

    的代碼:
# zombie_process.py

import multiprocessing
from time import ctime, sleep
import os

def demo():
    print('[%s] %s start.' % (ctime(), os.getpid()))
    sleep(1)
    print('[%s] %s end.' % (ctime(), os.getpid()))

def main():
    print('[%s] %s father process start ... ...' % (ctime(), os.getpid()))
    multiprocessing.Process(target=demo).start()
    sleep(10000)
    print('[%s] %s father process end ... ...' % (ctime(), os.getpid()))

if __name__ == '__main__':
    main()
           

父程序sleep 10000s,而子程序sleep 1s,是以子程序遠早于父程序結束,子程序結束而父程序還未結束時的輸出結果為:

[Wed Oct 30 10:19:23 2019] 18262 father process start ... ...
[Wed Oct 30 10:19:23 2019] 18263 start.
[Wed Oct 30 10:19:24 2019] 18263 end.
           

這是再開一個終端執行:

ps aux | grep zombie_process.py
           

這條指令的作用是檢視所有與

zombie_process.py

有關的系統程序。輸出為:

qyh      18262  0.1  0.0  38332 11124 pts/1    S+   10:19   0:00 python zombie_process.py
qyh      18321  0.0  0.0  21536  1008 pts/3    S+   10:19   0:00 grep --color=auto zombie_process.py
           

可以看到第一個為父程序,占用cpu 0.1%,第二個是子程序,沒有占用cpu和記憶體,就是所謂的僵屍程序。這是在終端執行:

kill 18262
           

殺死父程序,父程序結束後僵屍程序就會變成孤兒程序,由init收養後釋放資源。

Pool類

如果需要建立大量程序,使用Process類管理就會比較麻煩,就可以使用程序池Pool類。Pool類可以提供指定數量的程序供使用者調用,當有新的請求送出到Pool中時,如果池還沒有滿,就會建立一個新的程序來執行請求。如果池滿,請求就會告知先等待,直到池中有程序結束,才會建立新的程序來執行這些請求。

mpsleepC.py

改寫為

mpsleepF.py

用來說明Pool類的用法,代碼如下:

# mpsleepF.py

from multiprocessing import Pool
from time import sleep, ctime
import os

loops = [4, 2, 3, 4, 5, 2, 1]

def loop(nsec):
    print("[%s] %s start." % (ctime(), os.getpid()))
    sleep(nsec)
    print("[%s] %s end." % (ctime(), os.getpid()))

def main():
    print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
    p = Pool(processes=3)

    rl = p.map(loop, loops)
    p.close()
    p.join()

    print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))

if __name__ == '__main__':
    main()
           

第16行

執行個體化一個程序池對象,傳入的參數processes表示程序池中程序的個數,這裡為3。

第18~20行

  1. pool.map()函數原型如下:

和内置的map()函數用法行為基本一緻,它會使程序阻塞直至傳回結果。

rl表示result_list,是存儲傳回結果的清單。

2. pool.close()函數用于關閉程序池,使其不再接受新的任務。

3. pool.join()主程序阻塞等待程序池中的子程序退出。必須在close()或terminate()之後使用。

執行結果如下:

[Wed Oct 30 15:12:58 2019] 21744 main process start ... ...
[Wed Oct 30 15:12:58 2019] 21745 start.
[Wed Oct 30 15:12:58 2019] 21746 start.
[Wed Oct 30 15:12:58 2019] 21747 start.
[Wed Oct 30 15:13:00 2019] 21746 end.
[Wed Oct 30 15:13:00 2019] 21746 start.
[Wed Oct 30 15:13:01 2019] 21747 end.
[Wed Oct 30 15:13:01 2019] 21747 start.
[Wed Oct 30 15:13:02 2019] 21745 end.
[Wed Oct 30 15:13:02 2019] 21745 start.
[Wed Oct 30 15:13:04 2019] 21746 end.
[Wed Oct 30 15:13:04 2019] 21746 start.
[Wed Oct 30 15:13:04 2019] 21745 end.
[Wed Oct 30 15:13:05 2019] 21746 end.
[Wed Oct 30 15:13:06 2019] 21747 end.
[Wed Oct 30 15:13:06 2019] 21744 main process end ... ...
           

Pool類還有兩個重要的函數,函數原型如下:

apply(func, args=(), kwds={})
apply_async(func, args=(), kwds={}, callback=None)
           

apply()使用args參數極易kwds命名參數調用func,它會在傳回結果前阻塞主程序。這使得它和單程序沒什麼差別。python3.x之後很少用。

apply_async()和apply()用法一樣,但它是非阻塞的,并且支援結果傳回進行回調。

mpsleepF.py

改寫為

mpsleepG.py

,用來比較apply()和apply_async()的用法,代碼如下:

# mpsleepG.py

from multiprocessing import Pool
from time import sleep, ctime
import os

loops = [4, 2, 3, 4, 5, 2, 1]

def loop(nsec):
    print("[%s] %s start." % (ctime(), os.getpid()))
    sleep(nsec)
    print("[%s] %s end." % (ctime(), os.getpid()))

def main1():
    print("[%s] %s main process_1 start ... ..." % (ctime(), os.getpid()))
    p = Pool(processes=3)

    for i in range(len(loops)):
        p.apply(loop, (loops[i], ))

    p.close()
    p.join()

    print("[%s] %s main process_1 end ... ..." % (ctime(), os.getpid()))

def main2():
    print("[%s] %s main process_2 start ... ..." % (ctime(), os.getpid()))
    p = Pool(processes=3)
    result_list = []

    for i in range(len(loops)):
        result_list.append(p.apply_async(loop, (loops[i], )))

    p.close()
    p.join()

    print("[%s] %s main process_2 end ... ..." % (ctime(), os.getpid()))

if __name__ == '__main__':
    main1()
    main2()
           

執行結果如下:

[Wed Oct 30 15:26:28 2019] 22799 main process_1 start ... ...
[Wed Oct 30 15:26:28 2019] 22800 start.
[Wed Oct 30 15:26:32 2019] 22800 end.
[Wed Oct 30 15:26:32 2019] 22801 start.
[Wed Oct 30 15:26:34 2019] 22801 end.
[Wed Oct 30 15:26:34 2019] 22802 start.
[Wed Oct 30 15:26:37 2019] 22802 end.
[Wed Oct 30 15:26:37 2019] 22800 start.
[Wed Oct 30 15:26:41 2019] 22800 end.
[Wed Oct 30 15:26:41 2019] 22801 start.
[Wed Oct 30 15:26:46 2019] 22801 end.
[Wed Oct 30 15:26:46 2019] 22802 start.
[Wed Oct 30 15:26:48 2019] 22802 end.
[Wed Oct 30 15:26:48 2019] 22800 start.
[Wed Oct 30 15:26:49 2019] 22800 end.
[Wed Oct 30 15:26:49 2019] 22799 main process_1 end ... ...
[Wed Oct 30 15:26:49 2019] 22799 main process_2 start ... ...
[Wed Oct 30 15:26:49 2019] 22808 start.
[Wed Oct 30 15:26:49 2019] 22809 start.
[Wed Oct 30 15:26:49 2019] 22810 start.
[Wed Oct 30 15:26:51 2019] 22809 end.
[Wed Oct 30 15:26:51 2019] 22809 start.
[Wed Oct 30 15:26:52 2019] 22810 end.
[Wed Oct 30 15:26:52 2019] 22810 start.
[Wed Oct 30 15:26:53 2019] 22808 end.
[Wed Oct 30 15:26:53 2019] 22808 start.
[Wed Oct 30 15:26:55 2019] 22809 end.
[Wed Oct 30 15:26:55 2019] 22809 start.
[Wed Oct 30 15:26:55 2019] 22808 end.
[Wed Oct 30 15:26:56 2019] 22809 end.
[Wed Oct 30 15:26:57 2019] 22810 end.
[Wed Oct 30 15:26:57 2019] 22799 main process_2 end ... ...
           

main2()中的result_list清單用來存儲傳回值(當然這個例子中的func并沒有傳回值)。

multiprocessing子產品中鎖的用法和threading子產品完全相同,不再重複講解。

程序間通信

pass