天天看點

一個使用 asyncio 協程的網絡爬蟲(三)

我們将從描述爬蟲如何工作開始。現在是時候用 asynio 去實作它了。

我們的爬蟲從擷取第一個網頁開始,解析對外連結接并把它們加到隊列中。此後它開始傲遊整個網站,并發地擷取網頁。但是由于用戶端和服務端的負載限制,我們希望有一個最大數目的運作的 worker,不能再多。任何時候一個 worker 完成一個網頁的擷取,它應該立即從隊列中取出下一個連結。我們會遇到沒有那麼多事幹的時候,是以一些 worker 必須能夠暫停。一旦又有 worker 擷取一個有很多連結的網頁,隊列會突增,暫停的 worker 立馬被喚醒幹活。最後,當任務完成後我們的程式必須馬上退出。

<code>try:</code>

<code>from asyncio import joinablequeue as queue</code>

<code>except importerror:</code>

<code># in python 3.5, asyncio.joinablequeue is</code>

<code># merged into queue.</code>

<code>from asyncio import queue</code>

我們把 worker 的共享狀态收集在一個 crawler 類中,主要的邏輯寫在 <code>crawl</code> 方法中。我們在一個協程中啟動<code>crawl</code>,運作 asyncio 的事件循環直到 <code>crawl</code> 完成:

<code>loop = asyncio.get_event_loop()</code>

<code></code>

<code>crawler = crawling.crawler('http://xkcd.com',</code>

<code>max_redirect=10)</code>

<code>loop.run_until_complete(crawler.crawl())</code>

crawler 用一個根 url 和最大重定向數 <code>max_redirect</code> 來初始化,它把 <code>(url, max_redirect)</code> 序對放入隊列中。(為什麼要這樣做,請看下文)

<code>class crawler:</code>

<code>def __init__(self, root_url, max_redirect):</code>

<code>self.max_tasks = 10</code>

<code>self.max_redirect = max_redirect</code>

<code>self.q = queue()</code>

<code>self.seen_urls = set()</code>

<code># aiohttp's clientsession does connection pooling and</code>

<code># http keep-alives for us.</code>

<code>self.session = aiohttp.clientsession(loop=loop)</code>

<code># put (url, max_redirect) in the queue.</code>

<code>self.q.put((root_url, self.max_redirect))</code>

現在隊列中未完成的任務數是 1。回到我們的主程式,啟動事件循環和 <code>crawl</code> 方法:

<code>crawl</code> 協程把 worker 們趕起來幹活。它像一個主線程:阻塞在 <code>join</code> 上直到所有任務完成,同時 worker 們在背景運作。

<code>@asyncio.coroutine</code>

<code>def crawl(self):</code>

<code>"""run the crawler until all work is done."""</code>

<code>workers = [asyncio.task(self.work())</code>

<code>for _ in range(self.max_tasks)]</code>

<code># when all work is done, exit.</code>

<code>yield from self.q.join()</code>

<code>for w in workers:</code>

<code>w.cancel()</code>

如果 worker 是線程,可能我們不會一次把它們全部建立出來。為了避免建立線程的昂貴代價,通常一個線程池會按需增長。但是協程很廉價,我們可以直接把他們全部建立出來。

怎麼關閉這個 <code>crawler</code> 很有趣。當 <code>join</code> 完成,worker 存活但是被暫停:他們等待更多的 url,是以主協程要在退出之前清除它們。否則 python 解釋器關閉并調用所有對象的析構函數時,活着的 worker 會哭喊到:

<code>error:asyncio:task was destroyed but it is pending!</code>

<code>cancel</code> 又是如何工作的呢?生成器還有一個我們還沒介紹的特點。你可以從外部抛一個異常給它:

<code>&gt;&gt;&gt; gen = gen_fn()</code>

<code>&gt;&gt;&gt; gen.send(none) # start the generator as usual.</code>

<code>1</code>

<code>&gt;&gt;&gt; gen.throw(exception('error'))</code>

<code>traceback (most recent call last):</code>

<code>file "&lt;input&gt;", line 3, in &lt;module&gt;</code>

<code>file "&lt;input&gt;", line 2, in gen_fn</code>

<code>exception: error</code>

生成器被 <code>throw</code> 恢複,但是它現在抛出一個異常。如過生成器的調用堆棧中沒有捕獲異常的代碼,這個異常被傳遞到頂層。是以登出一個協程:

<code># method of task class.</code>

<code>def cancel(self):</code>

<code>self.coro.throw(cancellederror)</code>

任何時候生成器暫停,在某些 <code>yield from</code> 語句它恢複并且抛出一個異常。我們在 task 的 <code>step</code> 方法中處理登出。

<code>def step(self, future):</code>

<code>next_future = self.coro.send(future.result)</code>

<code>except cancellederror:</code>

<code>self.cancelled = true</code>

<code>return</code>

<code>except stopiteration:</code>

<code>next_future.add_done_callback(self.step)</code>

現在 task 知道它被登出了,是以當它被銷毀時,它不再抱怨。

一旦 <code>crawl</code> 登出了 worker,它就退出。同時事件循環看見這個協程結束了(我們後面會見到的),也就退出。

<code>crawl</code> 方法包含了所有主協程需要做的事。而 worker 則完成從隊列中擷取 url、擷取網頁、解析它們得到新的連結。每個 worker 獨立地運作 <code>work</code> 協程:

<code>def work(self):</code>

<code>while true:</code>

<code>url, max_redirect = yield from self.q.get()</code>

<code># download page and add new links to self.q.</code>

<code>yield from self.fetch(url, max_redirect)</code>

<code>self.q.task_done()</code>

python 看見這段代碼包含 <code>yield from</code> 語句,就把它編譯成生成器函數。是以在 <code>crawl</code> 方法中,我們調用了 10 次 <code>self.work</code>,但并沒有真正執行,它僅僅建立了 10 個指向這段代碼的生成器對象并把它們包裝成 task 對象。task 接收每個生成器所 yield 的 future,通過調用 <code>send</code> 方法,當 future 解決時,用 future 的結果做為 <code>send</code> 的參數,來驅動它。由于生成器有自己的棧幀,它們可以獨立運作,帶有獨立的局部變量和指令指針。

worker 使用隊列來協調其小夥伴。它這樣等待新的 url:

隊列的 <code>get</code> 方法自身也是一個協程,它一直暫停到有新的 url 進入隊列,然後恢複并傳回該條目。

碰巧,這也是當主協程登出 worker 時,最後 crawl 停止,worker 協程暫停的地方。從協程的角度,<code>yield from</code> 抛出<code>cancellederror</code> 結束了它在循環中的最後旅程。

worker 擷取一個網頁,解析連結,把新的連結放入隊列中,接着調用<code>task_done</code>減小計數器。最終一個worker遇到一個沒有新連結的網頁,并且隊列裡也沒有任務,這次<code>task_done</code>的調用使計數器減為0,而<code>crawl</code>正阻塞在<code>join</code>方法上,現在它就可以結束了。

我們承諾過要解釋為什麼隊列中要使用序對,像這樣:

<code># url to fetch, and the number of redirects left.</code>

<code>('http://xkcd.com/353', 10)</code>

新的 url 的重定向次數是10。擷取一個特别的 url 會重定向一個新的位置。我們減小重定向次數,并把新的 url 放入隊列中。

<code># url with a trailing slash. nine redirects left.</code>

<code>('http://xkcd.com/353/', 9)</code>

我們使用的 <code>aiohttp</code> 預設會跟蹤重定向并傳回最終結果。但是,我們告訴它不要這樣做,爬蟲自己來處理重定向,以便它可以合并那些目的相同的重定向路徑:如果我們已經在 <code>self.seen_urls</code> 看到一個 url,說明它已經從其他的地方走過這條路了。

一個使用 asyncio 協程的網絡爬蟲(三)

figure 5.4 - redirects

