天天看點

06講:python爬蟲之多程序前言

文章目錄

  • 前言
    • 1. 多程序的含義
    • 2.Python 多程序的優勢
    • 3.多程序的實作
      •    ①直接使用 Process 類
      •    ②繼承 Process 類
      •    ③守護程序
      •    ④等待程序
      •    ⑤終止程序
      •    ⑥程序互斥鎖
      •    ⑦ 信号量
      •    ⑧隊列
      •    ⑨管道
      •    ⑩程序池

前言

    在上一講我們了解了多線程的基本概念,同時我們也提到,Python 中的多線程是不能很好發揮多核優勢的,如果想要發揮多核優勢,最好還是使用多程序(因為GIL的關系)。

    那麼本講我們就來了解下多程序的基本概念和用 Python 實作多程序的方法。(特别重要)

1. 多程序的含義

    程序(Process)是具有一定獨立功能的程式關于某個資料集合上的一次運作活動,是系統進行資源配置設定和排程的一個獨立機關。

    顧名思義,多程序就是啟用多個程序同時運作。由于程序是線程的集合,而且程序是由一個或多個線程構成的,是以多程序的運作意味着有大于或等于程序數量的線程在運作。

2.Python 多程序的優勢

    通過上一講我們知道,由于程序中 GIL 的存在,Python 中的多線程并不能很好地發揮多核優勢,一個程序中的多個線程,在同一時刻隻能有一個線程運作。

    而對于多程序來說,每個程序都有屬于自己的 GIL,是以,在多核處理器下,多程序的運作是不會受 GIL 的影響的。是以,多程序能更好地發揮多核的優勢。

    當然,對于爬蟲這種 IO 密集型任務來說,多線程和多程序影響差别并不大。對于計算密集型任務來說,Python 的多程序相比多線程,其多核運作效率會有成倍的提升。

    總的來說,Python 的多程序整體來看是比多線程更有優勢的。是以,在條件允許的情況下,能用多程序就盡量用多程序。

    不過值得注意的是,由于程序是系統進行資源配置設定和排程的一個獨立機關,是以各個程序之間的資料是無法共享的,如多個程序無法共享一個全局變量,程序之間的資料共享需要有單獨的機制來實作,這在後面也會講到。

3.多程序的實作

    在 Python 中也有内置的庫來實作多程序,它就是 multiprocessing。(大家還記得線程的庫名嗎?)

    multiprocessing 提供了一系列的元件,如 Process(程序)、Queue(隊列)、Semaphore(信号量)、Pipe(管道)、Lock(鎖)、Pool(程序池)等,接下來讓我們來了解下它們的使用方法。

   ①直接使用 Process 類

    在 multiprocessing 中,每一個程序都用一個 Process 類來表示。它的 API 調用如下:

  • target 表示調用對象,你可以傳入方法的名字。
  • args 表示被調用對象的位置參數元組,比如 target 是函數 func,他有兩個參數 m,n,那麼 args 就傳入 [m, n]

    即可。

  • kwargs 表示調用對象的字典。
  • name 是别名,相當于給這個程序取一個名字
  • group 分組。

我們先用一個執行個體來感受一下:

import multiprocessing

def process(index):
    process(f"Process: {index}")

if __name__ == '__main__':
    for i in range(5):
        p = multiprocessing.Process(target=process, args=(i,))
        p.start()
           

    這是一個實作多程序最基礎的方式:通過建立== Process 來建立一個子程序==,其中 target 參數傳入方法名,args 是方法的參數,是以元組的形式傳入,其和被調用的方法 process 的參數是一一對應的。

    注意:這裡 args 必須要是一個元組,如果隻有一個參數,那也要在元組第一個元素後面加一個逗号,如果沒有逗号則和單個元素本身沒有差別,無法構成元組,導緻參數傳遞出現問題。

    建立完程序之後,我們通過調用 start 方法即可啟動程序了。運作結果如下:

