天天看點

Python multiprocessing 使用手記[3] – 關于Queue

繼續讨論Python multiprocessing,這次讨論的主要内容是mp庫的核心元件之一的Queue。

Queue是mp庫當中用來提供多程序對象交換的方式。對象交換和上一部分當中提到的對象共享都是使多個程序通路同一個對象的方式,兩者的差別就是,對象共享是多個程序通路同一個對象,對象交換則是将對象從一個程序傳輸的另一個程序。

multiprocessing當中的Queue使用方式和Python内置的threading.Queue對象很像,它支援一個put操作,将對象放入Queue,也支援一個get操作,将對象從Queue當中讀出。和threading.Queue不同的是,mp.Queue預設不支援join()和task_done操作,這兩個支援需要使用mp.JoinableQueue對象。

由于Queue對象負責程序之間的對象傳輸,是以第一個問題就是如何在兩個程序之間共享這個Queue對象本身。在上一部分所言的三種共享方式當中,Queue對象隻能使用繼承(inheritance)的方式共享。這是因為Queue本身基于unix的Pipe對象實作,而Pipe對象的共享需要通過繼承。是以,在一個典型的應用實作模型當中,應該是父程序建立Queue,然後建立子程序共享該Queue,由父程序和子程序分别讀寫。例如下面的這個例子

import multiprocessing
 
q = multiprocessing.Queue()
 
def reader_proc():
    print q.get()
 
reader = multiprocessing.Process(target=reader_proc)
reader.start()
 
q.put(100)
reader.join()
           

另一種實作方式是父程序建立Queue,建立多個子程序,有的子程序讀Queue,有的子程序寫Queue,例如:

import multiprocessing
 
q = multiprocessing.Queue()
 
def writer_proc():
    q.put(100)
 
def reader_proc():
    print q.get()
 
reader = multiprocessing.Process(target=reader_proc)
reader.start()
writer = multiprocessing.Process(target=writer_proc)
writer.start()
 
reader.join()
writer.join()
           

由于使用繼承的方式共享Queue,是以代碼當中并沒有明顯的傳輸Queue對象本身的代碼,看起來似乎隻要将multiprocessing當中的對象換成threading當中的對象,程式仍然能夠工作。反之,拿到一個現有的多線程程式,是不是将threading改成multiprocessing就可以工作呢?也許可以,但是更可能的情況是你會遇到很多問題。

第一個問題就是mp的Queue需要考慮多程序之間的對象傳輸,是以所傳輸的對象必須是可以pickle的。否則,在Queue的put操作上會抛出PicklingError。

其他的一些差異表現在一些技術細節上,這些不是任何高層邏輯可以抽象掉的,不知道這些差異會導緻一些潛在的錯誤,例如死鎖。在總結這些潛在的犯錯的可能的同時,我們會簡單看一下mp當中Queue的實作方式,以便能夠友善的了解為什麼會有這樣的行為。這些實作問題僅僅針對Linux,Windows上面的實作和出現的問題在這裡不涉及。

mp.Queue建構在系統的Pipe之上,但是實際上程序并不是直接将對象寫入到Pipe裡面,而是先寫入一個本地的buffer,再由一個專門的feed線程将其放入Pipe當中。讀取端則是直接從Pipe當中讀出對象。之是以有這樣一個feed線程,是為了能夠提供Queue接口函數所需要的put的逾時控制。但是由于這個feed線程的存在,mp.Queue提供了幾個額外的函數來控制它,一個函數close來停止該線程,以及join_thread來join該線程。close同時負責把所有在buffer當中的對象重新整理到Pipe當中。

