天天看點

一個使用 asyncio 協程的網絡爬蟲(一)

經典的計算機科學強調高效的算法,盡可能快地完成計算。但是很多網絡程式的時間并不是消耗在計算上,而是在等待許多慢速的連接配接或者低頻事件的發生。這些程式暴露出一個新的挑戰:如何高效的等待大量網絡事件。一個現代的解決方案是異步 i/o。

這一章我們将實作一個簡單的網絡爬蟲。這個爬蟲隻是一個原型式的異步應用,因為它等待許多響應而隻做少量的計算。一次爬的網頁越多,它就能越快的完成任務。如果它為每個動态的請求啟動一個線程的話,随着并發請求數量的增加,它會在耗盡套接字之前,耗盡記憶體或者線程相關的資源。使用異步 i/o 可以避免這個的問題。

<a target="_blank"></a>

網絡爬蟲尋找并下載下傳一個網站上的所有網頁,也許還會把它們存檔,為它們建立索引。從根 url 開始,它擷取每個網頁,解析出沒有遇到過的連結加到隊列中。當網頁沒有未見到過的連結并且隊列為空時,它便停止運作。

我們可以通過同時下載下傳大量的網頁來加快這一過程。當爬蟲發現新的連結,它使用一個新的套接字并行的處理這個新連結,解析響應,添加新連結到隊列。當并發很大時,可能會導緻性能下降,是以我們會限制并發的數量,在隊列保留那些未處理的連結,直到一些正在執行的任務完成。

怎麼使一個爬蟲并發?傳統的做法是建立一個線程池,每個線程使用一個套接字在一段時間内負責一個網頁的下載下傳。比如,下載下傳 xkcd.com 網站的一個網頁:

<code>def fetch(url):</code>

<code>sock = socket.socket()</code>

<code>sock.connect(('xkcd.com', 80))</code>

<code>request = 'get {} http/1.0\r\nhost: xkcd.com\r\n\r\n'.format(url)</code>

<code>sock.send(request.encode('ascii'))</code>

<code>response = b''</code>

<code>chunk = sock.recv(4096)</code>

<code>while chunk:</code>

<code>response += chunk</code>

<code></code>

<code># page is now downloaded.</code>

<code>links = parse_links(response)</code>

<code>q.add(links)</code>

套接字操作預設是阻塞的:當一個線程調用一個類似 <code>connect</code> 和 <code>recv</code> 方法時,它會阻塞,直到操作完成。(即使是 <code>send</code> 也能被阻塞,比如接收端在接受外發消息時緩慢而系統的外發資料緩存已經滿了的情況下)是以,為了同一時間内下載下傳多個網頁,我們需要很多線程。一個複雜的應用會通過線程池保持空閑的線程來分攤建立線程的開銷。同樣的做法也适用于套接字,使用連接配接池。

到目前為止,使用線程的是成本昂貴的,作業系統對一個程序、一個使用者、一台機器能使用線程做了不同的硬性限制。在 作者 jesse 的系統中,一個 python 線程需要 50k 的記憶體,開啟上萬個線程就會失敗。每個線程的開銷和系統的限制就是這種方式的瓶頸所在。

網絡伺服器到了要同時處理成千上萬的客戶的時代了,你不這樣認為麼?畢竟,現在網絡規模很大了。

kegel 在 1999 年創造出“c10k”這個術語。一萬個連接配接在今天看來還是可接受的,但是問題依然存在,隻不過大小不同。回到那時候,對于 c10k 問題,每個連接配接啟一個線程是不切實際的。現在這個限制已經成指數級增長。确實,我們的玩具網絡爬蟲使用線程也可以工作的很好。但是,對于有着千萬級連接配接的大規模應用來說,限制依然存在:它會消耗掉所有線程,即使套接字還夠用。那麼我們該如何解決這個問題?

異步 i/o 架構在一個線程中完成并發操作。讓我們看看這是怎麼做到的。