Process: 0
Process: 1
Process: 2
Process: 3
Process: 4	
           

    可以看到,我們運作了 5 個子程序,每個程序都調用了 process 方法。process 方法的 index 參數通過 Process 的 args 傳入,分别是 0~4 這 5 個序号,最後列印出來,5 個子程序運作結束。

    由于程序是 Python 中最小的資源配置設定單元,是以這些程序和線程不同,各個程序之間的資料是不會共享的,每啟動一個程序,都會獨立配置設定資源。

    另外,在目前 CPU 核數足夠的情況下,這些不同的程序會配置設定給不同的 CPU 核來運作,實作真正的并行執行。

multiprocessing 還提供了幾個比較有用的方法,如我們可以通過 cpu_count 的方法來擷取目前機器 CPU 的核心數量,通過 active_children 方法擷取目前還在運作的所有程序。

    下面通過一個執行個體來看一下:

import multiprocessing
import time

def process(index):
    time.sleep(index)
    print(f"Process: {index}")

if __name__ == '__main__':
    for i in range(5):
        p = multiprocessing.Process(target=process, args=[i,])
        p.start()
    print(f"CPU number: {multiprocessing.cpu_count()}") # 擷取目前機器 CPU 的核心數量
    for p in multiprocessing.active_children(): # 擷取目前還在運作的所有程序
        print(f"Child process name: {p.name} id: {p.pid}")
    print("Process Ended")
           

運作結果如下:

Process: 0
CPU number: 8
Child process name: Process-5 id: 73595
Child process name: Process-2 id: 73592
Child process name: Process-3 id: 73593
Child process name: Process-4 id: 73594
Process Ended
Process: 1
Process: 2
Process: 3
Process: 4
           

    在上面的例子中我們通過 cpu_count 成功擷取了 CPU 核心的數量:8 個,當然不同的機器結果可能不同。

    另外我們還通過 active_children 擷取到了目前正在活躍運作的程序清單。然後我們周遊了每個程序,并将它們的名稱和程序号列印出來了,這裡程序号直接使用 pid 屬性即可擷取,程序名稱直接通過 name 屬性即可擷取。

    以上我們就完成了多程序的建立和一些基本資訊的擷取。

   ②繼承 Process 類

    在上面的例子中,我們建立程序是直接使用 Process 這個類來建立的,這是一種建立程序的方式。不過,建立程序的方式不止這一種,同樣,我們也可以像線程 Thread 一樣來通過繼承的方式建立一個程序類,程序的基本操作我們在子類的 run 方法中實作即可。

    通過一個執行個體來看一下:

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self, loop):
        Process.__init__(self)
        self.loop = loop

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(f"Pid: {self.pid} LoopCount: {count}")
if __name__ == '__main__':
    for i in range(2, 5):
        p = MyProcess(i)
        p.start()
           

    我們首先聲明了一個構造方法,這個方法接收一個 loop 參數,代表循環次數,并将其設定為全局變量。在 run 方法中,又使用這個 loop 變量循環了 loop 次并列印了目前的程序号和循環次數。

    在調用時,我們用 range 方法得到了 2、3、4 三個數字,并把它們分别初始化了 MyProcess 程序,然後調用 start 方法将程序啟動起來。

    注意:這裡程序的執行邏輯需要在 run 方法中實作,啟動程序需要調用 start 方法,調用之後 run 方法便會執行。

    運作結果如下:

Pid: 73667 LoopCount: 0
Pid: 73668 LoopCount: 0
Pid: 73669 LoopCount: 0
Pid: 73667 LoopCount: 1
Pid: 73668 LoopCount: 1
Pid: 73669 LoopCount: 1
Pid: 73668 LoopCount: 2
Pid: 73669 LoopCount: 2
Pid: 73669 LoopCount: 3
           

    可以看到,三個程序分别列印出了 2、3、4 條結果,即程序 73667 列印了 2 次 結果,程序 73668 列印了 3 次結果,程序 73669 列印了 4 次結果。

    注意,這裡的程序 pid 代表程序号,不同機器、不同時刻運作結果可能不同。

    通過上面的方式,我們也非常友善地實作了一個程序的定義。為了複用友善,我們可以把一些方法寫在每個程序類裡封裝好,在使用時直接初始化一個程序類運作即可。

   ③守護程序

    在多程序中,同樣存在守護程序的概念,如果一個程序被設定為守護程序,當父程序結束後,子程序會自動被終止,我們可以通過設定 daemon 屬性來控制是否為守護程序。

    還是原來的例子,增加了 deamon 屬性的設定:

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self, loop):
        Process.__init__(self)
        self.loop = loop

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(f"Pid: {self.pid} LoopCount: {count}")
if __name__ == '__main__':
    for i in range(2, 5):
        p = MyProcess(i)
        p.daemon = True
        p.start()
print("Main Process ended")
           

    運作結果如下:

Main Process ended
           

    結果很簡單,因為主程序沒有做任何事情,直接輸出一句話結束,是以在這時也直接終止了子程序的運作。

    這樣可以有效防止無控制地生成子程序。這樣的寫法可以讓我們在主程序運作結束後無需額外擔心子程序是否關閉,避免了獨立子程序的運作。

   ④等待程序

    上面的運作效果其實不太符合我們預期:主程序運作結束時,子程序(守護程序)也都退出了,子程序什麼都沒來得及執行。

    能不能讓所有子程序都執行完了然後再結束呢?當然是可以的,隻需要加入== join== 方法即可,我們可以将代碼改寫如下:

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self, loop):
        Process.__init__(self)
        self.loop = loop

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(f"Pid: {self.pid} LoopCount: {count}")
if __name__ == '__main__':
    processes = []
    for i in range(2, 5):
        p = MyProcess(i)
        processes.append(p)
        p.daemon = True
        p.start()
    for p in processes:
        p.join()
print("Main Process ended")
           

    運作結果如下:

Pid: 40866 LoopCount: 0
Pid: 40867 LoopCount: 0
Pid: 40868 LoopCount: 0
Pid: 40866 LoopCount: 1
Pid: 40867 LoopCount: 1
Pid: 40868 LoopCount: 1
Pid: 40867 LoopCount: 2
Pid: 40868 LoopCount: 2
Pid: 40868 LoopCount: 3
Main Process ended
           

    在調用 start 和 join 方法後,父程序就可以等待所有子程序都執行完畢後,再列印出結束的結果。

    預設情況下,join 是無限期的。也就是說,如果有子程序沒有運作完畢,主程序會一直等待。這種情況下,如果子程序出現問題陷入了死循環,主程序也會無限等待下去。怎麼解決這個問題呢?可以給 join 方法傳遞一個逾時參數,代表最長等待秒數。如果子程序沒有在這個指定秒數之内完成,會被強制傳回,主程序不再會等待。也就是說這個參數設定了主程序等待該子程序的最長時間。

    例如這裡我們傳入 1,代表最長等待 1 秒,代碼改寫如下:

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self, loop):
        Process.__init__(self)
        self.loop = loop

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(f"Pid: {self.pid} LoopCount: {count}")
if __name__ == '__main__':
    processes = []
    for i in range(3, 5):
        p = MyProcess(i)
        processes.append(p)
        p.daemon = True
        p.start()
    for p in processes:
        p.join(1)
print("Main Process ended")
           

    運作結果如下:

Pid: 40970 LoopCount: 0
Pid: 40971 LoopCount: 0
Pid: 40970 LoopCount: 1
Pid: 40971 LoopCount: 1
Main Process ended
           

    可以看到,有的子程序本來要運作 3 秒,結果運作 1 秒就被強制傳回了,由于是守護程序,該子程序被終止了。

    到這裡,我們就了解了守護程序、程序等待和逾時設定的用法。

   ⑤終止程序

    當然,終止程序不止有守護程序這一種做法,我們也可以通過 terminate 方法來終止某個子程序,另外我們還可以通過 is_alive 方法判斷程序是否還在運作。

    下面我們來看一個執行個體:

import multiprocessing, time

def process():
    print("Starting")
    time.sleep(5)
    print("Finished")

