天天看點

二 python并發程式設計之多程序-應用

  • ​​一 multiprocessing子產品介紹​​
  • ​​二 Process類的介紹​​
  • ​​三 Process類的使用​​
  • ​​四 守護程序​​
  • ​​五 程序同步(鎖)​​
  • ​​六 隊列(推薦使用)​​
  • ​​七 管道​​
  • ​​八 共享資料​​
  • ​​九 信号量(了解)​​
  • ​​十 事件(了解)​​
  • ​​十一 程序池​​

一 multiprocessing子產品介紹

     python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()檢視),在python中大部分情況需要使用多程序。Python提供了multiprocessing。

     multiprocessing子產品用來開啟子程序,并在子程序中執行我們定制的任務(比如函數),該子產品與多線程子產品threading的程式設計接口類似。

  multiprocessing子產品的功能衆多:支援子程序、通信和共享資料、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等元件。

     需要再次強調的一點是:與線程不同,程序沒有任何共享狀态,程序修改的資料,改動僅限于該程序内。

二 Process類的介紹

建立程序的類:

Process([group [, target [, name [, args [, kwargs]]]]]),由該類執行個體化得到的對象,表示一個子程序中的任務(尚未啟動)

強調:
1. 需要使用關鍵字的方式來指定參數
2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗号      

參數介紹:

group參數未使用,值始終為None

target表示調用對象,即子程序要執行的任務

args表示調用對象的位置參數元組,args=(1,2,'egon',)

kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}

name為子程序的名稱      

方法介紹:

p.start():啟動程序,并調用該子程序中的p.run()
p.run():程序啟動時運作的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實作該方法

p.terminate():強制終止程序p,不會進行任何清理操作,如果p建立了子程序,該子程序就成了僵屍程序,使用該方法需要特别小心這種情況。如果p還儲存了一個鎖那麼也将不會被釋放,進而導緻死鎖
p.is_alive():如果p仍然運作,傳回True

p.join([timeout]):主線程等待p終止(強調:是主線程處于等的狀态,而p是處于運作的狀态)。timeout是可選的逾時時間,需要強調的是,p.join隻能join住start開啟的程序,而不能join住run開啟的程序      

屬性介紹:

p.daemon:預設值為False,如果設為True,代表p為背景運作的守護程序,當p的父程序終止時,p也随之終止,并且設定為True後,p不能建立自己的新程序,必須在p.start()之前設定

p.name:程序的名稱

p.pid:程序的pid

p.exitcode:程序在運作時為None、如果為–N,表示被信号N結束(了解即可)

p.authkey:程序的身份驗證鍵,預設是由os.urandom()随機生成的32字元的字元串。這個鍵的用途是為涉及網絡連接配接的底層程序間通信提供安全性,這類連接配接隻有在具有相同的身份驗證鍵時才能成功(了解即可)      

三 Process類的使用

注意:在windows中Process()必須放到# if __name__ == '__main__':下

# 詳細解釋
Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 
If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 
This is the reason for hiding calls to Process() inside

if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.
由于Windows沒有fork,多處理子產品啟動一個新的Python程序并導入調用子產品。 
如果在導入時調用Process(),那麼這将啟動無限繼承的新程序(或直到機器耗盡資源)。 
這是隐藏對Process()内部調用的原,使用if __name__ == “__main __”,這個if語句中的語句将不會在導入時被調用。      

建立并開啟子程序的兩種方式

#開程序的方法一:
import time
import random
from multiprocessing import Process
def piao(name):
    print('%s piaoing' %name)
    time.sleep(random.randrange(1,5))
    print('%s piao end' %name)



p1=Process(target=piao,args=('egon',)) #必須加,号
p2=Process(target=piao,args=('alex',))
p3=Process(target=piao,args=('wupeqi',))
p4=Process(target=piao,args=('yuanhao',))

p1.start()
p2.start()
p3.start()
p4.start()
print('主線程')      
#開程序的方法二:
import time
import random
from multiprocessing import Process


class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('%s piaoing' %self.name)

        time.sleep(random.randrange(1,5))
        print('%s piao end' %self.name)

p1=Piao('egon')
p2=Piao('alex')
p3=Piao('wupeiqi')
p4=Piao('yuanhao')

p1.start() #start會自動調用run
p2.start()
p3.start()
p4.start()
print('主線程')      

程序直接的記憶體空間是隔離的

from multiprocessing import Process
n=100 #在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就可以了
def work():
    global n
    n=0
    print('子程序内: ',n)


if __name__ == '__main__':
    p=Process(target=work)
    p.start()
    print('主程序内: ',n)      

練習1:把上周所學的socket通信變成并發的形式

server端

from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__': #windows下start程序一定要寫到這下面
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()      

多個client端

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))      

這麼實作有沒有問題???

每來一個用戶端,都在服務端開啟一個程序,如果并發來一個萬個用戶端,要開啟一萬個程序嗎,你自己嘗試着在你自己的機器上開啟一萬個,10萬個程序試一試。
解決方法:程序池      

Process對象的join方法

join:主程序等,等待子程序結束

from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)


p=Piao('egon')
p.start()
p.join(0.0001) #等待p停止,等0.0001秒就不再等了
print('開始')      

有了join,程式不就是串行了嗎???

from multiprocessing import Process
import time
import random
def piao(name):
    print('%s is piaoing' %name)
    time.sleep(random.randint(1,3))
    print('%s is piao end' %name)

p1=Process(target=piao,args=('egon',))
p2=Process(target=piao,args=('alex',))
p3=Process(target=piao,args=('yuanhao',))
p4=Process(target=piao,args=('wupeiqi',))

p1.start()
p2.start()
p3.start()
p4.start()

#有的同學會有疑問:既然join是等待程序結束,那麼我像下面這樣寫,程序不就又變成串行的了嗎?
#當然不是了,必須明确:p.join()是讓誰等?
#很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非程序p,

#詳細解析如下:
#程序隻要start就會在開始運作了,是以p1-p4.start()時,系統中已經有四個并發的程序了
#而我們p1.join()是在等p1結束,沒錯p1隻要不結束主線程就會一直卡在原地,這也是問題的關鍵
#join是讓主線程等,而p1-p4仍然是并發執行的,p1.join的時候,其餘p2,p3,p4仍然在運作,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接通過檢測,無需等待
# 是以4個join花費的總時間仍然是耗費時間最長的那個程序運作的時間
p1.join()
p2.join()
p3.join()
p4.join()

print('主線程')


#上述啟動程序與join程序可以簡寫為
# p_l=[p1,p2,p3,p4]
# 
# for p in p_l:
#     p.start()
# 
# for p in p_l:
#     p.join()      

Process對象的其他方法或屬性(了解)

terminate與is_alive

#程序對象的其他方法一:terminate,is_alive
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s is piao end' %self.name)


p1=Piao('egon1')
p1.start()

p1.terminate()#關閉程序,不會立即關閉,是以is_alive立刻檢視的結果可能還是存活
print(p1.is_alive()) #結果為True

print('開始')
print(p1.is_alive()) #結果為False      

name與pid

from multiprocessing import Process
import time
import random
class Piao(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法會執行self.name=Piao-1,
        #                    #是以加到這裡,會覆寫我們的self.name=name

        #為我們開啟的程序設定名字的做法
        super().__init__()
        self.name=name

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)

p=Piao('egon')
p.start()
print('開始')
print(p.pid) #檢視pid      
二 python并發程式設計之多程式-應用

僵屍程序與孤兒程序(了解)

參考部落格:http://www.cnblogs.com/Anker/p/3271773.html

一:僵屍程序(有害)
  僵屍程序:一個程序使用fork建立子程序,如果子程序退出,而父程序并沒有調用wait或waitpid擷取子程序的狀态資訊,那麼子程序的程序描述符仍然儲存在系統中。這種程序稱之為僵死程序。詳解如下

我們知道在unix/linux中,正常情況下子程序是通過父程序建立的,子程序在建立新的程序。子程序的結束和父程序的運作是一個異步過程,即父程序永遠無法預測子程序到底什麼時候結束,如果子程序一結束就立刻回收其全部資源,那麼在父程序内将無法擷取子程序的狀态資訊。

是以,UNⅨ提供了一種機制可以保證父程序可以在任意時刻擷取子程序結束時的狀态資訊:
1、在每個程序退出的時候,核心釋放該程序所有的資源,包括打開的檔案,占用的記憶體等。但是仍然為其保留一定的資訊(包括程序号the process ID,退出狀态the termination status of the process,運作時間the amount of CPU time taken by the process等)
2、直到父程序通過wait / waitpid來取時才釋放. 但這樣就導緻了問題,如果程序不調用wait / waitpid的話,那麼保留的那段資訊就不會釋放,其程序号就會一直被占用,但是系統所能使用的程序号是有限的,如果大量的産生僵死程序,将因為沒有可用的程序号而導緻系統不能産生新的程序. 此即為僵屍程序的危害,應當避免。

  任何一個子程序(init除外)在exit()之後,并非馬上就消失掉,而是留下一個稱為僵屍程序(Zombie)的資料結構,等待父程序處理。這是每個子程序在結束時都要經過的階段。如果子程序在exit()之後,父程序沒有來得及處理,這時用ps指令就能看到子程序的狀态是“Z”。如果父程序能及時 處理,可能用ps指令就來不及看到子程序的僵屍狀态,但這并不等于子程序不經過僵屍狀态。  如果父程序在子程序結束之前退出,則子程序将由init接管。init将會以父程序的身份對僵屍狀态的子程序進行處理。

二:孤兒程序(無害)

  孤兒程序:一個父程序退出,而它的一個或多個子程序還在運作,那麼那些子程序将成為孤兒程序。孤兒程序将被init程序(程序号為1)所收養,并由init程序對它們完成狀态收集工作。

  孤兒程序是沒有父程序的程序,孤兒程序這個重任就落到了init程序身上,init程序就好像是一個民政局,專門負責處理孤兒程序的善後工作。每當出現一個孤兒程序的時候,核心就把孤 兒程序的父程序設定為init,而init程序會循環地wait()它的已經退出的子程序。這樣,當一個孤兒程序凄涼地結束了其生命周期的時候,init程序就會代表黨和政府出面處理它的一切善後工作。是以孤兒程序并不會有什麼危害。

我們來測試一下(建立完子程序後,主程序所在的這個腳本就退出了,當父程序先于子程序結束時,子程序會被init收養,成為孤兒程序,而非僵屍程序),檔案内容

import os
import sys
import time

pid = os.getpid()
ppid = os.getppid()
print 'im father', 'pid', pid, 'ppid', ppid
pid = os.fork()
#執行pid=os.fork()則會生成一個子程序
#傳回值pid有兩種值:
#    如果傳回的pid值為0,表示在子程序當中
#    如果傳回的pid值>0,表示在父程序當中
if pid > 0:
    print 'father died..'
    sys.exit(0)

# 保證主線程退出完畢
time.sleep(1)
print 'im child', os.getpid(), os.getppid()

執行檔案,輸出結果:
im father pid 32515 ppid 32015
father died..
im child 32516 1

看,子程序已經被pid為1的init程序接收了,是以僵屍程序在這種情況下是不存在的,存在隻有孤兒程序而已,孤兒程序聲明周期結束自然會被init來銷毀。


三:僵屍程序危害場景:

  例如有個程序,它定期的産 生一個子程序,這個子程序需要做的事情很少,做完它該做的事情之後就退出了,是以這個子程序的生命周期很短,但是,父程序隻管生成新的子程序,至于子程序 退出之後的事情,則一概不聞不問,這樣,系統運作上一段時間之後,系統中就會存在很多的僵死程序,倘若用ps指令檢視的話,就會看到很多狀态為Z的程序。 嚴格地來說,僵死程序并不是問題的根源,罪魁禍首是産生出大量僵死程序的那個父程序。是以,當我們尋求如何消滅系統中大量的僵死程序時,答案就是把産生大 量僵死程序的那個元兇槍斃掉(也就是通過kill發送SIGTERM或者SIGKILL信号啦)。槍斃了元兇程序之後,它産生的僵死程序就變成了孤兒進 程,這些孤兒程序會被init程序接管,init程序會wait()這些孤兒程序,釋放它們占用的系統程序表中的資源,這樣,這些已經僵死的孤兒程序 就能瞑目而去了。

四:測試
#1、産生僵屍程序的程式test.py内容如下

#coding:utf-8
from multiprocessing import Process
import time,os

def run():
    print('子',os.getpid())

if __name__ == '__main__':
    p=Process(target=run)
    p.start()
    
    print('主',os.getpid())
    time.sleep(1000)


#2、在unix或linux系統上執行
[root@vm172-31-0-19 ~]# python3  test.py &
[1] 18652
[root@vm172-31-0-19 ~]# 主 18652
子 18653

[root@vm172-31-0-19 ~]# ps aux |grep Z
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root     18653  0.0  0.0      0     0 pts/0    Z    20:02   0:00 [python3] <defunct> #出現僵屍程序
root     18656  0.0  0.0 112648   952 pts/0    S+   20:02   0:00 grep --color=auto Z

[root@vm172-31-0-19 ~]# top #執行top指令發現1zombie
top - 20:03:42 up 31 min,  3 users,  load average: 0.01, 0.06, 0.12
Tasks:  93 total,   2 running,  90 sleeping,   0 stopped,   1 zombie
%Cpu(s):  0.0 us,  0.3 sy,  0.0 ni, 99.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem :  1016884 total,    97184 free,    70848 used,   848852 buff/cache
KiB Swap:        0 total,        0 free,        0 used.   782540 avail Mem 

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                                                                                                                        
root      20   0   29788   1256    988 S  0.3  0.1   0:01.50 elfin                                                                                                                      


#3、
等待父程序正常結束後會調用wait/waitpid去回收僵屍程序
但如果父程序是一個死循環,永遠不會結束,那麼該僵屍程序就會一直存在,僵屍程序過多,就是有害的
解決方法一:殺死父程序
解決方法二:對開啟的子程序應該記得使用join,join會回收僵屍程序
參考python2源碼注釋
class Process(object):
    def join(self, timeout=None):
        '''
        Wait until child process terminates
        '''
        assert self._parent_pid == os.getpid(), 'can only join a child process'
        assert self._popen is not None, 'can only join a started process'
        res = self._popen.wait(timeout)
        if res is not None:
            _current_process._children.discard(self)

join方法中調用了wait,告訴系統釋放僵屍程序。discard為從自己的children中剔除

解決方法三:http://blog.csdn.net/u010571844/article/details/50419798      

思考:

from multiprocessing import Process
import time,os

def task():
    print('%s is running' %os.getpid())
    time.sleep(3)
    
if __name__ == '__main__':
    p=Process(target=task)
    p.start()
    p.join() # 等待程序p結束後,join函數内部會發送系統調用wait,去告訴作業系統回收掉程序p的id号

    print(p.pid) #???此時能否看到子程序p的id号
    print('主')

ps:即便我們不調用join方法,python解釋器也會定期在背景自動回收僵屍程序,如果我們開啟一個死循環,不停地開子程序,子程序很快結束,父程序不死,會産生一堆僵屍子程序,但這些僵屍程序并不會累積
因為python解釋器會自動将它們回收,但如果我們在迅速開啟了一堆子程序後,讓父程序time.sleep(100)阻塞在了原地,那麼父程序就是整體阻在原地了,此時子程序陸續進入僵屍狀态,并不會被回收      
#答案:可以
#分析:
p.join()是像作業系統發送請求,告知作業系統p的id号不需要再占用了,回收就可以,
此時在父程序内還可以看到p.pid,但此時的p.pid是一個無意義的id号,因為作業系統已經将該編号回收

打個比方:
我黨相當于作業系統,控制着整個中國的硬體,每個人相當于一個程序,每個人都需要跟我黨申請一個身份證号
該号碼就相當于程序的pid,人死後應該到我黨那裡登出身份證号,p.join()就相當于要求我黨回收身份證号,但p的家人(相當于主程序)
仍然持有p的身份證,但此刻的身份證已經沒有意義      

四 守護程序

主程序建立守護程序

  1. 其一:守護程序會在主程序代碼執行結束後就終止
  2. 其二:守護程序内無法再開啟子程序,否則抛出異常:AssertionError: daemonic processes are not allowed to have children

注意:程序之間是互相獨立的,主程序代碼運作結束,守護程序随即終止

from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)


p=Piao('egon')
p.daemon=True #一定要在p.start()前設定,設定p為守護程序,禁止p建立子程序,并且父程序代碼執行結束,p即終止運作
p.start()
print('主')      

迷惑人的例子

#主程序代碼運作完畢,守護程序就會結束
from multiprocessing import Process
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


p1=Process(target=foo)
p2=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
print("main-------") #列印該行則主程序代碼結束,則守護程序p1應該被終止,可能會有p1任務執行的列印資訊123,因為主程序列印main----時,p1也執行了,但是随即被終止      

五 程序同步(鎖)

程序之間資料不共享,但是共享同一套檔案系統,是以通路同一個檔案,或同一個列印終端,是沒有問題的,

而共享帶來的是競争,競争帶來的結果就是錯亂,如何控制,就是加鎖處理

part1:多個程序共享同一列印終端

#并發運作,效率高,但競争同一列印終端,帶來了列印錯亂
from multiprocessing import Process
import os,time
def work():
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())

if __name__ == '__main__':
    for i in range(3):
        p=Process(target=work)
        p.start()      
#由并發變成了串行,犧牲了運作效率,但避免了競争
from multiprocessing import Process,Lock
import os,time
def work(lock):
    lock.acquire()
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,))
        p.start()      

part2:多個程序共享同一檔案

#并發運作,效率高,但競争寫同一檔案,資料寫入錯亂
#檔案當資料庫,模拟搶票
#檔案db的内容為:{"count":1}
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db.txt'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db.txt'))
    time.sleep(0.1) #模拟讀資料的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模拟寫資料的網絡延遲
        json.dump(dic,open('db.txt','w'))
        print('\033[43m購票成功\033[0m')

def task(lock):
    search()
    get()
if __name__ == '__main__':
    lock=Lock()
    for i in range(100): #模拟并發100個用戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()      
#加鎖:購票行為由并發變成了串行,犧牲了運作效率,但保證了資料安全
#檔案當資料庫,模拟搶票
#檔案db的内容為:{"count":1}
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db.txt'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db.txt'))
    time.sleep(0.1) #模拟讀資料的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模拟寫資料的網絡延遲
        json.dump(dic,open('db.txt','w'))
        print('\033[43m購票成功\033[0m')

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(100): #模拟并發100個用戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()      

總結:

#加鎖可以保證多個程序修改同一塊資料時,同一時間隻能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了資料安全。
雖然可以用檔案共享資料實作程序間通信,但問題是:
1.效率低(共享資料基于檔案,而檔案是硬碟上的資料)
2.需要自己加鎖處理



#是以我們最好找尋一種解決方案能夠兼顧:1、效率高(多個程序共享一塊記憶體的資料)2、幫我們處理好鎖問題。這就是mutiprocessing子產品為我們提供的基于消息的IPC通信機制:隊列和管道。
1 隊列和管道都是将資料存放于記憶體中
2 隊列又是基于(管道+鎖)實作的,可以讓我們從複雜的鎖問題中解脫出來,
我們應該盡量避免使用共享資料,盡可能使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。      