異步架構使用非阻塞套接字。異步爬蟲中,我們在發起到伺服器的連接配接前把套接字設為非阻塞:

<code>sock.setblocking(false)</code>

<code>try:</code>

<code>except blockingioerror:</code>

<code>pass</code>

對一個非阻塞套接字調用 <code>connect</code> 方法會立即抛出異常,即使它可以正常工作。這個異常複現了底層 c 語言函數令人厭煩的行為,它把 <code>errno</code> 設定為 <code>einprogress</code>,告訴你操作已經開始。

現在我們的爬蟲需要一種知道連接配接何時建立的方法,這樣它才能發送 http 請求。我們可以簡單地使用循環來重試:

<code>encoded = request.encode('ascii')</code>

<code>while true:</code>

<code>sock.send(encoded)</code>

<code>break # done.</code>

<code>except oserror as e:</code>

<code>print('sent')</code>

這種方法不僅消耗 cpu,也不能有效的等待多個套接字。在遠古時代,bsd unix 的解決方法是 <code>select</code>,這是一個 c 函數,它在一個或一組非阻塞套接字上等待事件發生。現在,網際網路應用大量連接配接的需求,導緻<code>select</code> 被 <code>poll</code> 所代替,在 bsd 上的實作是 <code>kqueue</code> ,在 linux 上是 <code>epoll</code>。它們的 api 和 <code>select</code>相似,但在大數量的連接配接中也能有較好的性能。

python 3.4 的 <code>defaultselector</code> 會使用你系統上最好的 <code>select</code> 類函數。要注冊一個網絡 i/o 事件的提醒,我們會建立一個非阻塞套接字,并使用預設 selector 注冊它。

<code>from selectors import defaultselector, event_write</code>

<code>selector = defaultselector()</code>

<code>def connected():</code>

<code>selector.unregister(sock.fileno())</code>

<code>print('connected!')</code>

<code>selector.register(sock.fileno(), event_write, connected)</code>

我們不理會這個僞造的錯誤,調用 <code>selector.register</code>,傳遞套接字檔案描述符和一個表示我們想要監聽什麼事件的常量表達式。為了當連接配接建立時收到提醒,我們使用 <code>event_write</code> :它表示什麼時候這個套接字可寫。我們還傳遞了一個 python 函數 <code>connected</code>,當對應事件發生時被調用。這樣的函數被稱為回調。

在一個循環中,selector 接收到 i/o 提醒時我們處理它們。

<code>def loop():</code>

<code>events = selector.select()</code>

<code>for event_key, event_mask in events:</code>

<code>callback = event_key.data</code>

<code>callback()</code>

<code>connected</code> 回調函數被儲存在 <code>event_key.data</code> 中,一旦這個非阻塞套接字建立連接配接,它就會被取出來執行。

不像我們前面那個快速輪轉的循環,這裡的 <code>select</code> 調用會暫停,等待下一個 i/o 事件,接着執行等待這些事件的回調函數。沒有完成的操作會保持挂起,直到進到下一個事件循環時執行。

到目前為止我們展現了什麼?我們展示了如何開始一個 i/o 操作和當操作準備好時調用回調函數。異步架構,它在單線程中執行并發操作,其建立在兩個功能之上,非阻塞套接字和事件循環。

我們這裡達成了“并發性concurrency”,但不是傳統意義上的“并行性parallelism”。也就是說,我們建構了一個可以進行重疊 i/o 的微小系統,它可以在其它操作還在進行的時候就開始一個新的操作。它實際上并沒有利用多核來并行執行計算。這個系統是用于解決i/o 密集i/o-bound問題的,而不是解決 cpu 密集cpu-bound問題的。(python 的全局解釋器鎖禁止在一個程序中以任何方式并行執行 python 代碼。在 python 中并行化 cpu 密集的算法需要多個程序,或者以将該代碼移植為 c 語言并行版本。但是這是另外一個話題了。)