if __name__ == '__main__':
    p = multiprocessing.Process(target=process)
    print("Before:", p, p.is_alive())

    p.start()
    print("During:", p, p.is_alive())

    p.terminate()
    print("Terminate:", p, p.is_alive())

    p.join()
    print("Joined:", p, p.is_alive())
           

    在上面的例子中,我們用 Process 建立了一個程序,接着調用 start 方法啟動這個程序,然後調用 terminate 方法将程序終止,最後調用 join 方法。

    另外,在程序運作不同的階段,我們還通過 is_alive 方法判斷目前程序是否還在運作。

    運作結果如下:

Before: <Process(Process-1, initial)> False
During: <Process(Process-1, started)> True
Terminate: <Process(Process-1, started)> True
Joined: <Process(Process-1, stopped[SIGTERM])> False
           

    這裡有一個值得注意的地方,在調用 terminate 方法之後,我們用 is_alive 方法擷取程序的狀态發現依然還是運作狀态。在調用 join 方法之後,is_alive 方法擷取程序的運作狀态才變為終止狀态。

    是以,在調用 terminate 方法之後,記得要調用一下 join 方法,這裡調用 join 方法可以為程序提供時間來更新對象狀态,用來反映出最終的程序終止效果。

   ⑥程序互斥鎖

    在上面的一些執行個體中,我們可能會遇到如下的運作結果:

Pid: 73993 LoopCount: 0
Pid: 73993 LoopCount: 1
Pid: 73994 LoopCount: 0Pid: 73994 LoopCount: 1
Pid: 73994 LoopCount: 2
Pid: 73995 LoopCount: 0
Pid: 73995 LoopCount: 1
Pid: 73995 LoopCount: 2
Pid: 73995 LoopCount: 3
Main Process ended
           

    我們發現,有的輸出結果沒有換行。這是什麼原因造成的呢?

    這種情況是由多個程序并行執行導緻的,兩個程序同時進行了輸出,結果第一個程序的換行沒有來得及輸出,第二個程序就輸出了結果,導緻最終輸出沒有換行。

    那如何來避免這種問題?如果我們能保證,多個程序運作期間的任一時間,隻能一個程序輸出,其他程序等待,等剛才那個程序輸出完畢之後,另一個程序再進行輸出,這樣就不會出現輸出沒有換行的現象了。

    這種解決方案實際上就是實作了程序互斥,避免了多個程序同時搶占臨界區(輸出)資源。我們可以通過== multiprocessing 中的 Lock 來實作==。Lock,即鎖,在一個程序輸出時,加鎖,其他程序等待。等此程序執行結束後,釋放鎖,其他程序可以進行輸出。

    我們首先實作一個不加鎖的執行個體,代碼如下:

from multiprocessing import Process, Lock
import time

class MyProcess(Process):
    def __init__(self, loop, lock):
        Process.__init__(self)
        self.loop = loop
        self.lock = lock

    def run(self):
        for count in range(self.loop):
            time.sleep(0.1)
            # self.lock.acquire()
            print(f"Pid: {self.pid} LoopCount: {count}")
            # self.lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(10, 15):
        p = MyProcess(i, lock)
        p.start()
           

運作結果如下:

Pid: 74030 LoopCount: 0
Pid: 74031 LoopCount: 0
Pid: 74032 LoopCount: 0
Pid: 74033 LoopCount: 0
Pid: 74034 LoopCount: 0
Pid: 74030 LoopCount: 1
Pid: 74031 LoopCount: 1
Pid: 74032 LoopCount: 1Pid: 74033 LoopCount: 1
Pid: 74034 LoopCount: 1
Pid: 74030 LoopCount: 2
...
           

    可以看到運作結果中有些輸出已經出現了不換行的問題。

    我們對其加鎖,取消掉剛才代碼中的兩行注釋,重新運作,運作結果如下:

Pid: 74061 LoopCount: 0
Pid: 74062 LoopCount: 0
Pid: 74063 LoopCount: 0
Pid: 74064 LoopCount: 0
Pid: 74065 LoopCount: 0
Pid: 74061 LoopCount: 1
Pid: 74062 LoopCount: 1
Pid: 74063 LoopCount: 1
Pid: 74064 LoopCount: 1
Pid: 74065 LoopCount: 1
Pid: 74061 LoopCount: 2
Pid: 74062 LoopCount: 2
Pid: 74064 LoopCount: 2
...
           

    這時輸出效果就正常了。

    是以,在通路一些臨界區資源時,使用 Lock 可以有效避免程序同時占用資源而導緻的一些問題。

   ⑦ 信号量

    程序互斥鎖可以使同一時刻隻有一個程序能通路共享資源,如上面的例子所展示的那樣,在同一時刻隻能有一個程序輸出結果。但有時候我們需要允許多個程序來通路共享資源,同時還需要限制能通路共享資源的程序的數量。

    這種需求該如何實作呢?可以用信号量,信号量是程序同步過程中一個比較重要的角色。它可以控制臨界資源的數量,實作多個程序同時通路共享資源,限制程序的并發量。

    如果你學過作業系統,那麼一定對這方面非常了解,如果你還不了解信号量是什麼,可以先熟悉一下這個概念。

    我們可以用 multiprocessing 庫中的 Semaphore 來實作信号量。

    那麼接下來我們就用一個執行個體來示範一下程序之間利用 Semaphore 做到多個程序共享資源,同時又限制同時可通路的程序數量,代碼如下:

from multiprocessing import Process, Semaphore, Lock, Queue
import time

buffer = Queue(10)
empty = Semaphore(2) 
full = Semaphore(0)
lock = Lock()

class Consumer(Process): # 這裡是消費者 去拿生産者的
    def run(self):
        global buffer, empty, full, lock
        while True:
            full.acquire()
            lock.acquire()
            buffer.get()
            print('Consumer pop an element')
            time.sleep(1)
            lock.release()
            empty.release()

class Producer(Process):
    def run(self):
        global buffer, empty, full, lock
        while True:
            empty.acquire() #緩沖區空閑區大小-1
            lock.acquire() # 進行加鎖
            buffer.put(1) # 對緩沖區進行操作
            print('Producer append an element')
            time.sleep(1)
            lock.release() # 釋放鎖
            full.release() # 緩沖區位置數量加 1
if __name__ == '__main__':
    p = Producer()
    c = Consumer()
    p.daemon = c.daemon = True
    p.start()
    c.start()
    p.join()
    c.join()
    print("Main Process Ended")
           

    如上代碼實作了經典的生産者和消費者問題。它定義了兩個程序類,一個是消費者,一個是生産者。

    另外,這裡使用 multiprocessing 中的 Queue 定義了一個共享隊列,然後定義了兩個信号量 Semaphore,一個代表緩沖區空餘數,一個表示緩沖區占用數。

    生産者 Producer 使用 acquire 方法來占用一個緩沖區位置,緩沖區空閑區大小減 1,接下來進行加鎖,對緩沖區進行操作,然後釋放鎖,最後讓代表占用的緩沖區位置數量加 1,消費者則相反。

    運作結果如下:

Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
           

    我們發現兩個程序在交替運作,生産者先放入緩沖區物品,然後消費者取出,不停地進行循環。 你可以通過上面的例子來體會信号量 Semaphore 的用法,通過 Semaphore 我們很好地控制了程序對資源的并發通路數量。

   ⑧隊列

    在上面的例子中我們使用 Queue 作為程序通信的共享隊列使用。

    而如果我們把上面程式中的 Queue 換成普通的 list,是完全起不到效果的,因為程序和程序之間的資源是不共享的。即使在一個程序中改變了這個 list,在另一個程序也不能擷取到這個 list 的狀态,是以聲明全局變量對多程序是沒有用處的。

    那程序如何共享資料呢?可以用 Queue,即隊列。當然這裡的隊列指的是 multiprocessing 裡面的 Queue。

    依然用上面的例子,我們一個程序向隊列中放入随機資料,然後另一個程序取出資料。

from multiprocessing import Process, Semaphore, Lock, Queue
import time
from random import random

buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()