六 隊列(推薦使用)

程序彼此之間互相隔離,要實作程序間通信(IPC),multiprocessing子產品支援兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

建立隊列的類(底層就是以管道和鎖定的方式實作):

Queue([maxsize]):建立共享的程序隊列,Queue是多程序安全的隊列,可以使用Queue實作多程序之間的資料傳遞。      

參數介紹:

maxsize是隊列中允許最大項數,省略則無大小限制。      

 方法介紹:

 主要方法:

q.put方法用以插入資料到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(預設值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。如果逾時,會抛出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即抛出Queue.Full異常。
q.get方法可以從隊列讀取并且删除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(預設值),并且timeout為正值,那麼在等待時間内沒有取到任何元素,會抛出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即傳回該值,否則,如果隊列為空,則立即抛出Queue.Empty異常.

q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():調用此方法時q為空則傳回True,該結果不可靠,比如在傳回True的過程中,如果隊列中又加入了項目。
q.full():調用此方法時q已滿則傳回True,該結果不可靠,比如在傳回True的過程中,如果隊列中的項目被取走。
q.qsize():傳回隊列中目前項目的正确數量,結果也不可靠,理由同q.empty()和q.full()一樣      

其他方法(了解):

q.cancel_join_thread():不會在程序退出時自動連接配接背景線程。可以防止join_thread()方法阻塞
q.close():關閉隊列,防止隊列中加入更多資料。調用此方法,背景線程将繼續寫入那些已經入隊列但尚未寫入的資料,但将在此方法完成時馬上關閉。如果q被垃圾收集,将調用此方法。關閉隊列不會在隊列使用者中産生任何類型的資料結束信号或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生産者中的隊列不會導緻get()方法傳回錯誤。
q.join_thread():連接配接隊列的背景線程。此方法用于在調用q.close()方法之後,等待所有隊列項被消耗。預設情況下,此方法由不是q的原始建立者的所有程序調用。調用q.cancel_join_thread方法可以禁止這種行為      

應用:

'''
multiprocessing子產品支援程序間通信的兩種主要形式:管道和隊列
都是基于消息傳遞實作的,但是隊列接口
'''

from multiprocessing import Process,Queue
import time
q=Queue(3)


#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
print(q.full()) #滿了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了      

生産者消費者模型

在并發程式設計中使用生産者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生産線程和消費線程的工作能力來提高程式的整體處理資料的速度。

為什麼要使用生産者和消費者模式

線上程世界裡,生産者就是生産資料的線程,消費者就是消費資料的線程。在多線程開發當中,如果生産者處理速度很快,而消費者處理速度很慢,那麼生産者就必須等待消費者處理完,才能繼續生産資料。同樣的道理,如果消費者的處理能力大于生産者,那麼消費者就必須等待生産者。為了解決這個問題于是引入了生産者和消費者模式。

 什麼是生産者消費者模式

生産者消費者模式是通過一個容器來解決生産者和消費者的強耦合問題。生産者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,是以生産者生産完資料之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生産者要資料,而是直接從阻塞隊列裡取,阻塞隊列就相當于一個緩沖區,平衡了生産者和消費者的處理能力。

#基于隊列實作生産者消費者模型
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生産了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生産者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('主')      
#生産者消費者模型總結

    #程式中有兩類角色
        一類負責生産資料(生産者)
        一類負責處理資料(消費者)
        
    #引入生産者消費者模型為了解決的問題是:
        平衡生産者與消費者之間的工作能力,進而提高程式整體處理資料的速度
        
    #如何實作:
        生産者<-->隊列<——>消費者
    #生産者消費者模型實作類程式的解耦和      

此時的問題是主程序永遠不會結束,原因是:生産者p在生産完後就結束了,但是消費者c在取空了q之後,則一直處于死循環中且卡在q.get()這一步。

解決方式無非是讓生産者在生産完畢後,往隊列中再發一個結束信号,這樣消費者在接收到結束信号後就可以break出死循環

#生産者在生産完畢後發送結束信号None
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信号則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生産了 %s\033[0m' %(os.getpid(),res))
    q.put(None) #發送結束信号
if __name__ == '__main__':
    q=Queue()
    #生産者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('主')      

注意:結束信号None,不一定要由生産者發,主程序裡同樣可以發,但主程序需要等生産者結束後才應該發送該信号

#主程序在生産者生産完畢後發送結束信号None
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信号則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生産了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生産者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()

    p1.join()
    q.put(None) #發送結束信号
    print('主')      

但上述解決方式,在有多個生産者和多個消費者時,我們則需要用一個很low的方式去解決

#有幾個消費者就需要發送幾次結束信号:相當low
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信号則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生産了 %s\033[0m' %(os.getpid(),res))



if __name__ == '__main__':
    q=Queue()
    #生産者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必須保證生産者全部生産完畢,才應該發送結束信号
    p2.join()
    p3.join()
    q.put(None) #有幾個消費者就應該發送幾次結束信号None
    q.put(None) #發送結束信号
    print('主')      

其實我們的思路無非是發送結束信号而已,有另外一種隊列提供了這種機制

#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知程序是使用共享的信号和條件變量來實作的。

#參數介紹:
maxsize是隊列中允許最大項數,省略則無大小限制。

#方法介紹:
JoinableQueue的執行個體p除了與Queue對象相同的方法之外還具有:
q.task_done():使用者使用此方法發出信号,表示q.get()的傳回項目已經被處理。如果調用此方法的次數大于從隊列中删除項目的數量,将引發ValueError異常
q.join():生産者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞将持續到隊列中的每個項目均調用q.task_done()方法為止      
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

        q.task_done() #向q.join()發送一次信号,證明一個資料已經被取走了

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生産了 %s\033[0m' %(os.getpid(),res))
    q.join()


if __name__ == '__main__':
    q=JoinableQueue()
    #生産者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True
    c2.daemon=True

    #開始
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('主') 
    
    #主程序等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的資料
    #因而c1,c2也沒有存在的價值了,應該随着主程序的結束而結束,是以設定成守護程序      

七 管道

程序間通信(IPC)方式二:管道(不推薦使用,了解即可)

#建立管道的類:
Pipe([duplex]):在程序之間建立一條管道,并傳回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接配接對象,強調一點:必須在産生Process對象之前産生管道
#參數介紹:
dumplex:預設管道是全雙工的,如果将duplex射成False,conn1隻能用于接收,conn2隻能用于發送。
#主要方法:
    conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接配接的另外一端已經關閉,那麼recv方法會抛出EOFError。
    conn1.send(obj):通過連接配接發送對象。obj是與序列化相容的任意對象
 #其他方法:
conn1.close():關閉連接配接。如果conn1被垃圾回收,将自動調用此方法
conn1.fileno():傳回連接配接使用的整數檔案描述符
conn1.poll([timeout]):如果連接配接上的資料可用,傳回True。timeout指定等待的最長時限。如果省略此參數,方法将立即傳回結果。如果将timeout射成None,操作将無限期地等待資料到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的位元組消息。maxlength指定要接收的最大位元組數。如果進入的消息,超過了這個最大值,将引發IOError異常,并且在連接配接上無法進行進一步讀取。如果連接配接的另外一端已經關閉,再也不存在任何資料,将引發EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):通過連接配接發送位元組資料緩沖區,buffer是支援緩沖區接口的任意對象,offset是緩沖區中的位元組偏移量,而size是要發送位元組數。結果資料以單條消息的形式發出,然後調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的位元組消息,并把它儲存在buffer對象中,該對象支援可寫入的緩沖區接口(即bytearray對象或類似的對象)。offset指定緩沖區中放置消息處的位元組位移。傳回值是收到的位元組數。如果消息長度大于可用的緩沖區空間,将引發BufferTooShort異常。      
#基于管道實作程序間通信(與隊列的方式是類似的,隊列就是管道加鎖實作的)
from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            right.close()
            break
def producer(seq,p):
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
        # time.sleep(1)
    else:
        left.close()
if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('主程序')      

注意:生産者和消費者都沒有使用管道的某個端點,就應該将其關閉,如在生産者中關閉管道的右端,在消費者中關閉管道的左端。如果忘記執行這些步驟,程式可能再消費者中的recv()操作上挂起。管道是由作業系統進行引用計數的,必須在所有程序中關閉管道後才能生産EOFError異常。是以在生産者中關閉管道不會有任何效果,付費消費者中也關閉了相同的管道端點。

#管道可以用于雙向通信,利用通常在用戶端/伺服器中使用的請求/響應模型或遠端過程調用,就可以使用管道編寫與程序互動的程式
from multiprocessing import Process,Pipe

import time,os
def adder(p,name):
    server,client=p
    client.close()
    while True:
        try:
            x,y=server.recv()
        except EOFError:
            server.close()
            break
        res=x+y
        server.send(res)
    print('server done')
if __name__ == '__main__':
    server,client=Pipe()

    c1=Process(target=adder,args=((server,client),'c1'))
    c1.start()

    server.close()

    client.send((10,20))
    print(client.recv())
    client.close()

    c1.join()
    print('主程序')
#注意:send()和recv()方法使用pickle子產品對對象進行序列化。      

八 共享資料

展望未來,基于消息傳遞的并發程式設計是大勢所趨

即便是使用線程,推薦做法也是将程式設計為大量獨立的線程集合

通過消息隊列交換資料。這樣極大地減少了對使用鎖定和其他同步手段的需求,

還可以擴充到分布式系統中

程序間通信應該盡量避免使用本節所講的共享資料的方式

程序間資料是獨立的,可以借助于隊列或管道實作通信,二者都是基于消息傳遞的

雖然程序間資料獨立,但可以通過Manager實作資料共享,事實上Manager的功能遠不止于此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,      
#程序之間操作共享的資料
from multiprocessing import Manager,Process,Lock
import os
def work(d,lock):
    # with lock: #不加鎖而操作共享的資料,肯定會出現資料錯亂
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)
        #{'count': 94}      

九 信号量(了解)

#信号量Semahpore(同線程一樣)
#互斥鎖 同時隻允許一個線程更改資料,而Semaphore是同時允許一定數量的線程更改資料 ,比如廁所有3個坑,那最多隻允許3個人上廁所,後面的人隻能等裡面有人出來了才能再進去,如果指定信号量為3,那麼來一個人獲得一把鎖,計數加1,當計數等于3時,後面的人均需要等待。一旦釋放,就有人可以獲得一把鎖

#信号量與程序池的概念很像,但是要區分開,信号量涉及到加鎖的概念

from multiprocessing import Process,Semaphore
import time,random

def go_wc(sem,user):
    sem.acquire()
    print('%s 占到一個茅坑' %user)
    time.sleep(random.randint(0,3)) #模拟每個人拉屎速度不一樣,0代表有的人蹲下就起來了
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(5)
    p_l=[]
    for i in range(13):
        p=Process(target=go_wc,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')      

十 事件(了解)

#Event(同線程一樣)
#python線程的事件用于主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
#    事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。

#clear:将“Flag”設定為False
#set:将“Flag”設定為True
 

#_*_coding:utf-8_*_
#!/usr/bin/env python

from multiprocessing import Process,Event
import time,random

def car(e,n):
    while True:
        if not e.is_set(): #Flase
            print('\033[31m紅燈亮\033[0m,car%s等着' %n)
            e.wait()
            print('\033[32m車%s 看見綠燈亮了\033[0m' %n)
            time.sleep(random.randint(3,6))
            if not e.is_set():
                continue
            print('走你,car', n)
            break

def police_car(e,n):
    while True:
        if not e.is_set():
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait(1)
            print('燈的是%s,警車走了,car %s' %(e.is_set(),n))
            break

def traffic_lights(e,inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            e.clear() #e.is_set() ---->False
        else:
            e.set()

if __name__ == '__main__':
    e=Event()
    # for i in range(10):
    #     p=Process(target=car,args=(e,i,))
    #     p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))
        p.start()
    t=Process(target=traffic_lights,args=(e,10))
    t.start()

    print('============》')      

十一 程序池

在利用Python進行系統管理的時候,特别是同時操作多個檔案目錄,或者遠端控制多台主機,并行操作可以節約大量的時間。多程序是實作并發的手段之一,需要注意的問題是:

  1. 很明顯需要并發執行的任務通常要遠大于核數
  2. 一個作業系統不可能無限開啟程序,通常有幾個核就開幾個程序
  3. 程序開啟過多,效率反而會下降(開啟程序是需要占用系統資源的,而且開啟多餘核數目的程序也無法做到并行)

例如當被操作對象數目不大時,可以直接利用multiprocessing中的Process動态成生多個程序,十幾個還好,但如果是上百個,上千個。。。手動的去限制程序數量卻又太過繁瑣,此時可以發揮程序池的功效。

我們就可以通過維護一個程序池來控制程序數目,比如httpd的程序模式,規定最小程序數和最大程序數...

ps:對于遠端過程調用的進階應用程式而言,應該使用程序池,Pool可以提供指定數量的程序,供使用者調用,當有新的請求送出到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,就重用程序池中的程序。

建立程序池的類:如果指定numprocess為3,則程序池會從無到有建立三個程序,然後自始至終使用這三個程序去執行所有任務,不會開啟其他程序

Pool([numprocess  [,initializer [, initargs]]]):建立程序池      

參數介紹:

numprocess:要建立的程序數,如果省略,将預設使用cpu_count()的值
initializer:是每個工作程序啟動時要執行的可調用對象,預設為None
initargs:是要傳給initializer的參數組      

方法介紹:

主要方法:

p.apply(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後傳回結果。需要強調的是:此操作并不會在所有池工作程序中并執行func函數。如果要通過不同參數并發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後傳回結果。此方法的結果是AsyncResult類的執行個體,callback是可調用對象,接收輸入參數。當func的結果變為可用時,将了解傳遞給callback。callback禁止執行任何阻塞操作,否則将接收其他異步操作中的結果。

p.close():關閉程序池,防止進一步操作。如果所有操作持續挂起,它們将在工作程序終止前完成
P.jion():等待所有工作程序退出。此方法隻能在close()或teminate()之後調用      

 其他方法(了解部分)

方法apply_async()和map_async()的傳回值是AsyncResul的執行個體obj。執行個體具有以下方法
obj.get():傳回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間内還沒有到達,将引發一場。如果遠端操作中引發了異常,它将在調用此方法時再次被引發。
obj.ready():如果調用完成,傳回True
obj.successful():如果調用完成且沒有引發異常,傳回True,如果在結果就緒之前調用此方法,引發異常
obj.wait([timeout]):等待結果變為可用。
obj.terminate():立即終止所有工作程序,同時不執行任何清理或結束任何挂起工作。如果p被垃圾回收,将自動調用此函數      

應用:

#同步調用apply
from multiprocessing import Pool
import os,time
def work(n):
    print('%s run' %os.getpid())
    time.sleep(3)
    return n**2

if __name__ == '__main__':
    p=Pool(3) #程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply(work,args=(i,)) #同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞,但不管該任務是否存在阻塞,同步調用都會在原地等着,隻是等的過程中若是任務發生了阻塞就會被奪走cpu的執行權限
        res_l.append(res)
    print(res_l)      
#異步調用apply_async
from multiprocessing import Pool
import os,time
def work(n):
    print('%s run' %os.getpid())
    time.sleep(3)
    return n**2

if __name__ == '__main__':
    p=Pool(3) #程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,)) #同步運作,阻塞、直到本次任務執行完畢拿到res
        res_l.append(res)

    #異步apply_async用法:如果使用異步送出的任務,主程序需要使用jion,等待程序池内任務都處理完,然後可以用get收集結果,否則,主程序結束,程序池可能還沒來得及執行,也就跟着一起結束了
    p.close()
    p.join()
    for res in res_l:
        print(res.get()) #使用get來擷取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻擷取結果,也根本無需get      
#詳解:apply_async與apply
#一:使用程序池(異步調用,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply_async(func, (msg, ))   #維持執行的程序總數為processes,當一個程序執行完畢後會添加新的程序進去
        res_l.append(res)
    print("==============================>") #沒有後面的join,或get,則程式整體結束,程序池中的任務還沒來得及全部執行完也都跟着主程序一起結束了

    pool.close() #關閉程序池,防止進一步操作。如果所有操作持續挂起,它們将在工作程序終止前完成
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的程序加入到pool,join函數等待所有子程序結束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的清單,而非最終的結果,但這一步是在join後執行的,證明結果已經計算完畢,剩下的事情就是調用每個對象下的get方法去擷取結果
    for i in res_l:
        print(i.get()) #使用get來擷取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻擷取結果,也根本無需get

#二:使用程序池(同步調用,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply(func, (msg, ))   #維持執行的程序總數為processes,當一個程序執行完畢後會添加新的程序進去
        res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另外一個
    print("==============================>")
    pool.close()
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的程序加入到pool,join函數等待所有子程序結束

    print(res_l) #看到的就是最終的結果組成的清單
    for i in res_l: #apply是同步的,是以直接得到結果,沒有get()方法
        print(i)      

練習2:使用程序池維護固定數目的程序(重寫練習1)

server端

#Pool内的程序數預設是cpu核數,假設為4(檢視方法os.cpu_count())
#開啟6個用戶端,會發現2個用戶端處于等待狀态
#在每個程序内檢視pid,會發現pid使用為4個,即多個用戶端公用4個程序
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    print('程序pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool()
    while True:
        conn,client_addr=server.accept()
        p.apply_async(talk,args=(conn,client_addr))
        # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間隻有一個用戶端能通路      

用戶端

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))      

發現:并發開啟多個用戶端,服務端同一時間隻有3個不同的pid,幹掉一個用戶端,另外一個用戶端才會進來,被3個程序之一處理

回調函數:

需要回調函數的場景:程序池中任何一個任務一旦處理完了,就立即告知主程序:我好了額,你可以處理我的結果了。主程序則調用一個函數去處理該結果,該函數即回調函數

我們可以把耗時間(阻塞)的任務放到程序池中,然後指定回調函數(主程序負責執行),這樣主程序在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<程序%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def pasrse_page(res):
    print('<程序%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=Pool(3)
    res_l=[]
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了

'''
列印結果:
<程序3388> get https://www.baidu.com
<程序3389> get https://www.python.org
<程序3390> get https://www.openstack.org
<程序3388> get https://help.github.com/
<程序3387> parse https://www.baidu.com
<程序3389> get http://www.sina.com.cn/
<程序3387> parse https://www.python.org
<程序3387> parse https://help.github.com/
<程序3387> parse http://www.sina.com.cn/
<程序3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
'''      
#爬蟲案例
from multiprocessing import Pool
import time,random
import requests
import re

def get_page(url,pattern):
    response=requests.get(url)
    if response.status_code == 200:
        return (response.text,pattern)

def parse_page(info):
    page_content,pattern=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0],
            'title':item[1],
            'actor':item[2].strip()[3:],
            'time':item[3][5:],
            'score':item[4]+item[5]

        }
        print(dic)
if __name__ == '__main__':
    pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

    # res=requests.get('http://maoyan.com/board/7')
    # print(re.findall(pattern,res.text))      

如果在主程序中等待程序池中所有任務都執行完畢後,再統一處理結果,則無需回調函數

from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待程序池中所有程序執行完畢

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到所有結果
    print(nums) #主程序拿到所有的處理結果,可以在主程序中進行統一進行處理      

繼續閱讀