用我們剛剛建立的異步架構,怎麼才能完成一個網絡爬蟲?即使是一個簡單的網頁下載下傳程式也是很難寫的。

首先,我們有一個尚未擷取的 url 集合,和一個已經解析過的 url 集合。

<code>urls_todo = set(['/'])</code>

<code>seen_urls = set(['/'])</code>

<code>seen_urls</code> 集合包括 <code>urls_todo</code> 和已經完成的 url。用根 url <code>/</code> 初始化它們。

擷取一個網頁需要一系列的回調。在套接字連接配接建立時會觸發 <code>connected</code> 回調,它向伺服器發送一個 get 請求。但是它要等待響應,是以我們需要注冊另一個回調函數;當該回調被調用,它仍然不能讀取到完整的請求時,就會再一次注冊回調,如此反複。

讓我們把這些回調放在一個 <code>fetcher</code> 對象中,它需要一個 url,一個套接字,還需要一個地方儲存傳回的位元組:

<code>class fetcher:</code>

<code>def __init__(self, url):</code>

<code>self.response = b'' # empty array of bytes.</code>

<code>self.url = url</code>

<code>self.sock = none</code>

我們的入口點在 <code>fetcher.fetch</code>:

<code># method on fetcher class.</code>

<code>def fetch(self):</code>

<code>self.sock = socket.socket()</code>

<code>self.sock.setblocking(false)</code>

<code>self.sock.connect(('xkcd.com', 80))</code>

<code># register next callback.</code>

<code>selector.register(self.sock.fileno(),</code>

<code>event_write,</code>

<code>self.connected)</code>

<code>fetch</code> 方法從連接配接一個套接字開始。但是要注意這個方法在連接配接建立前就傳回了。它必須将控制傳回到事件循環中等待連接配接建立。為了了解為什麼要這樣做,假設我們程式的整體結構如下:

<code># begin fetching http://xkcd.com/353/</code>

<code>fetcher = fetcher('/353/')</code>

<code>fetcher.fetch()</code>

<code>callback(event_key, event_mask)</code>

當調用 <code>select</code> 函數後,所有的事件提醒才會在事件循環中處理,是以 <code>fetch</code> 必須把控制權交給事件循環,這樣我們的程式才能知道什麼時候連接配接已建立,接着循環調用 <code>connected</code> 回調,它已經在上面的 <code>fetch</code> 方法中注冊過。

這裡是我們的 <code>connected</code> 方法的實作:

<code>def connected(self, key, mask):</code>

<code>selector.unregister(key.fd)</code>

<code>request = 'get {} http/1.0\r\nhost: xkcd.com\r\n\r\n'.format(self.url)</code>

<code>self.sock.send(request.encode('ascii'))</code>

<code># register the next callback.</code>

<code>selector.register(key.fd,</code>

<code>event_read,</code>

<code>self.read_response)</code>

這個方法發送一個 get 請求。一個真正的應用會檢查 <code>send</code> 的傳回值,以防所有的資訊沒能一次發送出去。但是我們的請求很小,應用也不複雜。它隻是簡單的調用 <code>send</code>,然後等待響應。當然,它必須注冊另一個回調并把控制權交給事件循環。接下來也是最後一個回調函數 <code>read_response</code>,它處理伺服器的響應:

<code>def read_response(self, key, mask):</code>

<code>global stopped</code>

<code>chunk = self.sock.recv(4096) # 4k chunk size.</code>

<code>if chunk:</code>

<code>self.response += chunk</code>

<code>else:</code>

<code>selector.unregister(key.fd) # done reading.</code>

<code>links = self.parse_links()</code>

<code># python set-logic:</code>

<code>for link in links.difference(seen_urls):</code>

<code>urls_todo.add(link)</code>

<code>fetcher(link).fetch() # &lt;- new fetcher.</code>

<code>seen_urls.update(links)</code>

<code>urls_todo.remove(self.url)</code>

<code>if not urls_todo:</code>

<code>stopped = true</code>