crawler 擷取“foo”并發現它重定向到了“baz”,是以它會加“baz”到隊列和 <code>seen_urls</code> 中。如果它擷取的下一個頁面“bar” 也重定向到“baz”,fetcher 不會再次将 “baz”加入到隊列中。如果該響應是一個頁面,而不是一個重定向,<code>fetch</code> 會解析它的連結,并把新連結放到隊列中。

<code>def fetch(self, url, max_redirect):</code>

<code># handle redirects ourselves.</code>

<code>response = yield from self.session.get(</code>

<code>url, allow_redirects=false)</code>

<code>if is_redirect(response):</code>

<code>if max_redirect &gt; 0:</code>

<code>next_url = response.headers['location']</code>

<code>if next_url in self.seen_urls:</code>

<code># we have been down this path before.</code>

<code># remember we have seen this url.</code>

<code>self.seen_urls.add(next_url)</code>

<code># follow the redirect. one less redirect remains.</code>

<code>self.q.put_nowait((next_url, max_redirect - 1))</code>

<code>else:</code>

<code>links = yield from self.parse_links(response)</code>

<code># python set-logic:</code>

<code>for link in links.difference(self.seen_urls):</code>

<code>self.q.put_nowait((link, self.max_redirect))</code>

<code>self.seen_urls.update(links)</code>

<code>finally:</code>

<code># return connection to pool.</code>

<code>yield from response.release()</code>

如果這是多程序代碼,就有可能遇到讨厭的競争條件。比如,一個 worker 檢查一個連結是否在 <code>seen_urls</code>中,如果沒有它就把這個連結加到隊列中并把它放到 <code>seen_urls</code> 中。如果它在這兩步操作之間被中斷,而另一個 worker 解析到相同的連結,發現它并沒有出現在 <code>seen_urls</code> 中就把它加入隊列中。這(至少)導緻同樣的連結在隊列中出現兩次,做了重複的工作和錯誤的統計。

然而,一個協程隻在 <code>yield from</code> 時才會被中斷。這是協程比多線程少遇到競争條件的關鍵。多線程必須獲得鎖來明确的進入一個臨界區,否則它就是可中斷的。而 python 的協程預設是不會被中斷的,隻有它明确 yield 時才主動放棄控制權。

我們不再需要在用回調方式時用的 fetcher 類了。這個類隻是不高效回調的一個變通方法:在等待 i/o 時,它需要一個存儲狀态的地方,因為局部變量并不能在函數調用間保留。倒是 <code>fetch</code> 協程可以像普通函數一樣用局部變量儲存它的狀态,是以我們不再需要一個類。

當 <code>fetch</code> 完成對伺服器響應的處理,它傳回到它的調用者 <code>work</code>。<code>work</code> 方法對隊列調用 <code>task_done</code>,接着從隊列中取出一個要擷取的 url。

