天天看點

Scrapy源碼閱讀分析_4_請求處理流程

運作入口

還是回到最初的入口,在Scrapy源碼分析(二)運作入口這篇文章中已經講解到,在執行scrapy指令時,調用流程如下:

  • 調用cmdline.py的execute方法
  • 調用指令執行個體解析指令行
  • 建構CrawlerProcess執行個體,調用crawl和start方法

而 crawl 方法最終是調用了 Cralwer 執行個體的 crawl,這個方法最終把控制權交由 Engine,而 start 方法 注冊好協程池,開始異步排程。

我們來看 Cralwer 的 crawl 方法:(scrapy/crawler.py)

Scrapy源碼閱讀分析_4_請求處理流程

在把控制權交給引擎排程之前,先建立出爬蟲執行個體,然後建立引擎執行個體(此過程見​​Scrapy源碼分析(三)核心元件初始化​​​),然後調用了​

​spider​

​​的​

​start_requests​

​​方法,這個方法就是我們平時寫的最多爬蟲類的父類,它在​

​spiders/__init__.py​

​中:

Scrapy源碼閱讀分析_4_請求處理流程

建構請求

在這裡我們能看到,平時我們必須要定義的​

​start_urls​

​​,原來是在這裡拿來建構​

​Request​

​​的,來看​

​Request​

​的是如何建構的:

(scrapy/http/request/__init__.py)

"""
This module implements the Request class which is used to represent HTTP
requests in Scrapy.

See documentation in docs/topics/request-response.rst
"""
import six
from w3lib.url import safe_url_string

from scrapy.http.headers import Headers
from scrapy.utils.python import to_bytes
from scrapy.utils.trackref import object_ref
from scrapy.utils.url import escape_ajax
from scrapy.http.common import obsolete_setter


class Request(object_ref):

    def __init__(self, url, callback=None, method='GET', headers=None, body=None,
                 cookies=None, meta=None, encoding='utf-8', priority=0,
                 dont_filter=False, errback=None, flags=None):

        # 編碼
        self._encoding = encoding  # this one has to be set first
        # 請求方法
        self.method = str(method).upper()
        # 設定 URL
        self._set_url(url)
        #  設定 body
        self._set_body(body)
        assert isinstance(priority, int), "Request priority not an integer: %r" % priority
        # 優先級
        self.priority = priority
        if callback is not None and not callable(callback):
            raise TypeError('callback must be a callable, got %s' % type(callback).__name__)
        if errback is not None and not callable(errback):
            raise TypeError('errback must be a callable, got %s' % type(errback).__name__)
        assert callback or not errback, "Cannot use errback without a callback"
        # 回調函數
        self.callback = callback
        # 異常回調函數
        self.errback = errback
        # cookies
        self.cookies = cookies or {}
        # 建構Header
        self.headers = Headers(headers or {}, encoding=encoding)
        # 是否需要過濾
        self.dont_filter = dont_filter
        # 附加資訊
        self._meta = dict(meta) if meta else None
        self.flags = [] if flags is None else list(flags)

    @property
    def meta(self):
        if self._meta is None:
            self._meta = {}
        return self._meta

    def _get_url(self):
        return self._url

    def _set_url(self, url):
        if not isinstance(url, six.string_types):
            raise TypeError('Request url must be str or unicode, got %s:' % type(url).__name__)

        s = safe_url_string(url, self.encoding)
        self._url = escape_ajax(s)

        if ':' not in self._url:
            raise ValueError('Missing scheme in request url: %s' % self._url)

    url = property(_get_url, obsolete_setter(_set_url, 'url'))

    def _get_body(self):
        return self._body

    def _set_body(self, body):
        if body is None:
            self._body = b''
        else:
            self._body = to_bytes(body, self.encoding)

    body = property(_get_body, obsolete_setter(_set_body, 'body'))

    @property
    def encoding(self):
        return self._encoding

    def __str__(self):
        return "<%s %s>" % (self.method, self.url)

    __repr__ = __str__

    def copy(self):
        """Return a copy of this Request"""
        return self.replace()

    def replace(self, *args, **kwargs):
        """Create a new Request with the same attributes except for those
        given new values.
        """
        for x in ['url', 'method', 'headers', 'body', 'cookies', 'meta', 'flags',
                  'encoding', 'priority', 'dont_filter', 'callback', 'errback']:
            kwargs.setdefault(x, getattr(self, x))
        cls = kwargs.pop('cls', self.__class__)
        return cls(*args, **kwargs)      