這個回調在每次 <code>selector</code> 發現套接字可讀時被調用,可讀有兩種情況:套接字接受到資料或它被關閉。

這個回調函數從套接字讀取 4k 資料。如果不到 4k,那麼有多少讀多少。如果比 4k 多,<code>chunk</code> 中隻包 4k 資料并且這個套接字保持可讀,這樣在事件循環的下一個周期,會再次回到這個回調函數。當響應完成時,伺服器關閉這個套接字,<code>chunk</code> 為空。

這裡沒有展示的 <code>parse_links</code> 方法,它傳回一個 url 集合。我們為每個新的 url 啟動一個 fetcher。注意一個使用異步回調方式程式設計的好處:我們不需要為共享資料加鎖,比如我們往 <code>seen_urls</code> 增加新連結時。這是一種非搶占式的多任務,它不會在我們代碼中的任意一個地方被打斷。

我們增加了一個全局變量 <code>stopped</code>,用它來控制這個循環:

<code>stopped = false</code>

<code>while not stopped:</code>

一旦所有的網頁被下載下傳下來,fetcher 停止這個事件循環,程式退出。

這個例子讓異步程式設計的一個問題明顯的暴露出來:意大利面代碼。

我們需要某種方式來表達一系列的計算和 i/o 操作,并且能夠排程多個這樣的系列操作讓它們并發的執行。但是,沒有線程你不能把這一系列操作寫在一個函數中:當函數開始一個 i/o 操作,它明确的把未來所需的狀态儲存下來,然後傳回。你需要考慮如何寫這個狀态儲存的代碼。

讓我們來解釋下這到底是什麼意思。先來看一下線上程中使用通常的阻塞套接字來擷取一個網頁時是多麼簡單。

<code># blocking version.</code>

在一個套接字操作和下一個操作之間這個函數到底記住了什麼狀态?它有一個套接字,一個 url 和一個可增長的<code>response</code>。運作線上程中的函數使用程式設計語言的基本功能來在棧中的局部變量儲存這些臨時狀态。這樣的函數也有一個“continuation”——它會在 i/o 結束後執行這些代碼。運作時環境通過線程的指令指針來記住這個 continuation。你不必考慮怎麼在 i/o 操作後恢複局部變量和這個 continuation。語言本身的特性幫你解決。

但是用一個基于回調的異步架構時,這些語言特性不能提供一點幫助。當等待 i/o 操作時,一個函數必須明确的儲存它的狀态,因為它會在 i/o 操作完成之前傳回并清除棧幀。在我們基于回調的例子中,作為局部變量的替代,我們把 <code>sock</code> 和 <code>response</code> 作為 fetcher 執行個體 <code>self</code> 的屬性來存儲。而作為指令指針的替代,它通過注冊 <code>connected</code> 和 <code>read_response</code> 回調來儲存它的 continuation。随着應用功能的增長,我們需要手動儲存的回調的複雜性也會增加。如此繁複的記賬式工作會讓編碼者感到頭痛。

更糟糕的是,當我們的回調函數抛出異常會發生什麼?假設我們沒有寫好 <code>parse_links</code> 方法,它在解析 html 時抛出異常:

<code>traceback (most recent call last):</code>

<code>file "loop-with-callbacks.py", line 111, in &lt;module&gt;</code>

<code>loop()</code>

<code>file "loop-with-callbacks.py", line 106, in loop</code>

<code>file "loop-with-callbacks.py", line 51, in read_response</code>

<code>file "loop-with-callbacks.py", line 67, in parse_links</code>

<code>raise exception('parse error')</code>

<code>exception: parse error</code>

是以,除了關于多線程和異步哪個更高效的長期争議之外,還有一個關于這兩者之間的争論:誰更容易跪了。如果在同步上出現失誤,線程更容易出現資料競争的問題,而回調因為"堆棧撕裂stack ripping"問題而非常難于調試。

原文釋出時間為:2017-03-04

本文來自雲栖社群合作夥伴“linux中國”