天天看點

一個使用 asyncio 協程的網絡爬蟲(二)

一個使用 asyncio 協程的網絡爬蟲(二)

還記得我們對你許下的承諾麼?我們可以寫出這樣的異步代碼,它既有回調方式的高效,也有多線程代碼的簡潔。這個結合是同過一種稱為協程coroutine的模式來實作的。使用 python3.4 标準庫 asyncio 和一個叫“aiohttp”的包,在協程中擷取一個網頁是非常直接的( <code>@asyncio.coroutine</code> 修飾符并非魔法。事實上,如果它修飾的是一個生成器函數,并且沒有設定 <code>pythonasynciodebug</code> 環境變量的話,這個修飾符基本上沒啥用。它隻是為了架構的其它部分友善,設定了一個屬性 <code>_is_coroutine</code> 而已。也可以直接使用 asyncio 和裸生成器,而沒有<code>@asyncio.coroutine</code> 修飾符):

<code>@asyncio.coroutine</code>

<code>def fetch(self, url):</code>

<code>response = yield from self.session.get(url)</code>

<code>body = yield from response.read()</code>

它也是可擴充的。在作者 jesse 的系統上,與每個線程 50k 記憶體相比,一個 python 協程隻需要 3k 記憶體。python 很容易就可以啟動上千個協程。

協程的概念可以追溯到計算機科學的遠古時代,它很簡單,一個可以暫停和恢複的子過程。線程是被作業系統控制的搶占式多任務,而協程的多任務是可合作的,它們自己選擇什麼時候暫停去執行下一個協程。

要解釋 python 3.4 中基于生成器的協程,我們需要深入生成器的方方面面,以及它們是如何在 asyncio 中用作協程的。我很高興就此寫點東西,想必你也希望繼續讀下去。我們解釋了基于生成器的協程之後,就會在我們的異步網絡爬蟲中使用它們。

在你了解生成器之前,你需要知道普通的 python 函數是怎麼工作的。正常情況下,當一個函數調用一個子過程,這個被調用函數獲得控制權,直到它傳回或者有異常發生,才把控制權交給調用者:

<code>&gt;&gt;&gt; def foo():</code>

<code>... bar()</code>

<code>...</code>

<code>&gt;&gt;&gt; def bar():</code>

<code>... pass</code>

标準的 python 解釋器是用 c 語言寫的。一個 python 函數被調用所對應的 c 函數是 <code>pyeval_evalframeex</code>。它獲得一個 python 棧幀結構并在這個棧幀的上下文中執行 python 位元組碼。這裡是 <code>foo</code> 函數的位元組碼:

<code>&gt;&gt;&gt; import dis</code>

<code>&gt;&gt;&gt; dis.dis(foo)</code>

<code>2 0 load_global 0 (bar)</code>

<code>3 call_function 0 (0 positional, 0 keyword pair)</code>

<code>6 pop_top</code>

<code>7 load_const 0 (none)</code>

<code>10 return_value</code>

<code>foo</code> 函數在它棧中加載 <code>bar</code> 函數并調用它,然後把 <code>bar</code> 的傳回值從棧中彈出,加載 <code>none</code> 值到堆棧并傳回。

當 <code>pyeval_evalframeex</code> 遇到 <code>call_function</code> 位元組碼時,它會建立一個新的棧幀,并用這個棧幀遞歸的調用<code>pyeval_evalframeex</code> 來執行 <code>bar</code> 函數。

非常重要的一點是,python 的棧幀在堆中配置設定!python 解釋器是一個标準的 c 程式,是以它的棧幀是正常的棧幀。但是 python 的棧幀是在堆中處理。這意味着 python 棧幀在函數調用結束後依然可以存在。我們在<code>bar</code> 函數中儲存目前的棧幀,互動式的看看這種現象:

<code>&gt;&gt;&gt; import inspect</code>

<code>&gt;&gt;&gt; frame = none</code>

<code>... global frame</code>

<code>... frame = inspect.currentframe()</code>

<code>&gt;&gt;&gt; foo()</code>

<code>&gt;&gt;&gt; # the frame was executing the code for 'bar'.</code>

<code>&gt;&gt;&gt; frame.f_code.co_name</code>

<code>'bar'</code>

<code>&gt;&gt;&gt; # its back pointer refers to the frame for 'foo'.</code>

<code>&gt;&gt;&gt; caller_frame = frame.f_back</code>

<code>&gt;&gt;&gt; caller_frame.f_code.co_name</code>

<code>'foo'</code>

一個使用 asyncio 協程的網絡爬蟲(二)

figure 5.1 - function calls

現在該說 python 生成器了,它使用同樣構件——代碼對象和棧幀——去完成一個不可思議的任務。

這是一個生成器函數:

<code>&gt;&gt;&gt; def gen_fn():</code>

<code>... result = yield 1</code>

<code>... print('result of yield: {}'.format(result))</code>

<code>... result2 = yield 2</code>

<code>... print('result of 2nd yield: {}'.format(result2))</code>

<code>... return 'done'</code>

在 python 把 <code>gen_fn</code> 編譯成位元組碼的過程中,一旦它看到 <code>yield</code> 語句就知道這是一個生成器函數而不是普通的函數。它就會設定一個标志來記住這個事實:

<code>&gt;&gt;&gt; # the generator flag is bit position 5.</code>

<code>&gt;&gt;&gt; generator_bit = 1 &lt;&lt; 5</code>

<code>&gt;&gt;&gt; bool(gen_fn.__code__.co_flags &amp; generator_bit)</code>

<code>true</code>

當你調用一個生成器函數,python 看到這個标志,就不會實際運作它而是建立一個生成器:

<code>&gt;&gt;&gt; gen = gen_fn()</code>

<code>&gt;&gt;&gt; type(gen)</code>

<code>&lt;class 'generator'&gt;</code>

python 生成器封裝了一個棧幀和函數體代碼的引用:

<code>&gt;&gt;&gt; gen.gi_code.co_name</code>

<code>'gen_fn'</code>

所有通過調用 <code>gen_fn</code> 的生成器指向同一段代碼,但都有各自的棧幀。這些棧幀不再任何一個c函數棧中,而是在堆空間中等待被使用:

一個使用 asyncio 協程的網絡爬蟲(二)

figure 5.2 - generators

棧幀中有一個指向“最後執行指令”的指針。初始化為 -1,意味着它沒開始運作:

<code>&gt;&gt;&gt; gen.gi_frame.f_lasti</code>

<code>-1</code>

當我們調用 <code>send</code> 時,生成器一直運作到第一個 <code>yield</code> 語句處停止,并且 <code>send</code> 傳回 1,因為這是 <code>gen</code> 傳遞給 <code>yield</code> 表達式的值。

<code>&gt;&gt;&gt; gen.send(none)</code>

<code>1</code>

現在,生成器的指令指針是 3,所編譯的python 位元組碼一共有 56 個位元組:

<code>3</code>

<code>&gt;&gt;&gt; len(gen.gi_code.co_code)</code>

<code>56</code>

這個生成器可以在任何時候、任何函數中恢複運作,因為它的棧幀并不在真正的棧中,而是堆中。在調用鍊中它的位置也是不固定的,它不必遵循普通函數先進後出的順序。它像雲一樣自由。

我們可以傳遞一個值 <code>hello</code> 給生成器,它會成為 <code>yield</code> 語句的結果,并且生成器會繼續運作到第二個<code>yield</code> 語句處。

<code>&gt;&gt;&gt; gen.send('hello')</code>

<code>result of yield: hello</code>

<code>2</code>

現在棧幀中包含局部變量 <code>result</code>:

<code>&gt;&gt;&gt; gen.gi_frame.f_locals</code>

<code>{'result': 'hello'}</code>

其它從 <code>gen_fn</code> 建立的生成器有着它自己的棧幀和局部變量。

當我們再一次調用 <code>send</code>,生成器繼續從第二個 <code>yield</code> 開始運作,以抛出一個特殊的 <code>stopiteration</code> 異常為結束。

<code>&gt;&gt;&gt; gen.send('goodbye')</code>

<code>result of 2nd yield: goodbye</code>

<code>traceback (most recent call last):</code>

<code>file "&lt;input&gt;", line 1, in &lt;module&gt;</code>

<code>stopiteration: done</code>

這個異常有一個值 <code>"done"</code>,它就是生成器的傳回值。

是以生成器可以暫停,可以給它一個值讓它恢複,并且它還有一個傳回值。這些特性看起來很适合去建立一個不使用那種亂糟糟的意面似的回調異步程式設計模型。我們想創造一個這樣的“協程”:一個在程式中可以和其他過程合作排程的過程。我們的協程将會是标準庫 <code>asyncio</code> 中協程的一個簡化版本,我們将使用生成器,futures 和<code>yield from</code> 語句。

首先,我們需要一種方法去代表協程所需要等待的 future 事件。一個簡化的版本是:

<code>class future:</code>

<code>def __init__(self):</code>

<code>self.result = none</code>

<code>self._callbacks = []</code>

<code></code>

<code>def add_done_callback(self, fn):</code>

<code>self._callbacks.append(fn)</code>

<code>def set_result(self, result):</code>

<code>self.result = result</code>

<code>for fn in self._callbacks:</code>

<code>fn(self)</code>

一個 future 初始化為“未解決的”,它通過調用 <code>set_result</code> 來“解決”。(這個 future 缺少很多東西,比如說,當這個 future 解決後,生成yield的協程應該馬上恢複而不是暫停,但是在我們的代碼中卻不沒有這樣做。參見 asyncio 的 future 類以了解其完整實作。)

讓我們用 future 和協程來改寫我們的 fetcher。我們之前用回調寫的 <code>fetch</code> 如下:

<code>class fetcher:</code>

<code>def fetch(self):</code>

<code>self.sock = socket.socket()</code>

<code>self.sock.setblocking(false)</code>

<code>try:</code>

<code>self.sock.connect(('xkcd.com', 80))</code>

<code>except blockingioerror:</code>

<code>pass</code>

<code>selector.register(self.sock.fileno(),</code>

<code>event_write,</code>

<code>self.connected)</code>

<code>def connected(self, key, mask):</code>

<code>print('connected!')</code>

<code># and so on....</code>

<code>fetch</code> 方法開始連接配接一個套接字,然後注冊 <code>connected</code> 回調函數,它會在套接字建立連接配接後調用。現在我們使用協程把這兩步合并:

<code>sock = socket.socket()</code>

<code>sock.setblocking(false)</code>

<code>sock.connect(('xkcd.com', 80))</code>

<code>f = future()</code>

<code>def on_connected():</code>

<code>f.set_result(none)</code>

<code>selector.register(sock.fileno(),</code>

<code>on_connected)</code>

<code>yield f</code>

<code>selector.unregister(sock.fileno())</code>

現在,<code>fetch</code> 是一個生成器,因為它有一個 <code>yield</code> 語句。我們建立一個未決的 future,然後 yield 它,暫停<code>fetch</code> 直到套接字連接配接建立。内聯函數 <code>on_connected</code> 解決這個 future。

但是當 future 被解決,誰來恢複這個生成器?我們需要一個協程驅動器。讓我們叫它 “task”:

<code>class task:</code>

<code>def __init__(self, coro):</code>

<code>self.coro = coro</code>

<code>self.step(f)</code>

<code>def step(self, future):</code>

<code>next_future = self.coro.send(future.result)</code>

<code>except stopiteration:</code>

<code>return</code>

<code>next_future.add_done_callback(self.step)</code>

<code># begin fetching http://xkcd.com/353/</code>

<code>fetcher = fetcher('/353/')</code>

<code>task(fetcher.fetch())</code>

<code>loop()</code>

task 通過傳遞一個 none 值給 <code>fetch</code> 來啟動它。<code>fetch</code> 運作到它 yeild 出一個 future,這個 future 被作為<code>next_future</code> 而捕獲。當套接字連接配接建立,事件循環運作回調函數 <code>on_connected</code>,這裡 future 被解決,<code>step</code> 被調用,<code>fetch</code> 恢複運作。

一旦套接字連接配接建立,我們就可以發送 http get 請求,然後讀取伺服器響應。不再需要哪些分散在各處的回調函數,我們把它們放在同一個生成器函數中:

<code># ... connection logic from above, then:</code>

<code>sock.send(request.encode('ascii'))</code>

<code>while true:</code>

<code>def on_readable():</code>

<code>f.set_result(sock.recv(4096))</code>

<code>event_read,</code>

<code>on_readable)</code>

<code>chunk = yield f</code>

<code>if chunk:</code>

<code>self.response += chunk</code>