​Request​

​對象比較簡單,就是簡單封裝了請求參數、方式、回調以及可附加的屬性資訊。

當然,你也可以在子類重寫​

​start_requests​

​​以及​

​make_requests_from_url​

​這2個方法,來建構種子請求。

引擎排程

回到​

​crawl​

​​方法,建構好種子請求對象後,調用了​

​engine​

​​的​

​open_spider​

​方法:

Scrapy源碼閱讀分析_4_請求處理流程

初始化的過程之前的文章已講到,這裡不再多說。主要說一下處理流程,這裡第一步是建構了​

​CallLaterOnce​

​​,把​

​_next_request​

​注冊進去,看此類的實作:

class CallLaterOnce(object):
    """Schedule a function to be called in the next reactor loop, but only if
    it hasn't been already scheduled since the last time it ran.
    """
    # 在twisted的reactor中循環排程一個方法

    def __init__(self, func, *a, **kw):
        self._func = func
        self._a = a
        self._kw = kw
        self._call = None

    def schedule(self, delay=0):
        # 上次發起排程,才可再次繼續排程
        if self._call is None:
            self._call = reactor.callLater(delay, self)

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self):
        # 上面注冊的是self,是以會執行__call__
        self._call = None
        return self._func(*self._a, **self._kw)      

這裡封裝了循環執行的方法類,并且注冊的方法會在 twisted 的 reactor 中異步執行,以後執行隻需調用 schedule 方法,就會注冊 self 到 reactor 的 callLater 中,然後它會執行 __call__ 方法,進而執行我們注冊的方法。而這裡我們注冊的方法是引擎的_next_request,也就是說,此方法會循環排程,直到程式退出。

然後調用了爬蟲中間件的 process_start_requests 方法,也就是說,你可以定義多個自己的爬蟲中間件,每個類都重寫此方法,爬蟲在排程之前會分别調用你定義好的爬蟲中間件,來分别處理初始化請求,你可以進行過濾、加工、篩選以及你想做的任何邏輯。這樣做的好處就是,把想做的邏輯拆分成做個中間件,功能獨立而且維護起來更加清晰。

排程器

接着調用了​

​Scheduler​

​​的​

​open​

​:(scrapy/core/scheduler.py)

Scrapy源碼閱讀分析_4_請求處理流程

在​

​open​

​​方法中,執行個體化出優先級隊列以及根據​

​dqdir​

​決定是否使用磁盤隊列,然後調用了請求指紋過濾器的​

​open​

​​,在父類​

​BaseDupeFilter​

​中定義:

Scrapy源碼閱讀分析_4_請求處理流程

請求過濾器提供了請求過濾的具體實作方式,Scrapy預設提供了​

​RFPDupeFilter​

​過濾器實作過濾重複請求的邏輯,後面講具體是如何過濾重複請求的。

Scraper

再來看​

​Scraper​

​​的​

​open_spider​

​:

Scrapy源碼閱讀分析_4_請求處理流程

這裡的工作主要是​

​Scraper​

​​調用所有​

​Pipeline​

​​的​

​open_spider​

​​方法,也就是說,如果我們定義了多個​

​Pipeline​

​​輸出類,可重寫​

​open_spider​

​​完成每個​

​Pipeline​

​處理輸出開始的初始化工作。

循環排程

調用了一些列的元件的​

​open​

​​方法後,最後調用了​

​nextcall.schedule()​

​開始排程,

Scrapy源碼閱讀分析_4_請求處理流程

也就是循環執行在上面注冊的 ​

​_next_request​

​方法:

Scrapy源碼閱讀分析_4_請求處理流程

_next_request 方法首先調用 _needs_backout 方法檢查是否需要等待,等待的條件有:

  • 引擎是否主動關閉
  • Slot是否關閉
  • 下載下傳器網絡下載下傳超過預設參數
  • Scraper處理輸出超過預設參數

如果不需要等待,則調用 _next_request_from_scheduler,此方法從名字上就能看出,主要是從 Schduler中 擷取 Request。

這裡要注意,在第一次調用此方法時,Scheduler 中是沒有放入任何 Request 的,這裡會直接 break 出來,執行下面的邏輯,而下面就會調用 crawl 方法,實際是把請求放到 Scheduler 的請求隊列,放入隊列的過程會經過 請求過濾器 校驗是否重複。

下次再調用 _next_request_from_scheduler 時,就能從 Scheduler 中擷取到下載下傳請求,然後執行下載下傳動作。

先來看第一次排程,執行 crawl:

Scrapy源碼閱讀分析_4_請求處理流程

調用引擎的​

​crawl​

​​實際就是把請求放入​

​Scheduler​

​的隊列中,下面看請求是如何入隊列的。

請求入隊

​Scheduler ​

​請求入隊方法:(scrapy/core/schedulter.py)

Scrapy源碼閱讀分析_4_請求處理流程

在之前将核心元件執行個體化時有說到,排程器主要定義了2種隊列:基于磁盤隊列、基于記憶體隊列。

如果在執行個體化​

​Scheduler​

​​時候傳入​

​jobdir​

​,則使用磁盤隊列,否則使用記憶體隊列,預設使用記憶體隊列。

指紋過濾

在入隊之前,首先通過請求指紋過濾器檢查請求是否重複,也就是調用了過濾器的​

​request_seen​

​:

(scrapy/duperfilters.py)

Scrapy源碼閱讀分析_4_請求處理流程

​utils.request​

​​的​

​request_fingerprint​

​:

Scrapy源碼閱讀分析_4_請求處理流程

這個過濾器先是通過 Request 對象 生成一個請求指紋,在這裡使用 sha1 算法,并記錄到指紋集合,每次請求入隊前先到這裡驗證一下指紋集合,如果已存在,則認為請求重複,則不會重複入隊列。

不過如果我想不校驗重複,也想重複爬取怎麼辦?看 enqueue_request 的第一行判斷,僅需将 Request 執行個體的 dont_filter 定義為 True 就可以重複爬取此請求,非常靈活。

Scrapy 就是通過此邏輯實作重複請求的過濾邏輯,預設重複請求是不會進行重複抓取的。

下載下傳請求

第一次請求進來後,肯定是不重複的,那麼則會正常進入排程器隊列。然後再進行下一次排程,再次調用_next_request_from_scheduler 方法,此時調用排程器的 next_request 方法,就是從排程器隊列中取出一個請求,這次就要開始進行網絡下載下傳了,也就是調用 _download:(scrapy/core/engine.py)

Scrapy源碼閱讀分析_4_請求處理流程

在進行網絡下載下傳時,調用了​

​Downloader​

​​的​

​fetch​

​:(scrapy/core/downloader/__init__.py)

Scrapy源碼閱讀分析_4_請求處理流程

這裡調用下載下傳器中間件的​

​download​

​方法 (scrapy/core/downloader/middleware.py),并注冊下載下傳成功的回調方法是​

​_enqueue_request​

​,來看下載下傳方法:

Scrapy源碼閱讀分析_4_請求處理流程

在下載下傳過程中,首先先找到所有定義好的下載下傳器中間件,包括内置定義好的,也可以自己擴充下載下傳器中間件,下載下傳前先依次執行process_request 方法,可對 request 進行加工、處理、校驗等操作,然後發起真正的網絡下載下傳,也就是第一個參數download_func,在這裡是 Downloader 的 _enqueue_request 方法:

下載下傳成功後回調 Downloader 的 _enqueue_request:(scrapy/core/downloader/__init__.py)

Scrapy源碼閱讀分析_4_請求處理流程

在這裡,也維護了一個下載下傳隊列,可根據配置達到延遲下載下傳的要求。真正發起下載下傳請求的是調用了​

​self.handlers.download_request​

​:(scrapy/core/downloader/handlers/__init__.py)

Scrapy源碼閱讀分析_4_請求處理流程

下載下傳前,先通過解析​

​request​

​​的​

​scheme​

​來擷取對應的下載下傳處理器,預設配置檔案中定義的下載下傳處理器:

Scrapy源碼閱讀分析_4_請求處理流程

然後調用 download_request 方法,完成網絡下載下傳,這裡不再詳細講解每個處理器的實作,簡單來說你就把它想象成封裝好的網絡下載下傳庫,輸入URL,輸出下載下傳結果就好了,這樣友善了解。

在下載下傳過程中,如果發生異常情況,則會依次調用下載下傳器中間件的process_exception方法,每個中間件隻需定義自己的異常處理邏輯即可。

如果下載下傳成功,則會依次執行下載下傳器中間件的 process_response 方法,每個中間件可以進一步處理下載下傳後的結果,最終傳回。

這裡值得提一下,除了process_request 方法是每個中間件順序執行的,而 process_response 和 process_exception 方法是每個中間件倒序執行的,具體可看一下 DownaloderMiddlewareManager 的 _add_middleware 方法,可明白是如何注冊這個方法鍊的。

拿到最終的下載下傳結果後,再回到 ExecuteEngine 的 _next_request_from_scheduler 方法,會看到調用了_handle_downloader_output 方法,也就是處理下載下傳結果的邏輯:

Scrapy源碼閱讀分析_4_請求處理流程

拿到下載下傳結果後,主要分2個邏輯,如果是 Request 執行個體,則直接再次放入 Scheduler 請求隊列。如果是 Response 或 Failure 執行個體,則調用 Scraper 的 enqueue_scrape 方法,進行進一步處理。

Scrapyer 主要是與 Spider 子產品和 Pipeline 子產品進行互動。

處理下載下傳結果

請求入隊邏輯不用再說,前面已經講過。現在主要看​

​Scraper​

​​的​

​enqueue_scrape​

​​,看​

​Scraper​

​元件是如何處理後續邏輯的:

(scrapy/core/scraper.py)

Scrapy源碼閱讀分析_4_請求處理流程

首先加入到​

​Scraper​

​的處理隊列中,然後從隊列中擷取到任務,如果不是異常結果,則調用 爬蟲中間件管理器 的 ​

​scrape_response ​

​方法:

Scrapy源碼閱讀分析_4_請求處理流程

有沒有感覺套路很熟悉?與上面下載下傳器中間件調用方式非常相似,也調用一系列的前置方法,再執行真正的處理邏輯,然後執行一些列的後置方法。

回調爬蟲

在這裡真正的處理邏輯是​

​call_spider​

​,也就是回調我們寫的爬蟲類:(scrapy/core/scraper.py)

Scrapy源碼閱讀分析_4_請求處理流程

看到這裡,你應該更熟悉,平時我們寫的最多的爬蟲子產品的​

​parse​

​​則是第一個回調方法,後續爬蟲子產品拿到下載下傳結果,可定義下載下傳後的​

​callback​

​就是在這裡進行回調執行的。

處理輸出

在與爬蟲子產品互動完成之後,​

​Scraper​

​​調用了​

​handle_spider_output​

​方法處理輸出結果:

Scrapy源碼閱讀分析_4_請求處理流程

我們編寫爬蟲類時,寫的那些回調方法處理邏輯,也就是在這裡被回調執行,執行完自定義的解析邏輯後,解析方法可傳回新的 Request 或 BaseItem 執行個體,如果是新的請求,則再次通過 Scheduler 進入請求隊列,如果是 BaseItem 執行個體,則調用 Pipeline 管理器,依次執行 process_item,也就是我們想輸出結果時,隻定義 Pepeline 類,然後重寫這個方法就可以了。

ItemPipeManager 處理邏輯:

Scrapy源碼閱讀分析_4_請求處理流程

可以看到​

​ItemPipeManager​

​​也是一個中間件,和之前下載下傳器中間件管理器和爬蟲中間件管理器類似,如果子類有定義​

​process_item​

​,則依次執行它。

執行完後,調用​

​_itemproc_finished​

​:

Scrapy源碼閱讀分析_4_請求處理流程

這裡可以看到,如果想在 Pipeline中 丢棄某個結果,直接抛出 DropItem 異常即可,Scrapy 會進行對應的處理。

到這裡,抓取結果根據自定義的輸出類輸出到指定位置,而新的 Request 則會再次進入請求隊列,等待引擎下一次排程,也就是再次調用 ExecutionEngine 的 _next_request 方法,直至請求隊列沒有新的任務,整個程式退出。

CrawlerSpider

這裡也簡單說一下 CrawlerSpider 類,它其實就繼承了 Spider 類,然後重寫了 parse 方法(這也是內建此類不要重寫此方法的原因),并結合Rule等規則類,來完成Request的自動提取邏輯。

由此也可看出,Scrapy 的每個子產品的實作都非常純粹,每個元件都通過配置檔案定義連接配接起來,如果想要擴充或替換,隻需定義并實作自己的處理邏輯即可,其他子產品均不受任何影響,這也導緻編寫一個插件是變得多麼容易!

總結

總結一下整個運作流程,還是用這兩張圖表示再清楚不過:

Scrapy源碼閱讀分析_4_請求處理流程
Scrapy源碼閱讀分析_4_請求處理流程

Scrapy整體給我的感覺是,雖然它提供的隻是單機版的爬蟲架構,但我們可以通過編寫更多的插件和替換某些元件,來定制化自己的爬蟲,進而來實作更強大的功能,例如分布式、代理排程、并發控制、可視化、監控等等功能,都是非常友善的!