本文執行個體總結了Python多程序并發與多線程并發。分享給大家供大家參考,具體如下:
這裡對python支援的幾種并發方式進行簡單的總結。
Python支援的并發分為多線程并發與多程序并發(異步IO本文不涉及)。概念上來說,多程序并發即運作多個獨立的程式,優勢在于并發處理的任務都由作業系統管理,不足之處在于程式與各程序之間的通信和資料共享不友善;多線程并發則由程式員管理并發處理的任務,這種并發方式可以友善地線上程間共享資料(前提是不能互斥)。Python對多線程和多程序的支援都比一般程式設計語言更進階,最小化了需要我們完成的工作。
一.多程序并發
Mark Summerfield指出,對于計算密集型程式,多程序并發優于多線程并發。計算密集型程式指的程式的運作時間大部分消耗在CPU的運算處理過程,而硬碟和記憶體的讀寫消耗的時間很短;相對地,IO密集型程式指的則是程式的運作時間大部分消耗在硬碟和記憶體的讀寫上,CPU的運算時間很短。
對于多程序并發,python支援兩種實作方式,一種是采用程序安全的資料結構:multiprocessing.JoinableQueue,這種資料結構自己管理“加鎖”的過程,程式員無需擔心“死鎖”的問題;python還提供了一種更為優雅而進階的實作方式:采用程序池。下面一一介紹。
1.隊列實作——使用multiprocessing.JoinableQueue
multiprocessing是python标準庫中支援多程序并發的子產品,我們這裡采用multiprocessing中的資料結構:JoinableQueue,它本質上仍是一個FIFO的隊列,它與一般隊列(如queue中的Queue)的差別在于它是多程序安全的,這意味着我們不用擔心它的互斥和死鎖問題。JoinableQueue主要可以用來存放執行的任務和收集任務的執行結果。舉例來看(以下皆省去導入包的過程):
def read(q):
while True:
try:
value = q.get()
print('Get %s from queue.' % value)
time.sleep(random.random())
finally:
q.task_done()
def main():
q = multiprocessing.JoinableQueue()
pw1 = multiprocessing.Process(target=read, args=(q,))
pw2 = multiprocessing.Process(target=read, args=(q,))
pw1.daemon = True
pw2.daemon = True
pw1.start()
pw2.start()
for c in [chr(ord('A')+i) for i in range(26)]:
q.put(c)
try:
q.join()
except KeyboardInterrupt:
print("stopped by hand")
if __name__ == '__main__':
main()
對于windows系統的多程序并發,程式檔案裡必須含有“入口函數”(如main函數),且結尾處必須調用入口點。例如以if __name__ == '__main__': main()結尾。
在這個最簡單的多程序并發例子裡,我們用多程序實作将26個字母列印出來。首先定義一個存放任務的JoinableQueue對象,然後執行個體化兩個Process對象(每個對象對應一個子程序),執行個體化Process對象需要傳送target和args參數,target是實作每個任務工作中的具體函數,args是target函數的參數。
pw1.daemon = True
pw2.daemon = True
這兩句話将子程序設定為守護程序——主程序結束後随之結束。
pw1.start()
pw2.start()
一旦運作到這兩句話,子程序就開始獨立于父程序運作了,它會在單獨的程序裡調用target引用的函數——在這裡即read函數,它是一個死循環,将參數q中的數一一讀取并列印出來。
value = q.get()
這是多程序并發的要點,q是一個JoinableQueue對象,支援get方法讀取第一個元素,如果q中沒有元素,程序就會阻塞,直至q中被存入新元素。
是以執行完pw1.start() pw2.start()這兩句話後,子程序雖然開始運作了,但很快就堵塞住。
for c in [chr(ord('A')+i) for i in range(26)]:
q.put(c)
将26個字母依次放入JoinableQueue對象中,這時候兩個子程序不再阻塞,開始真正地執行任務。兩個子程序都用value = q.get()來讀取資料,它們都在修改q對象,而我們并不用擔心同步問題,這就是multiProcessing.Joinable資料結構的優勢所在——它是多程序安全的,它會自動處理“加鎖”的過程。
try:
q.join()
q.join()方法會查詢q中的資料是否已讀完——這裡指的就是任務是否執行完,如果沒有,程式會阻塞住等待q中資料讀完才開始繼續執行(可以用Ctrl+C強制停止)。
對Windows系統,調用任務管理器應該可以看到有多個子程序在運作。
2.程序池實作——使用concurrent.futures.ProcessPoolExecutor
Python還支援一種更為優雅的多程序并發方式,直接看例子:
def read(q):
print('Get %s from queue.' % q)
time.sleep(random.random())
def main():
futures = set()
with concurrent.futures.ProcessPoolExecutor() as executor:
for q in (chr(ord('A')+i) for i in range(26)):
future = executor.submit(read, q)
futures.add(future)
try:
for future in concurrent.futures.as_completed(futures):
err = future.exception()
if err is not None:
raise err
except KeyboardInterrupt:
print("stopped by hand")
if __name__ == '__main__':
main()
這裡我們采用concurrent.futures.ProcessPoolExecutor對象,可以把它想象成一個程序池,子程序往裡“填”。我們通過submit方法執行個體一個Future對象,然後把這裡Future對象都填到池——futures裡,這裡futures是一個set對象。隻要程序池裡有future,就會開始執行任務。這裡的read函數更為簡單——隻是把一個字元列印并休眠一會而已。
try:
for future in concurrent.futures.as_completed(futures):
這是等待所有子程序都執行完畢。子程序執行過程中可能抛出異常,err = future.exception()可以收集這些異常,便于後期處理。
可以看出用Future對象處理多程序并發更為簡潔,無論是target函數的編寫、子程序的啟動等等,future對象還可以向使用者彙報其狀态,也可以彙報執行結果或執行時的異常。
二.多線程并發
對于IO密集型程式,多線程并發可能要優于多程序并發。因為對于網絡通信等IO密集型任務來說,決定程式效率的主要是網絡延遲,這時候是使用程序還是線程就沒有太大關系了。
1.隊列實作——使用queue.Queue
程式與多程序基本一緻,隻是這裡我們不必使用multiProcessing.JoinableQueue對象了,一般的隊列(來自queue.Queue)就可以滿足要求:
def read(q):
while True:
try:
value = q.get()
print('Get %s from queue.' % value)
time.sleep(random.random())
finally:
q.task_done()
def main():
q = queue.Queue()
pw1 = threading.Thread(target=read, args=(q,))
pw2 = threading.Thread(target=read, args=(q,))
pw1.daemon = True
pw2.daemon = True
pw1.start()
pw2.start()
for c in [chr(ord('A')+i) for i in range(26)]:
q.put(c)
try:
q.join()
except KeyboardInterrupt:
print("stopped by hand")
if __name__ == '__main__':
main()
并且這裡我們執行個體化的是Thread對象,而不是Process對象,程式的其餘部分看起來與多程序并沒有什麼兩樣。
2. 線程池實作——使用concurrent.futures.ThreadPoolExecutor
直接看例子:
def read(q):
print('Get %s from queue.' % q)
time.sleep(random.random())
def main():
futures = set()
with concurrent.futures.ThreadPoolExecutor(multiprocessing.cpu_count()*4) as executor:
for q in (chr(ord('A')+i) for i in range(26)):
future = executor.submit(read, q)
futures.add(future)
try:
for future in concurrent.futures.as_completed(futures):
err = future.exception()
if err is not None:
raise err
except KeyboardInterrupt:
print("stopped by hand")
if __name__ == '__main__':
main()
用ThreadPoolExecutor與用ProcessPoolExecutor看起來沒什麼差別,隻是改了一下簽名而已。
不難看出,不管是使用隊列還是使用進/線程池,從多程序轉化到多線程是十分容易的——僅僅是修改了幾個簽名而已。當然内部機制完全不同,隻是python的封裝非常好,使我們可以不用關心這些細節,這正是python優雅之處。
更多關于Python相關内容感興趣的讀者可檢視本站專題:《Python程序與線程操作技巧總結》、《Python Socket程式設計技巧總結》、《Python資料結構與算法教程》、《Python函數使用技巧總結》、《Python字元串操作技巧彙總》、《Python入門與進階經典教程》及《Python檔案與目錄操作技巧彙總》
希望本文所述對大家Python程式設計有所幫助。
本文标題: Python多程序并發與多線程并發程式設計執行個體總結
本文位址: http://www.cppcns.com/jiaoben/python/220136.html