天天看點

Python中的多程序與程序池

Python中的多程序與程序池

文章目錄

  • ​​Python中的多程序與程序池​​
  • ​​1.multiprocessing類​​
  • ​​2.multiprocessing.Pool程序池​​
  • ​​構造方法​​
  • ​​常用方法​​
  • ​​多程序,多線程的選擇​​
  • ​​應用​​
  • ​​Linux的特殊程序​​
  • ​​僵屍程序​​
  • ​​孤兒程序​​
  • ​​守護程序​​

1.multiprocessing類

方法 含義
current_process()

擷取目前程序對象

類似于threading.current_thread()

active_children() 擷取目前程序所有活動的子程序清單。注意:不包含自己,即目前程序。有點類似于threading.enumerate()
cpu_count() 傳回目前系統CPU的數量
import multiprocessing

if __name__ == "__main__":
    p = multiprocessing.Pool(4)
    print(multiprocessing.current_process())
    print(multiprocessing.active_children())
    print(multiprocessing.cpu_count())      
Python中的多程式與程式池

2.multiprocessing.Pool程序池

構造方法

Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) #構造方法

  • processes #程序池最大使用的工作程序數量,預設值為None,會使用os.cpu_count()來擷取cpu的數量作為最大程序數量。
  • initializer #程序初始化函數,程序初始化時會調用該函數。
  • initargs #參數,為initializer函數調用時傳遞的參數。
  • maxtasksperchild #工作程序退出前可以完成的任務數,完成後用一個新的工作程序來替代原程序,讓閑置的資源釋放,預設是None,此意味隻要Pool存在工作程序就一直存活(如果改為maxtasksperchild=1每次線程執行worker時都會重新建立一個新的程序并執行show)
  • context #用在制定工作程序啟動時的上下文,一般使用multiprocessing.Pool()或者一個context對象的Pool()方法來建立一個池,兩種方法都适當的設定了context。(context中有一個程序或者工作任務的隊列,會産生阻塞,保證程序會永久存在。)
  • 注意Pool建立後,會立即執行個體化指定個數的程序。以備使用
import multiprocessing
import logging
import sys
import time

logging.basicConfig(level=logging.INFO,format="%(name)s %(process)d %(processName)s %(thread)d %(message)s",stream=sys.stdout)
log = logging.getLogger(__name__)

def show(i):
    log.info("我啟動了{}".format(i))

def worker(k):
    time.sleep(2)
    log.info("正在列印-{}".format(k))

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2,initializer=show,initargs=(8,),maxtasksperchild=None)
    # # 如果改為maxtasksperchild=1每次線程執行worker時都會重新建立一個新的程序并執行show
    #pool = multiprocessing.Pool(processes=2, initializer=show, initargs=(8,), maxtasksperchild=1)
    time.sleep(3)
    for i in range(4):
        pool.apply(worker,args=(i+5,))      

常用方法

名稱 說明
apply(self,func,args=(),kwds={}])

柱塞執行,導緻主程序執行其他程序就像一個個執行

func #要執行的函數

args #函數的位置參數

kwds #函數的關鍵字參數

apply_async(self,func,args=(),kwds={},callback=None,error_callback=None) 與apply方法用法一緻,非阻塞異步執行,得到結果會執行回調函數callback,如果出現異常會執行異常回調函數error_callback。會傳回一個将來的傳回對象。
close(self) 關閉池,池不能再接受新的任務,所有任務完成後退出程序
terminate(self) 立即結束工作程序,不再處理未處理的任務
join(self)

主程序阻塞等待子程序的退出,join方法要在close或terminate後使用。

否則由于程序池的程序預設不會消亡,會永久阻塞

  • apply(self,func,args=(),kwds={}])->return #同步調用,會阻塞,一個個執行。相當于串行
  1. func #要執行的函數
  2. args #函數的位置參數
  3. kwds #函數的關鍵字參數
  4. return #傳回函數執行的放回結果
import multiprocessing
import logging
import sys
import time

logging.basicConfig(level=logging.INFO,format="%(name)s %(process)d %(processName)s %(thread)d %(message)s",stream=sys.stdout)
log = logging.getLogger(__name__)