class Consumer(Process):
    def run(self):
        global buffer, empty, full, lock
        while True:
            full.acquire()
            lock.acquire()
            print(f'Consumer get {buffer.get()}')
            time.sleep(1)
            lock.release()
            empty.release()

class Producer(Process):
    def run(self):
        global buffer, empty, full, lock
        while True:
            empty.acquire()
            lock.acquire()
            num = random()
            print(f'Producer putt {num}')
            buffer.put(num)
            time.sleep(1)
            lock.release()
            full.release()

if __name__ == '__main__':
    p = Producer()
    c = Consumer()
    p.daemon = c.daemon = True
    p.start()
    c.start()
    p.join()
    c.join()
    print("Main Process Ended")
           

    運作結果如下:

Producer put  0.719213647437
Producer put  0.44287326683
Consumer get 0.719213647437
Consumer get 0.44287326683
Producer put  0.722859424381
Producer put  0.525321338921
Consumer get 0.722859424381
Consumer get 0.525321338921
           

    在上面的例子中我們聲明了兩個程序,一個程序為生産者 Producer,另一個為消費者 Consumer,生産者不斷向 Queue 裡面添加随機數,消費者不斷從隊列裡面取随機數。

    生産者在放資料的時候調用了 Queue 的 put 方法,消費者在取的時候使用了 get 方法,這樣我們就通過 Queue 實作兩個程序的資料共享了。

   ⑨管道

    剛才我們使用 Queue 實作了程序間的資料共享,那麼程序之間直接通信,如收發資訊,用什麼比較好呢?可以用== Pipe管道==。

    管道,我們可以把它了解為兩個程序之間通信的通道。管道可以是單向的,即 half-duplex:一個程序負責發消息,另一個程序負責收消息;也可以是雙向的 duplex,即互相收發消息。

    預設聲明 Pipe 對象是雙向管道,如果要建立單向管道,可以在初始化的時候傳入 deplex 參數為 False。

    我們用一個執行個體來感受一下:

from multiprocessing import Process, Pipe

class Consumer(Process):
    def __init__(self, pipe):
        Process.__init__(self)
        self.pipe = pipe

    def run(self):
        self.pipe.send('Consumer Words')
        print(f'Consumer Received: {self.pipe.recv()}')

class Producer(Process):
    def __init__(self, pipe):
        Process.__init__(self)
        self.pipe = pipe

    def run(self):
        print(f'Producer Received: {self.pipe.recv()}')
        self.pipe.send('Producer Words')

if __name__ == '__main__':
    pipe = Pipe()
    p = Producer(pipe[0])
    c = Consumer(pipe[1])
    p.daemon = c.daemon = True
    p.start()
    c.start()
    p.join()
    c.join()
    print('Main Process Ended')
           

    在這個例子裡我們聲明了一個預設為雙向的管道,然後将管道的兩端分别傳給兩個程序。兩個程序互相收發。觀察一下結果:

Producer Received: Consumer Words
Consumer Received: Producer Words
Main Process Ended
           

    管道 Pipe 就像程序之間搭建的橋梁,利用它我們就可以很友善地實作程序間通信了。

   ⑩程序池

    在前面,我們講了可以使用 Process 來建立程序,同時也講了如何用 Semaphore 來控制程序的并發執行數量。

    假如現在我們遇到這麼一個問題,我有 10000 個任務,每個任務需要啟動一個程序來執行,并且一個程序運作完畢之後要緊接着啟動下一個程序,同時我還需要控制程序的并發數量,不能并發太高,不然 CPU 處理不過來(如果同時運作的程序能維持在一個最高恒定值當然使用率是最高的)。

    那麼我們該如何來實作這個需求呢?

    用 Process 和 Semaphore 可以實作,但是實作起來比較我們可以用 Process 和 Semaphore 解決問題,但是實作起來比較煩瑣。而這種需求在平時又是非常常見的。此時,我們就可以派上程序池了,即 multiprocessing 中的 Pool。

    Pool 可以提供指定數量的程序,供使用者調用,當有新的請求送出到 pool 中時,如果池還沒有滿,就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序來執行它。

    我們用一個執行個體來實作一下,代碼如下:

from multiprocessing import Pool
import time

def function(index):
    print(f'Start process: {index}')
    time.sleep(3)
    print(f'End process {index}',)

if __name__ == '__main__':
    pool = Pool(processes=3)
    for i in range(4):
        pool.apply_async(function, args=(i, ))
    print('Main Process started')
    pool.close()
    pool.join()
    print("Main Process ended")
           

    在這個例子中我們聲明了一個大小為 3 的程序池,通過 processes 參數來指定,如果不指定,那麼會自動根據處理器核心來配置設定程序數。接着我們使用 apply_async 方法将程序添加進去,args 可以用來傳遞參數。

    運作結果如下:

Main Process started
Start process: 0
Start process: 1
Start process: 2
End process 0
End process 1
End process 2
Start process: 3
End process 3
Main Process ended
           

    程序池大小為 3,是以最初可以看到有 3 個程序同時執行,第程序池大小為 3,是以最初可以看到有 3 個程序同時執行,第4個程序在等待,在有程序運作完畢之後,第4個程序馬上跟着運作,出現了如上的運作效果。

    最後,我們要記得調用 close 方法來關閉程序池,使其不再接受新的任務,然後調用 join 方法讓主程序等待子程序的退出,等子程序運作完畢之後,主程序接着運作并結束。

    不過上面的寫法多少有些煩瑣,這裡再介紹程序池一個更好用的 map 方法,可以将上述寫法簡化很多。

    map 方法是怎麼用的呢?第一個參數就是要啟動的程序對應的執行方法,第 2 個參數是一個可疊代對象,其中的每個元素會被傳遞給這個執行方法。

    舉個例子:現在我們有一個 list,裡面包含了很多 URL,另外我們也定義了一個方法用來抓取每個 URL 内容并解析,那麼我們可以直接在 map 的第一個參數傳入方法名,第 2 個參數傳入 URL 數組。

    我們用一個執行個體來感受一下:

from multiprocessing import Pool
import urllib.request
import urllib.error

def scrape(url):
    try:
        urllib.request.urlopen(url)
        print(f'URL {url} Scraped')
    except (urllib.error.HTTPError, urllib.error.URLError):
        print(f'URL {url} not Scraped')

if __name__ == '__main__':
    pool = Pool(processes=3)
    urls = [
        'https://www.baidu.com',
        'http://www.meituan.com/',
        'https://www.yxinmiracle.com/',
        'http://xxxyxxx.net',
    ]
    pool.map(scrape, urls)
    pool.close()
           

    這個例子中我們先定義了一個 scrape 方法,它接收一個參數 url,這裡就是請求了一下這個連結,然後輸出爬取成功的資訊,如果發生錯誤,則會輸出爬取失敗的資訊。

    首先我們要初始化一個 Pool,指定程序數為 3。然後我們聲明一個 urls 清單,接着我們調用了 map 方法,第 1 個參數就是程序對應的執行方法,第 2 個參數就是 urls 清單,map 方法會依次将 urls 的每個元素作為 scrape 的參數傳遞并啟動一個新的程序,加到程序池中執行。

    運作結果如下:

URL https://www.baidu.com Scraped
URL http://xxxyxxx.net not Scraped
URL https://www.yxinmiracle.com Scraped
URL http://www.meituan.com/ Scraped
           

    這樣,我們就可以實作 3 個程序并行運作。不同的程序互相獨立地輸出了對應的爬取結果。

    可以看到,我們利用 Pool 的 map 方法非常友善地實作了多程序的執行。後面我們也會在實戰案例中結合程序池來實作資料的爬取。

    以上便是 Python 中多程序的基本用法,本節内容比較多,後面的實戰案例也會用到這些内容,需要好好掌握。

本講大部分内容為崔慶才老師的爬蟲課(拉勾教育版權所有)上的課件以及我的一些些小小的了解,這系列都是我的課後筆記,如果有不對的地方可以請大家聯系我,謝謝!