本文是筆者學習python多線程和多程序的筆記。
Python多線程
單線程執行sleep
在
onethr.py
檔案中建立兩個時間循環:一個睡眠4s(loop0);一個睡眠2s(loop1)(這裡使用"loop0"和"loop1"作為函數名)。以下多線程的講解示例将以此為基礎。代碼如下:
# onethy.py
from time import sleep, ctime
def loop0():
print("Start loop 0 at:", ctime())
sleep(4)
print("Loop 0 done at:", ctime())
def loop1():
print("Start loop 1 at:", ctime())
sleep(2)
print("Loop 1 done at:", ctime())
def main():
print("Start at:", ctime())
loop0()
loop1()
print("All done at:", ctime())
if __name__ == '__main__':
main()
執行結果如下:
Start at: Tue Oct 29 09:39:25 2019
Start loop 0 at: Tue Oct 29 09:39:25 2019
Loop 0 done at: Tue Oct 29 09:39:29 2019
Start loop 1 at: Tue Oct 29 09:39:29 2019
Loop 1 done at: Tue Oct 29 09:39:31 2019
All done at: Tue Oct 29 09:39:31 2019
多線程程式設計
Python支援多線程程式設計的子產品:
thread子產品:提供了基本的線程和鎖定支援;
threading子產品:提供了更進階别、功能更全面的線程管理;
Queue子產品:用于建立一個隊列資料結構,在多線程之間進行共享。
建議:推薦使用更先進的threading子產品,而不是thread子產品。
注意:在python3中,thread子產品被重命名為_thread子產品。
thread子產品
下表是一些thread子產品常用的線程函數和LockType鎖對象的方法。
thread子產品的函數 | 功能 |
---|---|
start_new_thread(function, args, kwargs=None) | 派生一個新的線程,使用給定的args和可選的kwargs來執行funciton |
allocate_lock() | 配置設定LockType所對象 |
exit() | 給線程退出指令 |
LockType鎖對象的方法 | 功能 |
---|---|
acquire(wait=None) | 嘗試擷取鎖對象 |
locked() | 如果擷取了鎖對象則傳回True,否則,傳回False |
release() | 釋放鎖 |
- 使用thread子產品的多線程機制将
改寫為onethr.py
,代碼如下:mtsleepA.py
# mtsleepA.py
import _thread
from time import sleep, ctime
def loop0():
print("Start loop 0 at:", ctime())
sleep(4)
print("Loop 0 done at:", ctime())
def loop1():
print("Start loop 1 at:", ctime())
sleep(2)
print("Loop 1 done at:", ctime())
def main():
print("Start at:", ctime())
_thread.start_new_thread(loop0, ())
_thread.start_new_thread(loop1, ()) # 第二個參數args是一個tuple,沒有預設值,是以即使function沒有參數,也要傳入一個空tuple
sleep(6)
print("All done at:", ctime())
if __name__ == '__main__':
main()
執行結果如下:
Start at: Tue Oct 29 10:16:05 2019
Start loop 0 at: Tue Oct 29 10:16:05 2019
Start loop 1 at: Tue Oct 29 10:16:05 2019
Loop 1 done at: Tue Oct 29 10:16:07 2019
Loop 0 done at: Tue Oct 29 10:16:09 2019
All done at: Tue Oct 29 10:16:11 2019
可以看出loop0和loop1幾乎同時開始執行,且兩者是并行執行的,整體的運作時間是4s。
代碼中第20行的
sleep(6)
的調用是非常有用的。如果沒有20行,執行了18和19行後,直接顯示"All done",然後主線程結束退出,那麼子線程loop0()和loop1()也會直接終止。
這裡我們知道這個程式最多執行6s,但是實際上隻需要4s,并沒有比單線程節省時間,而且大部分情況我們是不知道需要執行多久,那麼就沒辦法使用sleep()來進行線程的同步機制了。這是就需要鎖了。
2. 使用thread子產品的鎖對象,将
mtsleepA.py
改寫為
mtsleepB.py
,代碼如下:
# mtsleepB.py
import _thread
from time import sleep, ctime
loops = [4, 2]
def loop(nloop, nsec, lock):
print("Start loop %d at:" % nloop, ctime())
sleep(nsec)
print("Loop %d done at:" % nloop, ctime())
lock.release()
def main():
print("Start at:", ctime())
locks = []
nloops = range(len(loops))
for i in nloops:
lock = _thread.allocate_lock()
lock.acquire()
locks.append(lock)
for i in nloops:
_thread.start_new_thread(loop, (i, loops[i], locks[i]))
for i in nloops:
while locks[i].locked():
pass
print("All done at:", ctime())
if __name__ == '__main__':
main()
下面逐行對代碼進行解釋:
第1~6行
- 導入子產品
- 不再對4s和2s寫死到不同函數中,而是使用了唯一的loop函數,并把這些常量放入清單loops清單中。
第8~12行
- loop()函數代替了之前的loop*()函數。
- 給loop()配置設定一個已獲得的鎖,當執行完後釋放鎖,向主線程表明該子線程已完成。
第14~34行
3. 第一個for循環建立鎖的清單,開啟多少個子線程就配置設定多少個鎖對象(20行),然後通過
acquire()
方法取得(21行)(取得鎖可以了解為“把鎖鎖上”),并把鎖放入清單。
4. 第二個for循環用于派生線程,每個線程都調用loop()函數,并傳遞号碼、睡眠時間以及用于該線程的鎖。
5. 第三個for循環用于檢查鎖清單,看清單裡面的鎖是否全部釋放。隻有在釋放後才會繼續向後執行。
執行結果如下:
Start at: Tue Oct 29 14:32:03 2019
Start loop 0 at: Tue Oct 29 14:32:03 2019
Start loop 1 at: Tue Oct 29 14:32:03 2019
Loop 1 done at: Tue Oct 29 14:32:05 2019
Loop 0 done at: Tue Oct 29 14:32:07 2019
All done at: Tue Oct 29 14:32:07 2019
threading子產品
下表是threading子產品中的所有可用對象:
對象 | 描述 |
---|---|
Thread | 表示一個執行線程的對象 |
Lock | 鎖原語對象(和thread子產品中的鎖一樣) |
RLock | 可重入鎖對象,使得單一線程可以(再次)獲得已持有的鎖(遞歸鎖) |
Condition | 條件變量對象,使得一個線程等待另一個線程滿足特定的‘條件’,比如改變狀态或某個資料值 |
Event | 條件變量的通用版本,任意數量的線程等待某個事件的發生,在該事件發生後所有線程将被激活 |
Semaphore | 為線程間共享的有限資源提供了一個“計數器”,如果沒有可用資源時會被阻塞 |
BoundedSemaphore | 與Semaphore相似,不過它不允許超過初始值 |
Timer | 與Thread相似,不過他要在運作前等待一段時間 |
Barrier | 建立一個“障礙”,必須達到指定數量的線程後才可以繼續 |
守護線程:守護線程一般是等待用戶端請求服務的伺服器,無請求時什麼都不做,是以預設守護線程是不重要的,在關閉主線程時預設直接關閉守護線程。是以如果主線程在準備退出時,不需要等待一些子線程完成,就可以為這些子線程設定守護線程标記,該标記為真,表示該線程不重要。設定方法是:
thread.daemon=True
(調用
thread.setDamon(True)
的舊方法已經棄用)。檢查一個線程是否的守護狀态也是檢查這個屬性(調用
thread.isDaemon()
的舊方法已經棄用)。
整個python程式(可以解讀為:主線程)将在所有非守護線程退出後才退出。
-
Thread類
Thread類是threading子產品的主要執行對象。她有thread子產品中沒有的很多函數。
下表給出了Thread對象的屬性和方法:
屬性 | 描述 |
---|---|
name | 線程名 |
ident | 線程的辨別符 |
daemon | 守護線程标記 |
方法 | 描述 |
---|---|
init(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) | 初始化方法,執行個體化一個線程對象,需要有一個可調用的target,以及其參數args或/和kwargs。還可以傳遞name和group參數(後者還未實作)。此外。而daemon的值将會設定`thread.daemon屬性/标志 |
start() | 開始執行該線程 |
run() | 定義該線程功能的方法(通常在子類中被重寫) |
join(timeout=None) | 直至啟動的線程終止之前一直被挂起;除非給出了timeout(秒),否則會一直阻塞 |
getName() | 傳回線程名(已棄用,可直接調用thread.name屬性) |
setName(name) | 設定線程名(已棄用,可直接修改thread.name屬性 |
is_alive() | 傳回布爾值,表示這個程序是否還存活 |
isDaemon() | 如果是守護程序,傳回True;否則,傳回False(已棄用) |
setDaemon(daemonic) | 把線程的守護标志設定為布爾值daemonic(必須線上程start()之前調用)(已棄用) |
下面從三個方面說明如何使用Thread類:
- 建立Thread的執行個體,傳給它一個函數。
- 建立Thread的執行個體,傳給它一個可調用的類執行個體。
-
派生Thread的子類,并建立子類的執行個體。
最推薦最後一種方案。最不推薦第二種。
建立Thread的執行個體,傳給它一個函數
把
mtsleepB.py
改寫為
mtsleepC.py
,添加使用Thread類,代碼如下:
# mtsleepC.py
import threading
from time import sleep, ctime
loops = [4, 2]
def loop(nloop, nsec):
print("Start loop %d at:" % nloop, ctime())
sleep(nsec)
print("Loop %d done at:" % nloop, ctime())
def main():
print("Start at:", ctime())
threads = []
nloops = range(len(loops))
for i in nloops:
t = threading.Thread(target=loop, args=(i, loops[i]))
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print("All done at:", ctime())
if __name__ == '__main__':
main()
執行結果如下:
Start at: Tue Oct 29 15:50:11 2019
Start loop 0 at: Tue Oct 29 15:50:11 2019
Start loop 1 at: Tue Oct 29 15:50:11 2019
Loop 1 done at: Tue Oct 29 15:50:13 2019
Loop 0 done at: Tue Oct 29 15:50:15 2019
All done at: Tue Oct 29 15:50:15 2019
- 使用thread子產品時實作的鎖沒有了,取而代之的是一組Thread對象。
- 執行個體化Thread對象(調用
)和調用Thread()
的最大差別是新縣城不會立即開始執行,便于線程的同步。當所有線程配置設定完成後,再調用每個線程的thread.start_new_thread()
方法讓線程開始執行。start()
- 相對于管理一組鎖(配置設定、擷取、釋放、檢查鎖狀态等),這裡隻需要為每個線程調用
方法即可。join()
方法将等待線程結束,或者在提供了逾時時間的情況下,大道逾時時間。join()
- 實際上
方法根本不需要調用,一旦線程啟動,它們就會一直執行,直到給定的函數完成後後退出。如果主線程還有其他工作要做,就可以不調用join()
。join()
方法隻有在你需要等待線程完成的時候才有用。如果把第26~27行注釋掉,執行結果如下:join()
Start at: Tue Oct 29 15:52:42 2019
Start loop 0 at: Tue Oct 29 15:52:42 2019
Start loop 1 at: Tue Oct 29 15:52:42 2019
All done at: Tue Oct 29 15:52:42 2019
Loop 1 done at: Tue Oct 29 15:52:44 2019
Loop 0 done at: Tue Oct 29 15:52:46 2019
從結果可看出
join()
的用法。
建立Thread的執行個體,傳給它一個可調用的類執行個體
在
mtsleepC.py
的基礎上新增加一個新類ThreadFunc,并進行一些其他的輕微改動,得到
mtsleepD.py
,代碼如下:
# mtsleepD.py
import threading
from time import sleep, ctime
loops = [4, 2]
class ThreadFunc():
'''可調用類'''
def __init__(self, func, args, name=''):
self.name = name
self.func = func
self.args = args
def __call__(self):
self.func(*self.args)
def loop(nloop, nsec):
print("Start loop %d at:" % nloop, ctime())
sleep(nsec)
print("Loop %d done at:" % nloop, ctime())
def main():
print("Start at:", ctime())
threads = []
nloops = range(len(loops))
for i in nloops:
t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print("All done at:", ctime())
if __name__ == '__main__':
main()
執行結果如下:
Start at: Tue Oct 29 16:11:40 2019
Start loop 0 at: Tue Oct 29 16:11:40 2019
Start loop 1 at: Tue Oct 29 16:11:40 2019
Loop 1 done at: Tue Oct 29 16:11:42 2019
Loop 0 done at: Tue Oct 29 16:11:44 2019
All done at: Tue Oct 29 16:11:44 2019
派生Thread的子類,并建立子類的執行個體
相對于建立可調用類的例子,當建立線程時使用子類要相對更容易閱讀。下面是
mtsleepE.py
的代碼:
# mtsleepE.py
import threading
from time import sleep, ctime
loops = [4, 2]
class MyThread(threading.Thread):
def __init__(self, func, args, name=''):
super().__init__()
self.name = name
self.func = func
self.args = args
def run(self):
self.func(*self.args)
def loop(nloop, nsec):
print("Start loop %d at:" % nloop, ctime())
sleep(nsec)
print("Loop %d done at:" % nloop, ctime())
def main():
print("Start at:", ctime())
threads = []
nloops = range(len(loops))
for i in nloops:
t = MyThread(loop, (i, loops[i]), loop.__name__)
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print("All done at:", ctime())
if __name__ == '__main__':
main()
執行結果:
Start at: Tue Oct 29 16:21:04 2019
Start loop 0 at: Tue Oct 29 16:21:04 2019
Start loop 1 at: Tue Oct 29 16:21:04 2019
Loop 1 done at: Tue Oct 29 16:21:06 2019
Loop 0 done at: Tue Oct 29 16:21:08 2019
All done at: Tue Oct 29 16:21:08 2019
如果執行對象有傳回值,可在MyThread類中增加
get_result()
方法來取得傳回值,這是MyThread類的代碼如下:
import threading
class MyThread(threading.Thread):
def __init__(self, func, args, name='')
super().__init__()
self.name = name
self.func = func
self.args = args
def run(self):
self.result = self.func(*self.args)
def get_result(self):
try:
return self.result
except Exception as e:
return None
-
threading子產品的其他函數
除了各種同步和線程對象外,threading子產品還提供了一些函數,如下表:
函數 | 描述 |
---|---|
active_count() | 目前活動的Thread對象個數 |
current_count() | 傳回目前的Thread對象 |
enumerate() | 傳回目前活動的Thread對象清單 |
settrace(func) | 為所有線程設定一個trace函數 |
setprofile(func) | 為所有線程設定一個profile函數 |
stack_size(size=0) | 傳回新建立線程的棧大小;或為後續建立的線程設定棧的大小為size |
同步原語
一般在多線程代碼中,總會有一些特定的函數或代碼塊不希望(或不應該)被多個線程同時執行,通常包括修改資料庫、更新檔案或其他會産生競态條件的類似情況。這就是需要使用同步的情況。這裡介紹兩種類型的同步原語:鎖/互斥;信号量。
-
鎖示例
鎖有兩種狀态:鎖定和未鎖定。
隻支援兩種函數:獲得鎖和釋放鎖。
下面是
沒有使用鎖的代碼:mtsleepF.py
# mtsleepF.py
from atexit import register
from random import randrange
from threading import Thread, currentThread
from time import sleep, ctime
class CleanOutputSet(set):
def __str__(self):
return ','.join(x for x in self)
loops = (randrange(2, 5) for x in range(randrange(3, 20))) # 将清單生成式的[]改為(),就建立了一個generator。 randrange(3, 7)表示建立3~6個程序,randrange(2,5)表示為每一個程序選擇一個睡眠時間
remaining = CleanOutputSet()
def loop(nsec):
myname = currentThread().name
remaining.add(myname)
print('[%s] started %s' % (ctime(), myname))
sleep(nsec)
remaining.remove(myname)
print('[%s] completed %s (%d secs)' % (ctime(), myname, nsec))
print(' (remaining: %s)' % (remaining or 'None'))
def main():
for pause in loops:
Thread(target=loop, args=(pause, )).start() # args=(pause, )中的','必不可少,如果沒有,那麼args就會被當做int
@register
def _atexit():
print('all Done at:', ctime())
if __name__ == '__main__':
main()
我們來解釋上述代碼:
第1~6行
- atexit子產品及其
函數用法參考https://blog.csdn.net/qyhaill/article/details/102807732register()
- random子產品的
函數用法參考https://www.runoob.com/python/func-number-randrange.htmlrandrange()
- threading.currentThread()傳回目前線程變量,這是舊的寫法,新的寫法是threading.current_thread()
第8~10行
- 派生python的set類(set類介紹百度很多),實作了它的
方法__str__()
-
函數用法參考https://www.runoob.com/python/att-string-join.htmljoin()
第12~22行
- loops是一個generator。将清單生成式的[]改為(),就建立了一個generator
- remaining是set類的派生類的對象,用來記錄剩下的還在運作的線程
- 對
函數進行一些修改,在sleep完之後将remaining中剩下的還在執行的線程顯示出來loop()
第24~30行
- main()函數建立并開始執行線程
- 28~30行的意思參見atexit子產品的用法
幸運的話,執行結果會按适當的順序給出:
[Tue Oct 29 22:53:28 2019] started Thread-1
[Tue Oct 29 22:53:28 2019] started Thread-2
[Tue Oct 29 22:53:28 2019] started Thread-3
[Tue Oct 29 22:53:28 2019] started Thread-4
[Tue Oct 29 22:53:28 2019] started Thread-5
[Tue Oct 29 22:53:28 2019] started Thread-6
[Tue Oct 29 22:53:28 2019] started Thread-7
[Tue Oct 29 22:53:30 2019] completed Thread-2 (2 secs)
(remaining: Thread-7,Thread-6,Thread-4,Thread-5,Thread-3,Thread-1)
[Tue Oct 29 22:53:30 2019] completed Thread-4 (2 secs)
(remaining: Thread-7,Thread-6,Thread-5,Thread-3,Thread-1)
[Tue Oct 29 22:53:30 2019] completed Thread-7 (2 secs)
(remaining: Thread-6,Thread-5,Thread-3,Thread-1)
[Tue Oct 29 22:53:31 2019] completed Thread-3 (3 secs)
(remaining: Thread-6,Thread-5,Thread-1)
[Tue Oct 29 22:53:32 2019] completed Thread-6 (4 secs)
(remaining: Thread-5,Thread-1)
[Tue Oct 29 22:53:32 2019] completed Thread-5 (4 secs)
(remaining: Thread-1)
[Tue Oct 29 22:53:32 2019] completed Thread-1 (4 secs)
(remaining: None)
all Done at: Tue Oct 29 22:53:32 2019
不幸運的話,會得到下面這樣奇怪的輸出:
[Tue Oct 29 22:53:22 2019] started Thread-1
[Tue Oct 29 22:53:22 2019] started Thread-2
[Tue Oct 29 22:53:22 2019] started Thread-3
[Tue Oct 29 22:53:22 2019] started Thread-4
[Tue Oct 29 22:53:22 2019] started Thread-5
[Tue Oct 29 22:53:22 2019] started Thread-6
[Tue Oct 29 22:53:22 2019] started Thread-7
[Tue Oct 29 22:53:22 2019] started Thread-8
[Tue Oct 29 22:53:22 2019] started Thread-9
[Tue Oct 29 22:53:22 2019] started Thread-10
[Tue Oct 29 22:53:22 2019] started Thread-11
[Tue Oct 29 22:53:22 2019] started Thread-12
[Tue Oct 29 22:53:22 2019] started Thread-13
[Tue Oct 29 22:53:22 2019] started Thread-14
[Tue Oct 29 22:53:22 2019] started Thread-15
[Tue Oct 29 22:53:22 2019] started Thread-16
[Tue Oct 29 22:53:24 2019] completed Thread-3 (2 secs)
(remaining: Thread-8,Thread-6,Thread-9,Thread-14,Thread-10,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7,Thread-2)
[Tue Oct 29 22:53:24 2019] completed Thread-2 (2 secs)
(remaining: Thread-8,Thread-6,Thread-9,Thread-14,Thread-10,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:24 2019] completed Thread-8 (2 secs)
(remaining: Thread-6,Thread-9,Thread-14,Thread-10,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:24 2019] completed Thread-10 (2 secs)
(remaining: Thread-6,Thread-9,Thread-14,Thread-11,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:25 2019] completed Thread-11 (3 secs)
(remaining: Thread-6,Thread-9,Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-4,Thread-16,Thread-5,Thread-7)
[Tue Oct 29 22:53:26 2019] completed Thread-6 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-7 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-4 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-9 (4 secs)
(remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
(remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
(remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
(remaining: Thread-14,Thread-12,Thread-13,Thread-1,Thread-15,Thread-16,Thread-5)
[Tue Oct 29 22:53:26 2019] completed Thread-1 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-5 (4 secs)
(remaining: Thread-14,Thread-12,Thread-13,Thread-15,Thread-16)
(remaining: Thread-14,Thread-12,Thread-13,Thread-15,Thread-16)
[Tue Oct 29 22:53:26 2019] completed Thread-13 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-12 (4 secs)
[Tue Oct 29 22:53:26 2019] completed Thread-15 (4 secs)
(remaining: None)
(remaining: None)
[Tue Oct 29 22:53:26 2019] completed Thread-16 (4 secs)
(remaining: None)
[Tue Oct 29 22:53:26 2019] completed Thread-14 (4 secs)
(remaining: None)
(remaining: None)
all Done at: Tue Oct 29 22:53:26 2019
存在兩個問題:
- 由于多個線程執行進度的不同,可能會出現交替輸出的問題(I/O)
- 可能出現多個線程同時改變同一個變量的情況
将
mtsleepF.py
改寫為
mtsleepG.py
,增加鎖機制,代碼如下:
# mtsleepG.py
from atexit import register
from random import randrange
from threading import Thread, current_thread, Lock
from time import sleep, ctime
class CleanOutputSet(set):
def __str__(self):
return ','.join(x for x in self)
lock = Lock()
loops = (randrange(2, 5) for x in range(randrange(3, 20))) # 将清單生成式的[]改為(),就建立了一個generator。 randrange(3, 7)表示建立3~6個程序,randrange(2,5)表示為每一個程序選擇一個睡眠時間
remaining = CleanOutputSet()
def loop(nsec):
myname = current_thread().name
lock.acquire()
remaining.add(myname)
print('[%s] started %s' % (ctime(), myname))
lock.release()
sleep(nsec)
lock.acquire()
remaining.remove(myname)
print('[%s] completed %s (%d secs)' % (ctime(), myname, nsec))
print(' (remaining: %s)' % (remaining or 'None'))
lock.release()
def main():
for pause in loops:
Thread(target=loop, args=(pause, )).start() # args=(pause, )中的','必不可少,如果沒有,那麼args就會被當做int
@register
def _atexit():
print('all Done at:', ctime())
if __name__ == '__main__':
main()
代碼容易了解,不再過多解釋。不過使用with語句可以代替acquire()和release(),使代碼更簡潔優化,可将loop()函數改為:
from threading import Thread, current_thread, Lock
from time import sleep, ctime
def loop(nsec):
myname = current_thread().name
with lock:
remaining.add(myname)
print('[%s] started %s' % (ctime(), myname))
sleep(nsec)
with lock:
remaining.remove(myname)
print('[%s] completed %s (%d secs)' % (ctime(), myname, nsec))
print(' (remaining: %s)' % (remaining or 'None'))
-
信号量示例
pass
python多程序
這裡隻講述multiprocessing子產品。os子產品中的
fork()
函數也能實作多程序,功能太弱。
Process類
multiprocessing子產品和threading子產品非常相似,其中最主要的類是Process類,和Thread類的API相同:
屬性 | 描述 |
---|---|
name | 程序名,僅用于識别目的,沒有語義 |
pid | 程序ID。在生成程序前為 |
daemon | 守護程序标記(必須在start()被調用前設定) |
exitcode | 子程序的退出代碼。如果子程序尚未終止則為 ;負值 − N -N −N表示子程序被信号 N N N終止 |
authkey | 程序的身份驗證密鑰(bytes) |
sentinel | 系統對象的數字句柄,當程序結束時将變為"ready" |
方法 | 描述 |
---|---|
init(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) | 初始化方法,執行個體化一個程序對象,需要有一個可調用的target,以及其參數args或/和kwargs。還可以傳遞name和group參數(後者僅用于相容threading.Thread)。此外。而daemon的值将會設定`process.daemon屬性/标志 |
start() | 啟動程序活動 |
run() | 定義該程序活動的方法(通常在子類中被重寫) |
join(timeout=None) | 直至啟動的程序終止之前一直被挂起;除非給出了timeout(秒),否則會一直阻塞(程序無法join自身,否則會死鎖 |
is_alive() | 傳回布爾值,表示這個程序是否還存活 |
terminate() | 強制終止程序且不做清理。程序的後代程序将不會被終止,變成僵屍程序;如果有鎖沒有釋放,将變成死鎖 |
kill() | 與terminate()功能相同 |
close() | 關閉Process對象,釋放與之關聯的所有資源。如果底層程序仍在運作,會引起ValueError |
對應于Thread類,下面從三個方面說明如何使用Process類:
- 建立Process的執行個體,傳給它一個函數。
- 建立Process的執行個體,傳給它一個可調用的類執行個體。
-
派生Process的子類,并建立子類的執行個體。
最推薦最後一種方案。最不推薦第二種。
建立Process的執行個體,傳給它一個函數
将
mtsleepC.py
改寫為
mpsleepC.py
,将Thread類換為Process類,代碼如下:
# mpsleepC.py
from multiprocessing import Process
from time import sleep, ctime
import os
loops = [4, 2]
def loop(nsec):
print("[%s] %s start." % (ctime(), os.getpid()))
sleep(nsec)
print("[%s] %s end." % (ctime(), os.getpid()))
def main():
print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
processes = []
nloops = range(len(loops))
for i in nloops:
t = Process(target=loop, args=(loops[i], ))
processes.append(t)
for i in nloops:
processes[i].start()
for i in nloops:
processes[i].join()
print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))
if __name__ == '__main__':
main()
執行結果:
[Wed Oct 30 09:45:57 2019] 15513 main process start ... ...
[Wed Oct 30 09:45:57 2019] 15514 start.
[Wed Oct 30 09:45:57 2019] 15515 start.
[Wed Oct 30 09:45:59 2019] 15515 end.
[Wed Oct 30 09:46:01 2019] 15514 end.
[Wed Oct 30 09:46:01 2019] 15513 main process end ... ...
其中,
os.getpid()
函數用于擷取目前程序号。
建立Process的執行個體,傳給它一個可調用的類執行個體
将
mtsleepD.py
改寫為
mpsleepD.py
,代碼如下:
# mpsleepD.py
import multiprocessing
import os
from time import sleep, ctime
loops = [4, 2]
class ProcessFunc():
'''可調用類'''
def __init__(self, func, args, name=''):
self.name = name
self.func = func
self.args = args
def __call__(self):
self.func(*self.args)
def loop(nsec):
print("[%s] %s start." % (ctime(), os.getpid()))
sleep(nsec)
print("[%s] %s end." % (ctime(), os.getpid()))
def main():
print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
processes = []
nloops = range(len(loops))
for i in nloops:
t = multiprocessing.Process(target=ProcessFunc(loop, (loops[i], ), loop.__name__))
processes.append(t)
for i in nloops:
processes[i].start()
for i in nloops:
processes[i].join()
print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))
if __name__ == '__main__':
main()
執行結果如下:
[Wed Oct 30 09:50:58 2019] 16023 main process start ... ...
[Wed Oct 30 09:50:58 2019] 16024 start.
[Wed Oct 30 09:50:58 2019] 16026 start.
[Wed Oct 30 09:51:00 2019] 16026 end.
[Wed Oct 30 09:51:02 2019] 16024 end.
[Wed Oct 30 09:51:02 2019] 16023 main process end ... ...
派生Process的子類,并建立子類的執行個體
将
mtsleepE.py
改寫為
mpsleepE.py
,代碼如下:
# mpsleepE.py
import multiprocessing
import os
from time import sleep, ctime
loops = [4, 2]
class MyProcess(multiprocessing.Process):
def __init__(self, func, args, name=''):
super().__init__()
self.name = name
self.func = func
self.args = args
def run(self):
self.func(*self.args)
def loop(nsec):
print("[%s] %s start." % (ctime(), os.getpid()))
sleep(nsec)
print("[%s] %s end." % (ctime(), os.getpid()))
def main():
print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
processes = []
nloops = range(len(loops))
for i in nloops:
t = MyProcess(loop, (loops[i], ), loop.__name__)
processes.append(t)
for i in nloops:
processes[i].start()
for i in nloops:
processes[i].join()
print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))
if __name__ == '__main__':
main()
執行結果如下:
[Wed Oct 30 09:57:00 2019] 16683 main process start ... ...
[Wed Oct 30 09:57:00 2019] 16684 start.
[Wed Oct 30 09:57:00 2019] 16685 start.
[Wed Oct 30 09:57:02 2019] 16685 end.
[Wed Oct 30 09:57:04 2019] 16684 end.
[Wed Oct 30 09:57:04 2019] 16683 main process end ... ...
孤兒程序和僵屍程序
-
孤兒程序
如果父程序終止,而由父程序建立的一個或多個子程序還在執行的話,這一個或多個子程序就會成為孤兒程序,成為孤兒程序後,善後工作(wait()或者waitpid()等)就會由init接管,init程序是核心啟動的第一個程序,pid=1,由0号程序idle建立。應為有人善後,是以孤兒程序是無害的。
-
僵屍程序
由于父程序和子程序是異步的,是以父程序不會知道子程序會在什麼時候結束,是以,為了在子程序結束後讓父程序知道,子程序結束後會保留一部分系統資源如pid,運作時間,退出狀态等。等父程序通過wait()或者waitpid()系統調用取得這些資訊時,這部分資源才會被釋放。但如果父程序一直未調用wait()或者waitpid(),那這些資源就一直不會被釋放,比如pid,pid的數量是有限的,如果僵屍程序過多,就會導緻pid不足而無法建立新程序,是以僵屍程序是有害的。
下面是示範僵屍程序的
的代碼:zombie_process.py
# zombie_process.py
import multiprocessing
from time import ctime, sleep
import os
def demo():
print('[%s] %s start.' % (ctime(), os.getpid()))
sleep(1)
print('[%s] %s end.' % (ctime(), os.getpid()))
def main():
print('[%s] %s father process start ... ...' % (ctime(), os.getpid()))
multiprocessing.Process(target=demo).start()
sleep(10000)
print('[%s] %s father process end ... ...' % (ctime(), os.getpid()))
if __name__ == '__main__':
main()
父程序sleep 10000s,而子程序sleep 1s,是以子程序遠早于父程序結束,子程序結束而父程序還未結束時的輸出結果為:
[Wed Oct 30 10:19:23 2019] 18262 father process start ... ...
[Wed Oct 30 10:19:23 2019] 18263 start.
[Wed Oct 30 10:19:24 2019] 18263 end.
這是再開一個終端執行:
ps aux | grep zombie_process.py
這條指令的作用是檢視所有與
zombie_process.py
有關的系統程序。輸出為:
qyh 18262 0.1 0.0 38332 11124 pts/1 S+ 10:19 0:00 python zombie_process.py
qyh 18321 0.0 0.0 21536 1008 pts/3 S+ 10:19 0:00 grep --color=auto zombie_process.py
可以看到第一個為父程序,占用cpu 0.1%,第二個是子程序,沒有占用cpu和記憶體,就是所謂的僵屍程序。這是在終端執行:
kill 18262
殺死父程序,父程序結束後僵屍程序就會變成孤兒程序,由init收養後釋放資源。
Pool類
如果需要建立大量程序,使用Process類管理就會比較麻煩,就可以使用程序池Pool類。Pool類可以提供指定數量的程序供使用者調用,當有新的請求送出到Pool中時,如果池還沒有滿,就會建立一個新的程序來執行請求。如果池滿,請求就會告知先等待,直到池中有程序結束,才會建立新的程序來執行這些請求。
将
mpsleepC.py
改寫為
mpsleepF.py
用來說明Pool類的用法,代碼如下:
# mpsleepF.py
from multiprocessing import Pool
from time import sleep, ctime
import os
loops = [4, 2, 3, 4, 5, 2, 1]
def loop(nsec):
print("[%s] %s start." % (ctime(), os.getpid()))
sleep(nsec)
print("[%s] %s end." % (ctime(), os.getpid()))
def main():
print("[%s] %s main process start ... ..." % (ctime(), os.getpid()))
p = Pool(processes=3)
rl = p.map(loop, loops)
p.close()
p.join()
print("[%s] %s main process end ... ..." % (ctime(), os.getpid()))
if __name__ == '__main__':
main()
第16行
執行個體化一個程序池對象,傳入的參數processes表示程序池中程序的個數,這裡為3。
第18~20行
- pool.map()函數原型如下:
和内置的map()函數用法行為基本一緻,它會使程序阻塞直至傳回結果。
rl表示result_list,是存儲傳回結果的清單。
2. pool.close()函數用于關閉程序池,使其不再接受新的任務。
3. pool.join()主程序阻塞等待程序池中的子程序退出。必須在close()或terminate()之後使用。
執行結果如下:
[Wed Oct 30 15:12:58 2019] 21744 main process start ... ...
[Wed Oct 30 15:12:58 2019] 21745 start.
[Wed Oct 30 15:12:58 2019] 21746 start.
[Wed Oct 30 15:12:58 2019] 21747 start.
[Wed Oct 30 15:13:00 2019] 21746 end.
[Wed Oct 30 15:13:00 2019] 21746 start.
[Wed Oct 30 15:13:01 2019] 21747 end.
[Wed Oct 30 15:13:01 2019] 21747 start.
[Wed Oct 30 15:13:02 2019] 21745 end.
[Wed Oct 30 15:13:02 2019] 21745 start.
[Wed Oct 30 15:13:04 2019] 21746 end.
[Wed Oct 30 15:13:04 2019] 21746 start.
[Wed Oct 30 15:13:04 2019] 21745 end.
[Wed Oct 30 15:13:05 2019] 21746 end.
[Wed Oct 30 15:13:06 2019] 21747 end.
[Wed Oct 30 15:13:06 2019] 21744 main process end ... ...
Pool類還有兩個重要的函數,函數原型如下:
apply(func, args=(), kwds={})
apply_async(func, args=(), kwds={}, callback=None)
apply()使用args參數極易kwds命名參數調用func,它會在傳回結果前阻塞主程序。這使得它和單程序沒什麼差別。python3.x之後很少用。
apply_async()和apply()用法一樣,但它是非阻塞的,并且支援結果傳回進行回調。
将
mpsleepF.py
改寫為
mpsleepG.py
,用來比較apply()和apply_async()的用法,代碼如下:
# mpsleepG.py
from multiprocessing import Pool
from time import sleep, ctime
import os
loops = [4, 2, 3, 4, 5, 2, 1]
def loop(nsec):
print("[%s] %s start." % (ctime(), os.getpid()))
sleep(nsec)
print("[%s] %s end." % (ctime(), os.getpid()))
def main1():
print("[%s] %s main process_1 start ... ..." % (ctime(), os.getpid()))
p = Pool(processes=3)
for i in range(len(loops)):
p.apply(loop, (loops[i], ))
p.close()
p.join()
print("[%s] %s main process_1 end ... ..." % (ctime(), os.getpid()))
def main2():
print("[%s] %s main process_2 start ... ..." % (ctime(), os.getpid()))
p = Pool(processes=3)
result_list = []
for i in range(len(loops)):
result_list.append(p.apply_async(loop, (loops[i], )))
p.close()
p.join()
print("[%s] %s main process_2 end ... ..." % (ctime(), os.getpid()))
if __name__ == '__main__':
main1()
main2()
執行結果如下:
[Wed Oct 30 15:26:28 2019] 22799 main process_1 start ... ...
[Wed Oct 30 15:26:28 2019] 22800 start.
[Wed Oct 30 15:26:32 2019] 22800 end.
[Wed Oct 30 15:26:32 2019] 22801 start.
[Wed Oct 30 15:26:34 2019] 22801 end.
[Wed Oct 30 15:26:34 2019] 22802 start.
[Wed Oct 30 15:26:37 2019] 22802 end.
[Wed Oct 30 15:26:37 2019] 22800 start.
[Wed Oct 30 15:26:41 2019] 22800 end.
[Wed Oct 30 15:26:41 2019] 22801 start.
[Wed Oct 30 15:26:46 2019] 22801 end.
[Wed Oct 30 15:26:46 2019] 22802 start.
[Wed Oct 30 15:26:48 2019] 22802 end.
[Wed Oct 30 15:26:48 2019] 22800 start.
[Wed Oct 30 15:26:49 2019] 22800 end.
[Wed Oct 30 15:26:49 2019] 22799 main process_1 end ... ...
[Wed Oct 30 15:26:49 2019] 22799 main process_2 start ... ...
[Wed Oct 30 15:26:49 2019] 22808 start.
[Wed Oct 30 15:26:49 2019] 22809 start.
[Wed Oct 30 15:26:49 2019] 22810 start.
[Wed Oct 30 15:26:51 2019] 22809 end.
[Wed Oct 30 15:26:51 2019] 22809 start.
[Wed Oct 30 15:26:52 2019] 22810 end.
[Wed Oct 30 15:26:52 2019] 22810 start.
[Wed Oct 30 15:26:53 2019] 22808 end.
[Wed Oct 30 15:26:53 2019] 22808 start.
[Wed Oct 30 15:26:55 2019] 22809 end.
[Wed Oct 30 15:26:55 2019] 22809 start.
[Wed Oct 30 15:26:55 2019] 22808 end.
[Wed Oct 30 15:26:56 2019] 22809 end.
[Wed Oct 30 15:26:57 2019] 22810 end.
[Wed Oct 30 15:26:57 2019] 22799 main process_2 end ... ...
main2()中的result_list清單用來存儲傳回值(當然這個例子中的func并沒有傳回值)。
multiprocessing子產品中鎖的用法和threading子產品完全相同,不再重複講解。
程序間通信
pass