天天看點

《Python爬蟲開發與項目實戰》——1.4 程序和線程

本節書摘來自華章計算機《python爬蟲開發與項目實戰》一書中的第1章,第1.4節,作者:範傳輝著,更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視

  在爬蟲開發中,程序和線程的概念是非常重要的。提高爬蟲的工作效率,打造分布式爬蟲,都離不開程序和線程的身影。本節将從多程序、多線程、協程和分布式程序等四個方面,幫助大家回顧python語言中程序和線程中的常用操作,以便在接下來的爬蟲開發中靈活運用程序和線程。

1.4.1 多程序

  python實作多程序的方式主要有兩種,一種方法是使用os子產品中的fork方法,另一種方法是使用multiprocessing子產品。這兩種方法的差別在于前者僅适用于unix/linux作業系統,對windows不支援,後者則是跨平台的實作方式。由于現在很多爬蟲程式都是運作在unix/linux作業系統上,是以本節對兩種方式都進行講解。

  1.?使用os子產品中的fork方式實作多程序

  python的os子產品封裝了常見的系統調用,其中就有fork方法。fork方法來自于unix/linux作業系統中提供的一個fork系統調用,這個方法非常特殊。普通的方法都是調用一次,傳回一次,而fork方法是調用一次,傳回兩次,原因在于作業系統将目前程序(父程序)複制出一份程序(子程序),這兩個程序幾乎完全相同,于是fork方法分别在父程序和子程序中傳回。子程序中永遠傳回0,父程序中傳回的是子程序的id。下面舉個例子,對python使用fork方法建立程序進行講解。其中os子產品中的getpid方法用于擷取目前程序的id,getppid方法用于擷取父程序的id。代碼如下:

  運作結果如下:

  2.?使用multiprocessing子產品建立多程序

  multiprocessing子產品提供了一個process類來描述一個程序對象。建立子程序時,隻需要傳入一個執行函數和函數的參數,即可完成一個process執行個體的建立,用start()方法啟動程序,用join()方法實作程序間的同步。下面通過一個例子來示範建立多程序的流程,代碼如下:

  以上介紹了建立程序的兩種方法,但是要啟動大量的子程序,使用程序池批量建立子程序的方式更加常見,因為當被操作對象數目不大時,可以直接利用multiprocessing中的process動态生成多個程序,如果是上百個、上千個目标,手動去限制程序數量卻又太過繁瑣,這時候程序池pool發揮作用的時候就到了。

  3.?multiprocessing子產品提供了一個pool類來代表程序池對象

  pool可以提供指定數量的程序供使用者調用,預設大小是cpu的核數。當有新的請求送出到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序來處理它。下面通過一個例子來示範程序池的工作流程,代碼如下:

  上述程式先建立了容量為3的程序池,依次向程序池中添加了5個任務。從運作結果中可以看到雖然添加了5個任務,但是一開始隻運作了3個,而且每次最多運作3個程序。當一個任務結束了,新的任務依次添加進來,任務執行使用的程序依然是原來的程序,這一點通過程序的pid就可以看出來。

  4.?程序間通信

  假如建立了大量的程序,那程序間通信是必不可少的。python提供了多種程序間通信的方式,例如queue、pipe、value+array等。本節主要講解queue和pipe這兩種方式。queue和pipe的差別在于pipe常用來在兩個程序間通信,queue用來在多個程序間實作通信。

  首先講解一下queue通信方式。queue是多程序安全的隊列,可以使用queue實作多程序之間的資料傳遞。有兩個方法:put和get可以進行queue操作:

put方法用以插入資料到隊列中,它還有兩個可選參數:blocked和timeout。如果blocked為true(預設值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。如果逾時,會抛出queue.full異常。如果blocked為false,但該queue已滿,會立即抛出queue.full異常。

get方法可以從隊列讀取并且删除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為true(預設值),并且timeout為正值,那麼在等待時間内沒有取到任何元素,會抛出queue.empty異常。如果blocked為false,分兩種情況:如果queue有一個值可用,則立即傳回該值;否則,如果隊列為空,則立即抛出queue.empty異常。

  下面通過一個例子進行說明:在父程序中建立三個子程序,兩個子程序往queue中寫入資料,一個子程序從queue中讀取資料。程式示例如下:

《Python爬蟲開發與項目實戰》——1.4 程式和線程
《Python爬蟲開發與項目實戰》——1.4 程式和線程

  最後介紹一下pipe的通信機制,pipe常用來在兩個程序間進行通信,兩個程序分别位于管道的兩端。

  pipe方法傳回(conn1, conn2)代表一個管道的兩個端。pipe方法有duplex參數,如果duplex參數為true(預設值),那麼這個管道是全雙工模式,也就是說conn1和conn2均可收發。若duplex為false,conn1隻負責接收消息,conn2隻負責發送消息。send和recv方法分别是發送和接收消息的方法。例如,在全雙工模式下,可以調用conn1.send發送消息,conn1.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經被關閉,那麼recv方法會抛出eoferror。

  下面通過一個例子進行說明:建立兩個程序,一個子程序通過pipe發送資料,一個子程序通過pipe接收資料。程式示例如下:

1.4.2 多線程

  多線程類似于同時執行多個不同程式,多線程運作有如下優點:

可以把運作時間長的任務放到背景去處理。

使用者界面可以更加吸引人,比如使用者點選了一個按鈕去觸發某些事件的處理,可以彈出一個進度條來顯示處理的進度。

程式的運作速度可能加快。

在一些需要等待的任務實作上,如使用者輸入、檔案讀寫和網絡收發資料等,線程就比較有用了。在這種情況下我們可以釋放一些珍貴的資源,如記憶體占用等。

  python的标準庫提供了兩個子產品:thread和threading,thread是低級子產品,threading是進階子產品,對thread進行了封裝。絕大多數情況下,我們隻需要使用threading這個進階子產品。

  1.?用threading子產品建立多線程

  threading子產品一般通過兩種方式建立多線程:第一種方式是把一個函數傳入并建立thread執行個體,然後調用start方法開始執行;第二種方式是直接從threading.thread繼承并建立線程類,然後重寫__init__方法和run方法。

  首先介紹第一種方法,通過一個簡單例子示範建立多線程的流程,程式如下:

《Python爬蟲開發與項目實戰》——1.4 程式和線程

  第二種方式從threading.thread繼承建立線程類,下面将方法一的程式進行重寫,程式如下:

  2.?線程同步

  如果多個線程共同對某個資料修改,則可能出現不可預料的結果,為了保證資料的正确性,需要對多個線程進行同步。使用thread對象的lock和rlock可以實作簡單的線程同步,這兩個對象都有acquire方法和release方法,對于那些每次隻允許一個線程操作的資料,可以将其操作放到acquire和release方法之間。

  對于lock對象而言,如果一個線程連續兩次進行acquire操作,那麼由于第一次acquire之後沒有release,第二次acquire将挂起線程。這會導緻lock對象永遠不會release,使得線程死鎖。rlock對象允許一個線程多次對其進行acquire操作,因為在其内部通過一個counter變量維護着線程acquire的次數。而且每一次的acquire操作必須有一個release操作與之對應,在所有的release操作完成之後,别的線程才能申請該rlock對象。下面通過一個簡單的例子示範線程同步的過程:

  3.?全局解釋器鎖(gil)

  在python的原始解釋器cpython中存在着gil(global interpreter lock,全局解釋器鎖),是以在解釋執行python代碼時,會産生互斥鎖來限制線程對共享資源的通路,直到解釋器遇到i/o操作或者操作次數達到一定數目時才會釋放gil。由于全局解釋器鎖的存在,在進行多線程操作的時候,不能調用多個cpu核心,隻能利用一個核心,是以在進行cpu密集型操作的時候,不推薦使用多線程,更加傾向于多程序。那麼多線程适合什麼樣的應用場景呢?對于io密集型操作,多線程可以明顯提高效率,例如python爬蟲的開發,絕大多數時間爬蟲是在等待socket傳回資料,網絡io的操作延時比cpu大得多。

1.4.3 協程

  協程(coroutine),又稱微線程,纖程,是一種使用者級的輕量級線程。協程擁有自己的寄存器上下文和棧。協程排程切換時,将寄存器上下文和棧儲存到其他地方,在切回來的時候,恢複先前儲存的寄存器上下文和棧。是以協程能保留上一次調用時的狀态,每次過程重入時,就相當于進入上一次調用的狀态。在并發程式設計中,協程與線程類似,每個協程表示一個執行單元,有自己的本地資料,與其他協程共享全局資料和其他資源。

  協程需要使用者自己來編寫排程邏輯,對于cpu來說,協程其實是單線程,是以cpu不用去考慮怎麼排程、切換上下文,這就省去了cpu的切換開銷,是以協程在一定程度上又好于多線程。那麼在python中是如何實作協程的呢?

  python通過yield提供了對協程的基本支援,但是不完全,而使用第三方gevent庫是更好的選擇,gevent提供了比較完善的協程支援。gevent是一個基于協程的python網絡函數庫,使用greenlet在libev事件循環頂部提供了一個有進階别并發性的api。主要特性有以下幾點:

基于libev的快速事件循環,linux上是epoll機制。

基于greenlet的輕量級執行單元。

api複用了python标準庫裡的内容。

支援ssl的協作式sockets。

可通過線程池或c-ares實作dns查詢。

通過monkey patching功能使得第三方子產品變成協作式。

  gevent對協程的支援,本質上是greenlet在實作切換工作。greenlet工作流程如下:假如進行通路網絡的io操作時,出現阻塞,greenlet就顯式切換到另一段沒有被阻塞的代碼段執行,直到原先的阻塞狀況消失以後,再自動切換回原來的代碼段繼續處理。是以,greenlet是一種合理安排的串行方式。

  由于io操作非常耗時,經常使程式處于等待狀态,有了gevent為我們自動切換協程,就保證總有greenlet在運作,而不是等待io,這就是協程一般比多線程效率高的原因。由于切換是在io操作時自動完成,是以gevent需要修改python自帶的一些标準庫,将一些常見的阻塞,如socket、select等地方實作協程跳轉,這一過程在啟動時通過monkey patch完成。下面通過一個的例子來示範gevent的使用流程,代碼如下:

《Python爬蟲開發與項目實戰》——1.4 程式和線程

  以上程式主要用了gevent中的spawn方法和joinall方法。spawn方法可以看做是用來形成協程,joinall方法就是添加這些協程任務,并且啟動運作。從運作結果來看,3個網絡操作是并發執行的,而且結束順序不同,但其實隻有一個線程。

  gevent中還提供了對池的支援。當擁有動态數量的greenlet需要進行并發管理(限制并發數)時,就可以使用池,這在處理大量的網絡和io操作時是非常需要的。接下來使用gevent中pool對象,對上面的例子進行改寫,程式如下:

  通過運作結果可以看出,pool對象确實對協程的并發數量進行了管理,先通路了前兩個網址,當其中一個任務完成時,才會執行第三個。

1.4.4 分布式程序

  分布式程序指的是将process程序分布到多台機器上,充分利用多台機器的性能完成複雜的任務。我們可以将這一點應用到分布式爬蟲的開發中。

  分布式程序在python中依然要用到multiprocessing子產品。multiprocessing子產品不但支援多程序,其中managers子子產品還支援把多程序分布到多台機器上。可以寫一個服務程序作為排程者,将任務分布到其他多個程序中,依靠網絡通信進行管理。舉個例子:在做爬蟲程式時,常常會遇到這樣的場景,我們想抓取某個網站的所有圖檔,如果使用多程序的話,一般是一個程序負責抓取圖檔的連結位址,将連結位址存放到queue中,另外的程序負責從queue中讀取連結位址進行下載下傳和存儲到本地。現在把這個過程做成分布式,一台機器上的程序負責抓取連結,其他機器上的程序負責下載下傳存儲。那麼遇到的主要問題是将queue暴露到網絡中,讓其他機器程序都可以通路,分布式程序就是将這一個過程進行了封裝,我們可以将這個過程稱為本地隊列的網絡化。整體過程如圖1-24所示。

《Python爬蟲開發與項目實戰》——1.4 程式和線程

  要實作上面例子的功能,建立分布式程序需要分為六個步驟:

  1)建立隊列queue,用來進行程序間的通信。服務程序建立任務隊列task_queue,用來作為傳遞任務給任務程序的通道;服務程序建立結果隊列result_queue,作為任務程序完成任務後回複服務程序的通道。在分布式多程序環境下,必須通過由queuemanager獲得的queue接口來添加任務。

  2)把第一步中建立的隊列在網絡上注冊,暴露給其他程序(主機),注冊後獲得網絡隊列,相當于本地隊列的映像。

  3)建立一個對象(queuemanager(basemanager))執行個體manager,綁定端口和驗證密碼。

  4)啟動第三步中建立的執行個體,即啟動管理manager,監管資訊通道。

  5)通過管理執行個體的方法獲得通過網絡通路的queue對象,即再把網絡隊列實體化成可以使用的本地隊列。

  6)建立任務到“本地”隊列中,自動上傳任務到網絡隊列中,配置設定給任務程序進行處理。

  接下來通過程式實作上面的例子(linux版),首先編寫的是服務程序(taskmanager.py),代碼如下:

  任務程序已經編寫完成,接下來編寫任務程序(taskworker.py),建立任務程序的步驟相對較少,需要四個步驟:

  1)使用queuemanager注冊用于擷取queue的方法名稱,任務程序隻能通過名稱來在網絡上擷取queue。

  2)連接配接伺服器,端口和驗證密碼注意保持與服務程序中完全一緻。

  3)從網絡上擷取queue,進行本地化。

  4)從task隊列擷取任務,并把結果寫入result隊列。

  程式taskworker.py代碼(win/linux版)如下:

  最後開始運作程式,先啟動服務程序taskmanager.py,運作結果如下:

  接着再啟動任務程序taskworker.py,運作結果如下:

  當任務程序運作結束後,服務程序運作結果如下:

  其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾台甚至幾十台機器上,實作大規模的分布式爬蟲。

  taskmanager.py程式在windows版下的代碼如下: