天天看點

Python 異步爬蟲原了解析及爬取實戰一、基本概念二、協程用法三、異步爬蟲實作

爬蟲是 IO 密集型任務,比如我們使用 requests 庫來爬取某個站點的話,發出一個請求之後,程式必須要等待網站傳回響應之後才能接着運作,而在等待響應的過程中,整個爬蟲程式是一直在等待的,實際上沒有做任何的事情。

一、基本概念

阻塞

阻塞狀态指程式未得到所需計算資源時被挂起的狀态。程式在等待某個操作完成期間,自身無法繼續處理其他的事情,則稱該程式在該操作上是阻塞的。

常見的阻塞形式有:網絡 I/O 阻塞、磁盤 I/O 阻塞、使用者輸入阻塞等。阻塞是無處不在的,包括 CPU 切換上下文時,所有的程序都無法真正處理事情,它們也會被阻塞。如果是多核 CPU 則正在執行上下文切換操作的核不可被利用。

非阻塞

程式在等待某操作過程中,自身不被阻塞,可以繼續處理其他的事情,則稱該程式在該操作上是非阻塞的。

非阻塞并不是在任何程式級别、任何情況下都可以存在的。僅當程式封裝的級别可以囊括獨立的子程式單元時,它才可能存在非阻塞狀态。

非阻塞的存在是因為阻塞存在,正因為某個操作阻塞導緻的耗時與效率低下,我們才要把它變成非阻塞的。

同步

不同程式單元為了完成某個任務,在執行過程中需靠某種通信方式以協調一緻,我們稱這些程式單元是同步執行的。

例如購物系統中更新商品庫存,需要用“行鎖”作為通信信号,讓不同的更新請求強制排隊順序執行,那更新庫存的操作是同步的。

簡言之,同步意味着有序。

異步

為完成某個任務,不同程式單元之間過程中無需通信協調,也能完成任務的方式,不相關的程式單元之間可以是異步的。

例如,爬蟲下載下傳網頁。排程程式調用下載下傳程式後,即可排程其他任務,而無需與該下載下傳任務保持通信以協調行為。不同網頁的下載下傳、儲存等操作都是無關的,也無需互相通知協調。這些異步操作的完成時刻并不确定。

簡言之,異步意味着無序。

多程序

多程序就是利用 CPU 的多核優勢,在同一時間并行地執行多個任務,可以大大提高執行效率。

協程

協程,英文叫作 Coroutine,又稱微線程、纖程,協程是一種使用者态的輕量級線程。

協程擁有自己的寄存器上下文和棧。協程排程切換時,将寄存器上下文和棧儲存到其他地方,在切回來的時候,恢複先前儲存的寄存器上下文和棧。是以協程能保留上一次調用時的狀态,即所有局部狀态的一個特定組合,每次過程重入時,就相當于進入上一次調用的狀态。協程本質上是個單程序,協程相對于多程序來說,無需線程上下文切換的開銷,無需原子操作鎖定及同步的開銷,程式設計模型也非常簡單。我們可以使用協程來實作異步操作,比如在網絡爬蟲場景下,我們發出一個請求之後,需要等待一定的時間才能得到響應,但其實在這個等待過程中,程式可以幹許多其他的事情,等到響應得到之後才切換回來繼續處理,這樣可以充分利用 CPU 和其他資源,這就是協程的優勢。

二、協程用法

從 Python 3.4 開始,Python 中加入了協程的概念,但這個版本的協程還是以生成器對象為基礎的,在 Python 3.5 則增加了 async/await,使得協程的實作更加友善。

Python 中使用協程最常用的庫莫過于 asyncio

  • event_loop:事件循環,相當于一個無限循環,我們可以把一些函數注冊到這個事件循環上,當滿足條件發生的時候,就會調用對應的處理方法。
  • coroutine:中文翻譯叫協程,在 Python 中常指代為協程對象類型,我們可以将協程對象注冊到時間循環中,它會被事件循環調用。我們可以使用 async 關鍵字來定義一個方法,這個方法在調用時不會立即被執行,而是傳回一個協程對象。
  • task:任務,它是對協程對象的進一步封裝,包含了任務的各個狀态。
  • future:代表将來執行或沒有執行的任務的結果,實際上和 task 沒有本質差別。

async/await 關鍵字,是從 Python 3.5 才出現的,專門用于定義協程。其中,async 定義一個協程,await 用來挂起阻塞方法的執行。

定義協程

定義一個協程,感受它和普通程序在實作上的不同之處,代碼如下:

import asyncio

async def execute(x):
    print('Number:', x)

coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')
           

運作結果如下:

Coroutine: <coroutine object execute at 0x0000027808F5BE48>
After calling execute
Number: 666
After calling loop

Process finished with exit code 0
           

首先導入 asyncio 這個包,這樣才可以使用 async 和 await,然後使用 async 定義了一個 execute 方法,方法接收一個數字參數,方法執行之後會列印這個數字。

随後我們直接調用了這個方法,然而這個方法并沒有執行,而是傳回了一個 coroutine 協程對象。随後我們使用 get_event_loop 方法建立了一個事件循環 loop,并調用了 loop 對象的 run_until_complete 方法将協程注冊到事件循環 loop 中,然後啟動。最後我們才看到了 execute 方法列印了輸出結果。

可見,async 定義的方法就會變成一個無法直接執行的 coroutine 對象,必須将其注冊到事件循環中才可以執行。

前面還提到了 task,它是對 coroutine 對象的進一步封裝,它裡面相比 coroutine 對象多了運作狀态,比如 running、finished 等,我們可以用這些狀态來擷取協程對象的執行情況。在上面的例子中,當我們将 coroutine 對象傳遞給 run_until_complete 方法的時候,實際上它進行了一個操作就是将 coroutine 封裝成了 task 對象。task也可以顯式地進行聲明,如下所示:

import asyncio

async def execute(x):
    print('Number:', x)
    return x
    
coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
           

運作結果如下:

Coroutine: <coroutine object execute at 0x000001CB3F90BE48>
After calling execute
Task: <Task pending coro=<execute() running at D:/python/pycharm2020/program/test_003.py:3>>
Number: 666
Task: <Task finished coro=<execute() done, defined at D:/python/pycharm2020/program/test_003.py:3> result=666>
After calling loop

Process finished with exit code 0
           

這裡我們定義了 loop 對象之後,接着調用了它的 create_task 方法将 coroutine 對象轉化為了 task 對象,随後我們列印輸出一下,發現它是 pending 狀态。接着我們将 task 對象添加到事件循環中得到執行,随後我們再列印輸出一下 task 對象,發現它的狀态就變成了 finished,同時還可以看到其 result 變成了 666,也就是我們定義的 execute 方法的傳回結果。

定義 task 對象還有一種常用方式,就是直接通過 asyncio 的 ensure_future 方法,傳回結果也是 task 對象,這樣的話我們就可以不借助于 loop 來定義,即使還沒有聲明 loop 也可以提前定義好 task 對象,寫法如下:

import asyncio

async def execute(x):
    print('Number:', x)
    return x

coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
           

運作效果如下:

Coroutine: <coroutine object execute at 0x0000019794EBBE48>
After calling execute
Task: <Task pending coro=<execute() running at D:/python/pycharm2020/program/test_003.py:3>>
Number: 666
Task: <Task finished coro=<execute() done, defined at D:/python/pycharm2020/program/test_003.py:3> result=666>
After calling loop

Process finished with exit code 0
           

發現其運作效果都是一樣的

task對象的綁定回調操作

可以為某個 task 綁定一個回調方法,舉如下例子:

import asyncio
import requests

async def call_on():
    status = requests.get('https://www.baidu.com')
    return status

def call_back(task):
    print('Status:', task.result())

corountine = call_on()
task = asyncio.ensure_future(corountine)
task.add_done_callback(call_back)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
           

定義了一個call_on 方法,請求了百度,擷取其狀态碼,但是這個方法裡面我們沒有任何 print 語句。随後我們定義了一個 call_back 方法,這個方法接收一個參數,是 task 對象,然後調用 print列印了 task 對象的結果。這樣我們就定義好了一個 coroutine 對象和一個回調方法,

希望達到的效果是,當 coroutine 對象執行完畢之後,就去執行聲明的 callback 方法。實作這樣的效果隻需要調用 add_done_callback 方法即可,我們将 callback 方法傳遞給了封裝好的 task 對象,這樣當 task 執行完畢之後就可以調用 callback 方法了,同時 task 對象還會作為參數傳遞給 callback 方法,調用 task 對象的 result 方法就可以擷取傳回結果了。

運作結果如下:

Task: <Task pending coro=<call_on() running at D:/python/pycharm2020/program/test_003.py:4> cb=[call_back() at D:/python/pycharm2020/program/test_003.py:8]>
Status: <Response [200]>
Task: <Task finished coro=<call_on() done, defined at D:/python/pycharm2020/program/test_003.py:4> result=<Response [200]>>
           

也可以不用回調方法,直接在 task 運作完畢之後也能直接調用 result 方法擷取結果,如下所示:

import asyncio
import requests

async def call_on():
    status = requests.get('https://www.baidu.com')
    return status

def call_back(task):
    print('Status:', task.result())