<code>else:</code>

<code># done reading.</code>

<code>break</code>

從套接字中讀取所有資訊的代碼看起來很通用。我們能不把它從 <code>fetch</code> 中提取成一個子過程?現在該 python 3 熱捧的 <code>yield from</code> 登場了。它能讓一個生成器委派另一個生成器。

讓我們先回到原來那個簡單的生成器例子:

為了從其他生成器調用這個生成器,我們使用 <code>yield from</code> 委派它:

<code>&gt;&gt;&gt; # generator function:</code>

<code>&gt;&gt;&gt; def caller_fn():</code>

<code>... gen = gen_fn()</code>

<code>... rv = yield from gen</code>

<code>... print('return value of yield-from: {}'</code>

<code>... .format(rv))</code>

<code>&gt;&gt;&gt; # make a generator from the</code>

<code>&gt;&gt;&gt; # generator function.</code>

<code>&gt;&gt;&gt; caller = caller_fn()</code>

這個 <code>caller</code> 生成器的行為的和它委派的生成器 <code>gen</code> 表現的完全一緻:

<code>&gt;&gt;&gt; caller.send(none)</code>

<code>&gt;&gt;&gt; caller.gi_frame.f_lasti</code>

<code>15</code>

<code>&gt;&gt;&gt; caller.send('hello')</code>

<code>&gt;&gt;&gt; caller.gi_frame.f_lasti # hasn't advanced.</code>

<code>&gt;&gt;&gt; caller.send('goodbye')</code>

<code>return value of yield-from: done</code>

<code>stopiteration</code>

當 <code>caller</code> 自 <code>gen</code> 生成(<code>yield</code>),<code>caller</code> 就不再前進。注意到 <code>caller</code> 的指令指針保持15不變,就是<code>yield from</code> 的地方,即使内部的生成器 <code>gen</code> 從一個 yield 語句運作到下一個 yield,它始終不變。(事實上,這就是“yield from”在 cpython 中工作的具體方式。函數會在執行每個語句之前提升其指令指針。但是在外部生成器執行“yield from”後,它會将其指令指針減一,以保持其固定在“yield form”語句上。然後其生成其 caller。這個循環不斷重複,直到内部生成器抛出 stopiteration,這裡指向外部生成器最終允許它自己進行到下一條指令的地方。)從 <code>caller</code> 外部來看,我們無法分辨 yield 出的值是來自 <code>caller</code> 還是它委派的生成器。而從 <code>gen</code> 内部來看,我們也不能分辨傳給它的值是來自 <code>caller</code> 還是 <code>caller</code> 的外面。<code>yield from</code>語句是一個光滑的管道,值通過它進出 <code>gen</code>,一直到 <code>gen</code> 結束。

協程可以用 <code>yield from</code> 把工作委派給子協程,并接收子協程的傳回值。注意到上面的 <code>caller</code> 列印出“return value of yield-from: done”。當 <code>gen</code> 完成後,它的傳回值成為 <code>caller</code> 中 <code>yield from</code> 語句的值。

<code>rv = yield from gen</code>

前面我們批評過基于回調的異步程式設計模式,其中最大的不滿是關于 “堆棧撕裂stack ripping”:當一個回調抛出異常,它的堆棧回溯通常是毫無用處的。它隻顯示出事件循環運作了它,而沒有說為什麼。那麼協程怎麼樣?

<code>... raise exception('my error')</code>

<code>file "&lt;input&gt;", line 3, in caller_fn</code>

<code>file "&lt;input&gt;", line 2, in gen_fn</code>

<code>exception: my error</code>

這還是非常有用的,當異常抛出時,堆棧回溯顯示出 <code>caller_fn</code> 委派了 <code>gen_fn</code>。令人更欣慰的是,你可以在一次異常處理器中封裝這個調用到一個子過程中,像正常函數一樣:

<code>... yield 1</code>

<code>... raise exception('uh oh')</code>

<code>... try:</code>

<code>... yield from gen_fn()</code>

<code>... except exception as exc:</code>

<code>... print('caught {}'.format(exc))</code>

<code>caught uh oh</code>

是以我們可以像提取子過程一樣提取子協程。讓我們從 fetcher 中提取一些有用的子協程。我們先寫一個可以讀一塊資料的協程 <code>read</code>:

<code>def read(sock):</code>

<code>selector.register(sock.fileno(), event_read, on_readable)</code>

<code>chunk = yield f # read one chunk.</code>

<code>return chunk</code>

在 <code>read</code> 的基礎上,<code>read_all</code> 協程讀取整個資訊:

<code>def read_all(sock):</code>

<code>response = []</code>

<code># read whole response.</code>

<code>chunk = yield from read(sock)</code>

<code>while chunk:</code>

<code>response.append(chunk)</code>

<code>return b''.join(response)</code>

如果你換個角度看,抛開 <code>yield form</code> 語句的話,它們就像在做阻塞 i/o 的普通函數一樣。但是事實上,<code>read</code> 和 <code>read_all</code> 都是協程。<code>yield from</code> <code>read</code> 暫停 <code>read_all</code> 直到 i/o 操作完成。當<code>read_all</code> 暫停時,asyncio 的事件循環正在做其它的工作并等待其他的 i/o 操作。<code>read</code> 在下次循環中當事件就緒,完成 i/o 操作時,<code>read_all</code> 恢複運作。

最終,<code>fetch</code> 調用了 <code>read_all</code>:

<code>self.response = yield from read_all(sock)</code>

神奇的是,task 類不需要做任何改變,它像以前一樣驅動外部的 <code>fetch</code> 協程:

當 <code>read</code> yield 一個 future 時,task 從 <code>yield from</code> 管道中接收它,就像這個 future 直接從 <code>fetch</code> yield 一樣。當循環解決一個 future 時,task 把它的結果送給 <code>fetch</code>,通過管道,<code>read</code> 接受到這個值,這完全就像 task 直接驅動 <code>read</code> 一樣:

一個使用 asyncio 協程的網絡爬蟲(二)

figure 5.3 - yield from

為了完善我們的協程實作,我們再做點打磨:當等待一個 future 時,我們的代碼使用 yield;而當委派一個子協程時,使用 yield from。不管是不是協程,我們總是使用 yield form 會更精煉一些。協程并不需要在意它在等待的東西是什麼類型。

在 python 中,我們從生成器和疊代器的高度相似中獲得了好處,将生成器進化成 caller,疊代器也可以同樣獲得好處。是以,我們可以通過特殊的實作方式來疊代我們的 future 類:

<code># method on future class.</code>

<code>def __iter__(self):</code>

<code># tell task to resume me here.</code>

<code>yield self</code>

<code>return self.result</code>

future 的 <code>__iter__</code> 方法是一個 yield 它自身的一個協程。當我們将代碼替換如下時:

<code># f is a future.</code>

以及……:

<code>yield from f</code>

……結果是一樣的!驅動 task 從它的調用 <code>send</code> 中接收 future,并當 future 解決後,它發回新的結果給該協程。

在每個地方都使用 <code>yield from</code> 的好處是什麼?為什麼比用 <code>field</code> 等待 future 并用 <code>yield from</code> 委派子協程更好?之是以更好的原因是,一個方法可以自由地改變其實行而不影響到其調用者:它可以是一個當 future 解決後傳回一個值的普通方法,也可以是一個包含 <code>yield from</code> 語句并傳回一個值的協程。無論是哪種情況,調用者僅需要 <code>yield from</code> 該方法以等待結果就行。

親愛的讀者,我們已經完成了對 asyncio 協程探索。我們深入觀察了生成器的機制,實作了簡單的 future 和 task。我們指出協程是如何利用兩個世界的優點:比線程高效、比回調清晰的并發 i/o。當然真正的 asyncio 比我們這個簡化版本要複雜的多。真正的架構需要處理zero-copy i/0、公平排程、異常處理和其他大量特性。

使用 asyncio 編寫協程代碼比你現在看到的要簡單的多。在前面的代碼中,我們從基本原理去實作協程,是以你看到了回調,task 和 future,甚至非阻塞套接字和 <code>select</code> 調用。但是當用 asyncio 編寫應用,這些都不會出現在你的代碼中。我們承諾過,你可以像這樣下載下傳一個網頁:

對我們的探索還滿意麼?回到我們原始的任務:使用 asyncio 寫一個網絡爬蟲。

原文釋出時間為:2017-03-05

本文來自雲栖社群合作夥伴“linux中國”