但是這個feed線程也是個麻煩制造者,為了保證所有被放入Queue的東西最終都能夠到達另外一端的程序,mp庫注冊了一個atexit的處理函數,用來在程序退出的時候自動close并且join該feed線程。這個join動作帶來了很多問題,比如潛在的死鎖。考慮下面一種狀況:一個父程序建立了兩個子程序,一個子程序讀,另一個子程序寫。當需要停止這些程序的時候,父程序如果先把讀程序結束,但是同時寫程序已經将太多的對象寫入Queue,導緻後繼的對象等待在buffer當中,則這個程序将無法終止,因為atexit的處理函數等待把所有buffer當中的對象放入Pipe,但是Pipe已經滿了,然後陷入了死鎖。

有人可能會問,那隻要保證總是按照資料流的順序來停止程序不就行。問題是在很多複雜的系統流程當中,可能存在一個環形的資料流,這種情況下,無論按照什麼順序停止程序,終究有一個程序可能陷入這種情景當中。

幸運的是,Queue對象還提供了一個成員函數cancel_join_thread,這個函數可以使得在程序停止的時候不進行join操作,這樣可以避免死鎖,代價就是這個時候尚未重新整理到Pipe當中的對象都會丢失。鑒于即使調用了join_thread,殘留在Pipe當中的對象仍然可能丢失,是以一旦選擇使用mp的Queue對象,就不要假設不會在流程當中丢對象了。

另外一個可能的方案是使用mp庫當中的SimpleQueue對象。這個對象在文檔當中沒有提及,但是在multiprocessing.queue子產品當中有定義。這個對象就是去掉了buffer的Queue對象,是以可能能夠避免上面說的問題的。但是SimpleQueue沒有提供put和get的逾時處理,兩個動作都是阻塞的。

除了使用multiprocessing.Queue,還可以使用multiprocessing.Pipe進行通信。mp.Pipe是Queue的底層結構,但是沒有feed線程和put/get的逾時控制。一定程度上和SimpleQueue很像。需要注意的是Pipe帶有一個參數 duplex,當設定為True(預設)的時候,Pipe并不是使用系統的pipe來實作,而是通過socketpair,即Unix Domain Socket來實作。這個和pipe相比有些微的性能差異。

另外一個使用Queue的方式不是mp庫内置的。這種方式使用上一篇文章當中提到的server process的方式來共享一個Queue對象。這個Queue對象實際上在server process當中,所有的子程序通過socket連接配接到server process擷取該Queue的代理對象進行操作。說到這有人會想起來mp庫有一個内置的SyncManager對象,可以通過multiprocess.Manager函數擷取到,通過該對象的Queue方法可以擷取一個Queue的代理對象。不幸的是,這個方法不是正确的擷取Queue的方式,原因正如上一篇文章所說,SyncManager.Queue方法的每次調用擷取到的是一個建立對象的代理對象,而不是一個共享對象。正确的使用server process當中的Queue的方式是:

共同部分:

import multiprocessing.managers as mpm
import Queue
 
class SharedQueueManager(mpm.BaseManager): pass
q = Queue.Queue()
SharedQueueManager.register('Queue', lambda: q)
           

服務程序:

mgr = SharedQueueManager(address=('', 12345))
server = mgr.get_server()
server.serve_forever()
           

客戶程序:

mgr = SharedQueueManager(address=('localhost', 12345))
mgr.connect()
q = mgr.Queue() # 這裡q就是共享的Queue對象的代理對象
           

這種方式比起mp庫内置的Queue,有一些性能上的影響,因為畢竟牽涉到多次網絡通訊,但是帶來的好處是沒有feed線程帶來的一系列問題,而且理論上不會存在丢資料的問題,除非server process崩潰。但是正如上一篇所說,server process本身就不是很靠譜的,是以這裡也隻是“理論上”不會丢資料而已。

說到性能,這裡就列兩個性能資料,以前在twitter上面提到過的(這兩個連接配接無法通路的請聯系我):

操作對象為 pickle後512位元組的對象,通過proxy操作Queue的性能大約是7000次/秒(本機)或1100次/秒(多機),如果使用 multiprocessing.Queue,效率可達54000次/秒。