corountine = call_on()
task = asyncio.ensure_future(corountine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task:', task.result())
           

運作效果一樣:

Task: <Task pending coro=<call_on() running at D:/python/pycharm2020/program/test_003.py:4>>
Task: <Task finished coro=<call_on() done, defined at D:/python/pycharm2020/program/test_003.py:4> result=<Response [200]>>
Task: <Response [200]>
           

庭雲最近發現一個 IP 代理平台,在推出活動,可以限時免費領取1萬代理IP~~ ,有需要的讀者朋友可以長按識别下方二維碼,注冊體驗一下下呀。

三、異步爬蟲實作

要實作異步處理,得先要有挂起的操作,當一個任務需要等待 IO 結果的時候,可以挂起目前任務,轉而去執行其他任務,這樣才能充分利用好資源,要實作異步,需要了解一下 await 的用法,使用 await 可以将耗時等待的操作挂起,讓出控制權。當協程執行的時候遇到 await,時間循環就會将本協程挂起,轉而去執行别的協程,直到其他的協程挂起或執行完畢。

await 後面的對象必須是如下格式之一:

  • A native coroutine object returned from a native coroutine function,一個原生 coroutine 對象。
  • A generator-based coroutine object returned from a function decorated with types.coroutine,一個由 types.coroutine 修飾的生成器,這個生成器可以傳回 coroutine 對象。
  • An object with an await method returning an iterator,一個包含 await 方法的對象傳回的一個疊代器。

aiohttp的使用

aiohttp 是一個支援異步請求的庫,利用它和 asyncio 配合我們可以非常友善地實作異步請求操作。下面以通路我部落格裡面的文章,并傳回reponse.text()為例,實作異步爬蟲。

from lxml import etree
import requests
import logging
import time
import aiohttp
import asyncio

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
url = 'https://blog.csdn.net/fyfugoyfa'
start_time = time.time()

# 先擷取部落格裡的文章連結
def get_urls():
    headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
    resp = requests.get(url, headers=headers)
    html = etree.HTML(resp.text)
    url_list = html.xpath('//div[@class="article-list"]/div/h4/a/@href')
    return url_list

async def request_page(url):
    logging.info('scraping %s', url)
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
        return await response.text()

def main():
    url_list = get_urls()
    tasks = [asyncio.ensure_future(request_page(url)) for url in url_list]
    loop = asyncio.get_event_loop()
    tasks = asyncio.gather(*tasks)
    loop.run_until_complete(tasks)

if __name__ == '__main__':
    main()
    end_time = time.time()
    logging.info('total time %s seconds', end_time - start_time)
           

執行個體中将請求庫由 requests 改成了 aiohttp,通過 aiohttp 的 ClientSession 類的 get 方法進行請求,運作效果如下:

Python 異步爬蟲原了解析及爬取實戰一、基本概念二、協程用法三、異步爬蟲實作

異步操作的便捷之處在于,當遇到阻塞式操作時,任務被挂起,程式接着去執行其他的任務,而不是傻傻地等待,這樣可以充分利用 CPU 時間,而不必把時間浪費在等待 IO 上。

上面的例子與單線程版和多線程版的比較如下:

多線程版

import requests
import logging
import time
from lxml import etree
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
url = 'https://blog.csdn.net/fyfugoyfa'
headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
start_time = time.time()

# 先擷取部落格裡的文章連結
def get_urls():
    resp = requests.get(url, headers=headers)
    html = etree.HTML(resp.text)
    url_list = html.xpath('//div[@class="article-list"]/div/h4/a/@href')
    return url_list

def request_page(url):
    logging.info('scraping %s', url)
    resp = requests.get(url, headers=headers)
    return resp.text

def main():
    url_list = get_urls()
    with ThreadPoolExecutor(max_workers=6) as executor:
        executor.map(request_page, url_list)

if __name__ == '__main__':
    main()
    end_time = time.time()
    logging.info('total time %s seconds', end_time - start_time)
           

運作效果如下:

Python 異步爬蟲原了解析及爬取實戰一、基本概念二、協程用法三、異步爬蟲實作

單線程版

import requests
import logging
import time
from lxml import etree

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
url = 'https://blog.csdn.net/fyfugoyfa'
headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
start_time = time.time()

# 先擷取部落格裡的文章連結
def get_urls():
    resp = requests.get(url, headers=headers)
    html = etree.HTML(resp.text)
    url_list = html.xpath('//div[@class="article-list"]/div/h4/a/@href')
    return url_list

def request_page(url):
    logging.info('scraping %s', url)
    resp = requests.get(url, headers=headers)
    return resp.text

def main():
    url_list = get_urls()
    for url in url_list:
        request_page(url)

if __name__ == '__main__':
    main()
    end_time = time.time()
    logging.info('total time %s seconds', end_time - start_time)
           

運作效果如下:

Python 異步爬蟲原了解析及爬取實戰一、基本概念二、協程用法三、異步爬蟲實作

經過測試可以發現,如果能将異步請求靈活運用在爬蟲中,在伺服器能承受高并發的前提下增加并發數量,爬取效率提升是非常可觀的。

作者:葉庭雲

公衆号:微信搜一搜【修煉Python】 分享學習文檔、教程資料和履歷模闆

CSDN:https://yetingyun.blog.csdn.net/

本文僅用于交流學習,未經作者允許,禁止轉載,更勿做其他用途,違者必究。

熱愛可抵歲月漫長,發現求知的樂趣,在不斷總結和學習中進步,與諸君共勉。