Python中的多程序與程序池
文章目錄
- Python中的多程序與程序池
- 1.multiprocessing類
- 2.multiprocessing.Pool程序池
- 構造方法
- 常用方法
- 多程序,多線程的選擇
- 應用
- Linux的特殊程序
- 僵屍程序
- 孤兒程序
- 守護程序
1.multiprocessing類
方法 | 含義 |
current_process() | 擷取目前程序對象 類似于threading.current_thread() |
active_children() | 擷取目前程序所有活動的子程序清單。注意:不包含自己,即目前程序。有點類似于threading.enumerate() |
cpu_count() | 傳回目前系統CPU的數量 |
import multiprocessing
if __name__ == "__main__":
p = multiprocessing.Pool(4)
print(multiprocessing.current_process())
print(multiprocessing.active_children())
print(multiprocessing.cpu_count())
2.multiprocessing.Pool程序池
構造方法
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) #構造方法
- processes #程序池最大使用的工作程序數量,預設值為None,會使用os.cpu_count()來擷取cpu的數量作為最大程序數量。
- initializer #程序初始化函數,程序初始化時會調用該函數。
- initargs #參數,為initializer函數調用時傳遞的參數。
- maxtasksperchild #工作程序退出前可以完成的任務數,完成後用一個新的工作程序來替代原程序,讓閑置的資源釋放,預設是None,此意味隻要Pool存在工作程序就一直存活(如果改為maxtasksperchild=1每次線程執行worker時都會重新建立一個新的程序并執行show)
- context #用在制定工作程序啟動時的上下文,一般使用multiprocessing.Pool()或者一個context對象的Pool()方法來建立一個池,兩種方法都适當的設定了context。(context中有一個程序或者工作任務的隊列,會産生阻塞,保證程序會永久存在。)
- 注意Pool建立後,會立即執行個體化指定個數的程序。以備使用
import multiprocessing
import logging
import sys
import time
logging.basicConfig(level=logging.INFO,format="%(name)s %(process)d %(processName)s %(thread)d %(message)s",stream=sys.stdout)
log = logging.getLogger(__name__)
def show(i):
log.info("我啟動了{}".format(i))
def worker(k):
time.sleep(2)
log.info("正在列印-{}".format(k))
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=2,initializer=show,initargs=(8,),maxtasksperchild=None)
# # 如果改為maxtasksperchild=1每次線程執行worker時都會重新建立一個新的程序并執行show
#pool = multiprocessing.Pool(processes=2, initializer=show, initargs=(8,), maxtasksperchild=1)
time.sleep(3)
for i in range(4):
pool.apply(worker,args=(i+5,))
常用方法
名稱 | 說明 |
apply(self,func,args=(),kwds={}]) | 柱塞執行,導緻主程序執行其他程序就像一個個執行 func #要執行的函數 args #函數的位置參數 kwds #函數的關鍵字參數 |
apply_async(self,func,args=(),kwds={},callback=None,error_callback=None) | 與apply方法用法一緻,非阻塞異步執行,得到結果會執行回調函數callback,如果出現異常會執行異常回調函數error_callback。會傳回一個将來的傳回對象。 |
close(self) | 關閉池,池不能再接受新的任務,所有任務完成後退出程序 |
terminate(self) | 立即結束工作程序,不再處理未處理的任務 |
join(self) | 主程序阻塞等待子程序的退出,join方法要在close或terminate後使用。 否則由于程序池的程序預設不會消亡,會永久阻塞 |
- apply(self,func,args=(),kwds={}])->return #同步調用,會阻塞,一個個執行。相當于串行
- func #要執行的函數
- args #函數的位置參數
- kwds #函數的關鍵字參數
- return #傳回函數執行的放回結果
import multiprocessing
import logging
import sys
import time
logging.basicConfig(level=logging.INFO,format="%(name)s %(process)d %(processName)s %(thread)d %(message)s",stream=sys.stdout)
log = logging.getLogger(__name__)
def worker(k):
time.sleep(2)
log.info("正在列印-{}".format(k))
return k+5
if __name__ == "__main__":
p = multiprocessing.Pool(4)
for i in range(5):
req = p.apply(func=worker,args=(i,))
log.info("{}~~~~~~~~".format(req))
print("-----------")
- apply_async(self,func,args=(),kwds={},callback=None,error_callback=None)->multiprocessing.pool.ApplyResult #同步執行,傳回結果是一個未來的結果對象
- func #要執行的函數
- args #函數的位置參數
- kwds #函數的關鍵字參數
- callback #回調函數,當函數執行完成後,回調執行的函數,會在主程序中建立一個線程來執行。
- error_callback #異常回調函數,如果發生異常後,會在主程序中建立一個線程執行。(一參的回調函數)
- multiprocessing.pool.ApplyResult 函數執行傳回的結果對象。在未來的函數執行完成後,會對這個結果對象指派。(一參的回調函數)
- class multiprocessing.pool.AsyncResult類
- 由Pool.apply_async()和Pool.map_async()傳回的結果的類。
方法 | 函數 |
get(self,itemout=None) | 擷取異步請求函數執行的傳回結果。如果未執行完成,會産生一個阻塞。直到傳回結果為止。 timeout #設定逾時時長,如果指定時間内沒有傳回會抛出一個multiprocessing.context.TimeoutError異常 |
wait(self,timeout=None) | 等待結果,會阻塞,直到結果傳回。無論是否等到,都傳回None。 timeout逾時時長,預設為None表示無限等待。 |
ready(self) | 傳回的調用是否已經完成,完成傳回True,否則傳回False |
successful(self) | 傳回的調用是否在沒有引發異常的情況下完成。如果結果還未計算完成。會抛出AssertionError異常 |
import multiprocessing
import logging
import sys
import time
logging.basicConfig(level=logging.INFO,format="%(name)s %(process)d %(processName)s %(thread)d %(message)s",stream=sys.stdout)
log = logging.getLogger(__name__)
def worker(k):
time.sleep(2)
log.info("正在列印-{}".format(k))
return k+5
def callback(value):
log.info("{}\t type={}".format(value,type(value)))
if __name__ == "__main__":
p = multiprocessing.Pool(4)
reqs = []
for i in range(4):
req = p.apply_async(func=worker,args=(i,),callback=callback,error_callback=callback)
log.info("{}~~~~~~~~{}".format(req,type(req)))
reqs.append(req)
print("-----end------")
while True:
for i in reqs:
if not i.ready():
break
else:
for i in reqs:
log.info(i.get())
break
time.sleep(1)
多程序,多線程的選擇
- CPU密集型
- CPython中使用到了GIL,多線程的時候鎖互相競争,且多核優勢不能發揮,選用Python多程序效率更高。
- IO密集型
- 在Python中适合是用多線程,可以減少多程序間IO的序列化開銷。且在IO等待的時候,切換到其他線程繼續執 行,效率不錯。
應用
- 請求/應答模型:WEB應用中常見的處理模型
- master啟動多個worker工作程序,一般和CPU數目相同。發揮多核優勢。
- worker工作程序中,往往需要操作網絡IO和磁盤IO,啟動多線程,提高并發處理能力。worker處理使用者的請求, 往往需要等待資料,處理完請求還要通過網絡IO傳回響應。這就是nginx工作模式。
Linux的特殊程序
在Linux/Unix中,通過父程序建立子程序。
僵屍程序
一個程序使用了fork建立了子程序,如果子程序終止進入僵死狀态,而父程序并沒有調用wait或者waitpid擷取子進 程的狀态資訊,那麼子程序仍留下一個資料結構儲存在系統中,這種程序稱為僵屍程序。
僵屍程序會占用一定的記憶體空間,還占用了程序号,是以一定要避免大量的僵屍程序産生。有很多方法可以避免僵 屍程序。