當 <code>fetch</code> 把新的連結放入隊列中,它增加未完成的任務計數器,并停留在主協程,主協程在等待 <code>q.join</code>,處于暫停狀态。而當沒有新的連結并且這是隊列中最後一個 url 時,當 <code>work 調用</code>task_done<code>,任務計數器變為 0,主協程從</code>join` 中退出。

與 worker 和主協程一起工作的隊列代碼像這樣(實際的 <code>asyncio.queue</code> 實作在 future 所展示的地方使用<code>asyncio.event</code> 。不同之處在于 event 是可以重置的,而 future 不能從已解決傳回變成待決。)

<code>class queue:</code>

<code>def __init__(self):</code>

<code>self._join_future = future()</code>

<code>self._unfinished_tasks = 0</code>

<code># ... other initialization ...</code>

<code>def put_nowait(self, item):</code>

<code>self._unfinished_tasks += 1</code>

<code># ... store the item ...</code>

<code>def task_done(self):</code>

<code>self._unfinished_tasks -= 1</code>

<code>if self._unfinished_tasks == 0:</code>

<code>self._join_future.set_result(none)</code>

<code>def join(self):</code>

<code>if self._unfinished_tasks &gt; 0:</code>

<code>yield from self._join_future</code>

主協程 <code>crawl</code> yield from <code>join</code>。是以當最後一個 worker 把計數器減為 0,它告訴 <code>crawl</code> 恢複運作并結束。

旅程快要結束了。我們的程式從 <code>crawl</code> 調用開始:

<code>loop.run_until_complete(self.crawler.crawl())</code>

程式如何結束?因為 <code>crawl</code> 是一個生成器函數,調用它傳回一個生成器。為了驅動它,asyncio 把它包裝成一個 task:

<code>class eventloop:</code>

<code>def run_until_complete(self, coro):</code>

<code>"""run until the coroutine is done."""</code>

<code>task = task(coro)</code>

<code>task.add_done_callback(stop_callback)</code>

<code>self.run_forever()</code>

<code>except stoperror:</code>

<code>pass</code>

<code>class stoperror(baseexception):</code>

<code>"""raised to stop the event loop."""</code>

<code>def stop_callback(future):</code>

<code>raise stoperror</code>

當這個任務完成,它抛出 <code>stoperror</code>,事件循環把這個異常當作正常退出的信号。

但是,task 的 <code>add_done_callbock</code> 和 <code>result</code> 方法又是什麼呢?你可能認為 task 就像一個 future,不錯,你的直覺是對的。我們必須承認一個向你隐藏的細節,task 是 future。

<code>class task(future):</code>

<code>"""a coroutine wrapped in a future."""</code>

通常,一個 future 被别人調用 <code>set_result</code> 解決。但是 task,當協程結束時,它自己解決自己。記得我們解釋過當 python 生成器傳回時,它抛出一個特殊的 <code>stopiteration</code> 異常:

<code># method of class task.</code>

<code>except stopiteration as exc:</code>

<code># task resolves itself with coro's return</code>

<code># value.</code>

<code>self.set_result(exc.value)</code>

是以當事件循環調用 <code>task.add_done_callback(stop_callback)</code>,它就準備被這個 task 停止。在看一次<code>run_until_complete</code>:

<code># method of event loop.</code>

當 task 捕獲 <code>stopiteration</code> 并解決自己,這個回調從循環中抛出 <code>stoperror</code>。循環結束,調用棧回到<code>run_until_complete</code>。我們的程式結束。

<a target="_blank"></a>

現代的程式越來越多是 i/o 密集型而不是 cpu 密集型。對于這樣的程式,python 的線程在兩個方面不合适:全局解釋器鎖阻止真正的并行計算,并且搶占切換也導緻他們更容易出現競争。異步通常是正确的選擇。但是随着基于回調的異步代碼增加,它會變得非常混亂。協程是一個更整潔的替代者。它們自然地重構成子過程,有健全的異常處理和棧追溯。

如果我們換個角度看 <code>yield from</code> 語句,一個協程看起來像一個傳統的做阻塞 i/o 的線程。甚至我們可以采用經典的多線程模式程式設計,不需要重新發明。是以,與回調相比,協程更适合有經驗的多線程的編碼者。

這章寫于 python 和異步的複興時期。你剛學到的基于生成器的的協程,在 2014 年釋出在 python 3.4 的 asyncio 子產品中。2015 年 9 月,python 3.5 釋出,協程成為語言的一部分。這個原生的協程通過“async def”來聲明, 使用“await”而不是“yield from”委托一個協程或者等待 future。

除了這些優點,核心的思想不變。python 新的原生協程與生成器隻是在文法上不同,工作原理非常相似。事實上,在 python 解釋器中它們共用同一個實作方法。task、future 和事件循環在 asynico 中扮演着同樣的角色。

你已經知道 asyncio 協程是如何工作的了,現在你可以忘記大部分的細節。這些機制隐藏在一個整潔的接口下。但是你對這基本原理的了解能讓你在現代異步環境下正确而高效的編寫代碼。

原文釋出時間為:2017-03-06

本文來自雲栖社群合作夥伴“linux中國”