def worker(k):
    time.sleep(2)
    log.info("正在列印-{}".format(k))
    return k+5

if __name__ == "__main__":
    p = multiprocessing.Pool(4)
    for i in range(5):
        req = p.apply(func=worker,args=(i,))
        log.info("{}~~~~~~~~".format(req))
    print("-----------")      
  • apply_async(self,func,args=(),kwds={},callback=None,error_callback=None)->multiprocessing.pool.ApplyResult #同步執行,傳回結果是一個未來的結果對象
  1. func #要執行的函數
  2. args #函數的位置參數
  3. kwds #函數的關鍵字參數
  4. callback #回調函數,當函數執行完成後,回調執行的函數,會在主程序中建立一個線程來執行。
  5. error_callback #異常回調函數,如果發生異常後,會在主程序中建立一個線程執行。(一參的回調函數)
  6. multiprocessing.pool.ApplyResult 函數執行傳回的結果對象。在未來的函數執行完成後,會對這個結果對象指派。(一參的回調函數)
  • class multiprocessing.pool.AsyncResult類
  1. 由Pool.apply_async()和Pool.map_async()傳回的結果的類。
方法 函數
get(self,itemout=None)

擷取異步請求函數執行的傳回結果。如果未執行完成,會産生一個阻塞。直到傳回結果為止。

timeout #設定逾時時長,如果指定時間内沒有傳回會抛出一個multiprocessing.context.TimeoutError異常

wait(self,timeout=None)

等待結果,會阻塞,直到結果傳回。無論是否等到,都傳回None。

timeout逾時時長,預設為None表示無限等待。

ready(self) 傳回的調用是否已經完成,完成傳回True,否則傳回False
successful(self) 傳回的調用是否在沒有引發異常的情況下完成。如果結果還未計算完成。會抛出AssertionError異常
import multiprocessing
import logging
import sys
import time

logging.basicConfig(level=logging.INFO,format="%(name)s %(process)d %(processName)s %(thread)d %(message)s",stream=sys.stdout)
log = logging.getLogger(__name__)

def worker(k):
    time.sleep(2)
    log.info("正在列印-{}".format(k))
    return k+5

def callback(value):
    log.info("{}\t type={}".format(value,type(value)))

if __name__ == "__main__":
    p = multiprocessing.Pool(4)
    reqs = []
    for i in range(4):
        req = p.apply_async(func=worker,args=(i,),callback=callback,error_callback=callback)
        log.info("{}~~~~~~~~{}".format(req,type(req)))
        reqs.append(req)
    print("-----end------")
    while True:
        for i in reqs:
            if not i.ready():
                break
        else:
            for i in reqs:
                log.info(i.get())
            break
        time.sleep(1)      

多程序,多線程的選擇

  1. CPU密集型
  • CPython中使用到了GIL,多線程的時候鎖互相競争,且多核優勢不能發揮,選用Python多程序效率更高。
  1. IO密集型
  • 在Python中适合是用多線程,可以減少多程序間IO的序列化開銷。且在IO等待的時候,切換到其他線程繼續執 行,效率不錯。

應用

  • 請求/應答模型:WEB應用中常見的處理模型
  • master啟動多個worker工作程序,一般和CPU數目相同。發揮多核優勢。
  • worker工作程序中,往往需要操作網絡IO和磁盤IO,啟動多線程,提高并發處理能力。worker處理使用者的請求, 往往需要等待資料,處理完請求還要通過網絡IO傳回響應。這就是nginx工作模式。

Linux的特殊程序

在Linux/Unix中,通過父程序建立子程序。

僵屍程序

一個程序使用了fork建立了子程序,如果子程序終止進入僵死狀态,而父程序并沒有調用wait或者waitpid擷取子進 程的狀态資訊,那麼子程序仍留下一個資料結構儲存在系統中,這種程序稱為僵屍程序。

僵屍程序會占用一定的記憶體空間,還占用了程序号,是以一定要避免大量的僵屍程序産生。有很多方法可以避免僵 屍程序。

孤兒程序

守護程序

繼續閱讀