運作入口
還是回到最初的入口,在Scrapy源碼分析(二)運作入口這篇文章中已經講解到,在執行scrapy指令時,調用流程如下:
- 調用cmdline.py的execute方法
- 調用指令執行個體解析指令行
- 建構CrawlerProcess執行個體,調用crawl和start方法
而 crawl 方法最終是調用了 Cralwer 執行個體的 crawl,這個方法最終把控制權交由 Engine,而 start 方法 注冊好協程池,開始異步排程。
我們來看 Cralwer 的 crawl 方法:(scrapy/crawler.py)
在把控制權交給引擎排程之前,先建立出爬蟲執行個體,然後建立引擎執行個體(此過程見Scrapy源碼分析(三)核心元件初始化),然後調用了
spider
的
start_requests
方法,這個方法就是我們平時寫的最多爬蟲類的父類,它在
spiders/__init__.py
中:
建構請求
在這裡我們能看到,平時我們必須要定義的
start_urls
,原來是在這裡拿來建構
Request
的,來看
Request
的是如何建構的:
(scrapy/http/request/__init__.py)
"""
This module implements the Request class which is used to represent HTTP
requests in Scrapy.
See documentation in docs/topics/request-response.rst
"""
import six
from w3lib.url import safe_url_string
from scrapy.http.headers import Headers
from scrapy.utils.python import to_bytes
from scrapy.utils.trackref import object_ref
from scrapy.utils.url import escape_ajax
from scrapy.http.common import obsolete_setter
class Request(object_ref):
def __init__(self, url, callback=None, method='GET', headers=None, body=None,
cookies=None, meta=None, encoding='utf-8', priority=0,
dont_filter=False, errback=None, flags=None):
# 編碼
self._encoding = encoding # this one has to be set first
# 請求方法
self.method = str(method).upper()
# 設定 URL
self._set_url(url)
# 設定 body
self._set_body(body)
assert isinstance(priority, int), "Request priority not an integer: %r" % priority
# 優先級
self.priority = priority
if callback is not None and not callable(callback):
raise TypeError('callback must be a callable, got %s' % type(callback).__name__)
if errback is not None and not callable(errback):
raise TypeError('errback must be a callable, got %s' % type(errback).__name__)
assert callback or not errback, "Cannot use errback without a callback"
# 回調函數
self.callback = callback
# 異常回調函數
self.errback = errback
# cookies
self.cookies = cookies or {}
# 建構Header
self.headers = Headers(headers or {}, encoding=encoding)
# 是否需要過濾
self.dont_filter = dont_filter
# 附加資訊
self._meta = dict(meta) if meta else None
self.flags = [] if flags is None else list(flags)
@property
def meta(self):
if self._meta is None:
self._meta = {}
return self._meta
def _get_url(self):
return self._url
def _set_url(self, url):
if not isinstance(url, six.string_types):
raise TypeError('Request url must be str or unicode, got %s:' % type(url).__name__)
s = safe_url_string(url, self.encoding)
self._url = escape_ajax(s)
if ':' not in self._url:
raise ValueError('Missing scheme in request url: %s' % self._url)
url = property(_get_url, obsolete_setter(_set_url, 'url'))
def _get_body(self):
return self._body
def _set_body(self, body):
if body is None:
self._body = b''
else:
self._body = to_bytes(body, self.encoding)
body = property(_get_body, obsolete_setter(_set_body, 'body'))
@property
def encoding(self):
return self._encoding
def __str__(self):
return "<%s %s>" % (self.method, self.url)
__repr__ = __str__
def copy(self):
"""Return a copy of this Request"""
return self.replace()
def replace(self, *args, **kwargs):
"""Create a new Request with the same attributes except for those
given new values.
"""
for x in ['url', 'method', 'headers', 'body', 'cookies', 'meta', 'flags',
'encoding', 'priority', 'dont_filter', 'callback', 'errback']:
kwargs.setdefault(x, getattr(self, x))
cls = kwargs.pop('cls', self.__class__)
return cls(*args, **kwargs)
Request
對象比較簡單,就是簡單封裝了請求參數、方式、回調以及可附加的屬性資訊。
當然,你也可以在子類重寫
start_requests
以及
make_requests_from_url
這2個方法,來建構種子請求。
引擎排程
回到
crawl
方法,建構好種子請求對象後,調用了
engine
的
open_spider
方法:
初始化的過程之前的文章已講到,這裡不再多說。主要說一下處理流程,這裡第一步是建構了
CallLaterOnce
,把
_next_request
注冊進去,看此類的實作:
class CallLaterOnce(object):
"""Schedule a function to be called in the next reactor loop, but only if
it hasn't been already scheduled since the last time it ran.
"""
# 在twisted的reactor中循環排程一個方法
def __init__(self, func, *a, **kw):
self._func = func
self._a = a
self._kw = kw
self._call = None
def schedule(self, delay=0):
# 上次發起排程,才可再次繼續排程
if self._call is None:
self._call = reactor.callLater(delay, self)
def cancel(self):
if self._call:
self._call.cancel()
def __call__(self):
# 上面注冊的是self,是以會執行__call__
self._call = None
return self._func(*self._a, **self._kw)
這裡封裝了循環執行的方法類,并且注冊的方法會在 twisted 的 reactor 中異步執行,以後執行隻需調用 schedule 方法,就會注冊 self 到 reactor 的 callLater 中,然後它會執行 __call__ 方法,進而執行我們注冊的方法。而這裡我們注冊的方法是引擎的_next_request,也就是說,此方法會循環排程,直到程式退出。
然後調用了爬蟲中間件的 process_start_requests 方法,也就是說,你可以定義多個自己的爬蟲中間件,每個類都重寫此方法,爬蟲在排程之前會分别調用你定義好的爬蟲中間件,來分别處理初始化請求,你可以進行過濾、加工、篩選以及你想做的任何邏輯。這樣做的好處就是,把想做的邏輯拆分成做個中間件,功能獨立而且維護起來更加清晰。
排程器
接着調用了
Scheduler
的
open
:(scrapy/core/scheduler.py)
在
open
方法中,執行個體化出優先級隊列以及根據
dqdir
決定是否使用磁盤隊列,然後調用了請求指紋過濾器的
open
,在父類
BaseDupeFilter
中定義:
請求過濾器提供了請求過濾的具體實作方式,Scrapy預設提供了
RFPDupeFilter
過濾器實作過濾重複請求的邏輯,後面講具體是如何過濾重複請求的。
Scraper
再來看
Scraper
的
open_spider
:
這裡的工作主要是
Scraper
調用所有
Pipeline
的
open_spider
方法,也就是說,如果我們定義了多個
Pipeline
輸出類,可重寫
open_spider
完成每個
Pipeline
處理輸出開始的初始化工作。
循環排程
調用了一些列的元件的
open
方法後,最後調用了
nextcall.schedule()
開始排程,
也就是循環執行在上面注冊的
_next_request
方法:
_next_request 方法首先調用 _needs_backout 方法檢查是否需要等待,等待的條件有:
- 引擎是否主動關閉
- Slot是否關閉
- 下載下傳器網絡下載下傳超過預設參數
- Scraper處理輸出超過預設參數
如果不需要等待,則調用 _next_request_from_scheduler,此方法從名字上就能看出,主要是從 Schduler中 擷取 Request。
這裡要注意,在第一次調用此方法時,Scheduler 中是沒有放入任何 Request 的,這裡會直接 break 出來,執行下面的邏輯,而下面就會調用 crawl 方法,實際是把請求放到 Scheduler 的請求隊列,放入隊列的過程會經過 請求過濾器 校驗是否重複。
下次再調用 _next_request_from_scheduler 時,就能從 Scheduler 中擷取到下載下傳請求,然後執行下載下傳動作。
先來看第一次排程,執行 crawl:
調用引擎的
crawl
實際就是把請求放入
Scheduler
的隊列中,下面看請求是如何入隊列的。
請求入隊
Scheduler
請求入隊方法:(scrapy/core/schedulter.py)
在之前将核心元件執行個體化時有說到,排程器主要定義了2種隊列:基于磁盤隊列、基于記憶體隊列。
如果在執行個體化
Scheduler
時候傳入
jobdir
,則使用磁盤隊列,否則使用記憶體隊列,預設使用記憶體隊列。
指紋過濾
在入隊之前,首先通過請求指紋過濾器檢查請求是否重複,也就是調用了過濾器的
request_seen
:
(scrapy/duperfilters.py)
utils.request
的
request_fingerprint
:
這個過濾器先是通過 Request 對象 生成一個請求指紋,在這裡使用 sha1 算法,并記錄到指紋集合,每次請求入隊前先到這裡驗證一下指紋集合,如果已存在,則認為請求重複,則不會重複入隊列。
不過如果我想不校驗重複,也想重複爬取怎麼辦?看 enqueue_request 的第一行判斷,僅需将 Request 執行個體的 dont_filter 定義為 True 就可以重複爬取此請求,非常靈活。
Scrapy 就是通過此邏輯實作重複請求的過濾邏輯,預設重複請求是不會進行重複抓取的。
下載下傳請求
第一次請求進來後,肯定是不重複的,那麼則會正常進入排程器隊列。然後再進行下一次排程,再次調用_next_request_from_scheduler 方法,此時調用排程器的 next_request 方法,就是從排程器隊列中取出一個請求,這次就要開始進行網絡下載下傳了,也就是調用 _download:(scrapy/core/engine.py)
在進行網絡下載下傳時,調用了
Downloader
的
fetch
:(scrapy/core/downloader/__init__.py)
這裡調用下載下傳器中間件的
download
方法 (scrapy/core/downloader/middleware.py),并注冊下載下傳成功的回調方法是
_enqueue_request
,來看下載下傳方法:
在下載下傳過程中,首先先找到所有定義好的下載下傳器中間件,包括内置定義好的,也可以自己擴充下載下傳器中間件,下載下傳前先依次執行process_request 方法,可對 request 進行加工、處理、校驗等操作,然後發起真正的網絡下載下傳,也就是第一個參數download_func,在這裡是 Downloader 的 _enqueue_request 方法:
下載下傳成功後回調 Downloader 的 _enqueue_request:(scrapy/core/downloader/__init__.py)
在這裡,也維護了一個下載下傳隊列,可根據配置達到延遲下載下傳的要求。真正發起下載下傳請求的是調用了
self.handlers.download_request
:(scrapy/core/downloader/handlers/__init__.py)
下載下傳前,先通過解析
request
的
scheme
來擷取對應的下載下傳處理器,預設配置檔案中定義的下載下傳處理器:
然後調用 download_request 方法,完成網絡下載下傳,這裡不再詳細講解每個處理器的實作,簡單來說你就把它想象成封裝好的網絡下載下傳庫,輸入URL,輸出下載下傳結果就好了,這樣友善了解。
在下載下傳過程中,如果發生異常情況,則會依次調用下載下傳器中間件的process_exception方法,每個中間件隻需定義自己的異常處理邏輯即可。
如果下載下傳成功,則會依次執行下載下傳器中間件的 process_response 方法,每個中間件可以進一步處理下載下傳後的結果,最終傳回。
這裡值得提一下,除了process_request 方法是每個中間件順序執行的,而 process_response 和 process_exception 方法是每個中間件倒序執行的,具體可看一下 DownaloderMiddlewareManager 的 _add_middleware 方法,可明白是如何注冊這個方法鍊的。
拿到最終的下載下傳結果後,再回到 ExecuteEngine 的 _next_request_from_scheduler 方法,會看到調用了_handle_downloader_output 方法,也就是處理下載下傳結果的邏輯:
拿到下載下傳結果後,主要分2個邏輯,如果是 Request 執行個體,則直接再次放入 Scheduler 請求隊列。如果是 Response 或 Failure 執行個體,則調用 Scraper 的 enqueue_scrape 方法,進行進一步處理。
Scrapyer 主要是與 Spider 子產品和 Pipeline 子產品進行互動。
處理下載下傳結果
請求入隊邏輯不用再說,前面已經講過。現在主要看
Scraper
的
enqueue_scrape
,看
Scraper
元件是如何處理後續邏輯的:
(scrapy/core/scraper.py)
首先加入到
Scraper
的處理隊列中,然後從隊列中擷取到任務,如果不是異常結果,則調用 爬蟲中間件管理器 的
scrape_response
方法:
有沒有感覺套路很熟悉?與上面下載下傳器中間件調用方式非常相似,也調用一系列的前置方法,再執行真正的處理邏輯,然後執行一些列的後置方法。
回調爬蟲
在這裡真正的處理邏輯是
call_spider
,也就是回調我們寫的爬蟲類:(scrapy/core/scraper.py)
看到這裡,你應該更熟悉,平時我們寫的最多的爬蟲子產品的
parse
則是第一個回調方法,後續爬蟲子產品拿到下載下傳結果,可定義下載下傳後的
callback
就是在這裡進行回調執行的。
處理輸出
在與爬蟲子產品互動完成之後,
Scraper
調用了
handle_spider_output
方法處理輸出結果:
我們編寫爬蟲類時,寫的那些回調方法處理邏輯,也就是在這裡被回調執行,執行完自定義的解析邏輯後,解析方法可傳回新的 Request 或 BaseItem 執行個體,如果是新的請求,則再次通過 Scheduler 進入請求隊列,如果是 BaseItem 執行個體,則調用 Pipeline 管理器,依次執行 process_item,也就是我們想輸出結果時,隻定義 Pepeline 類,然後重寫這個方法就可以了。
ItemPipeManager 處理邏輯:
可以看到
ItemPipeManager
也是一個中間件,和之前下載下傳器中間件管理器和爬蟲中間件管理器類似,如果子類有定義
process_item
,則依次執行它。
執行完後,調用
_itemproc_finished
:
這裡可以看到,如果想在 Pipeline中 丢棄某個結果,直接抛出 DropItem 異常即可,Scrapy 會進行對應的處理。
到這裡,抓取結果根據自定義的輸出類輸出到指定位置,而新的 Request 則會再次進入請求隊列,等待引擎下一次排程,也就是再次調用 ExecutionEngine 的 _next_request 方法,直至請求隊列沒有新的任務,整個程式退出。
CrawlerSpider
這裡也簡單說一下 CrawlerSpider 類,它其實就繼承了 Spider 類,然後重寫了 parse 方法(這也是內建此類不要重寫此方法的原因),并結合Rule等規則類,來完成Request的自動提取邏輯。
由此也可看出,Scrapy 的每個子產品的實作都非常純粹,每個元件都通過配置檔案定義連接配接起來,如果想要擴充或替換,隻需定義并實作自己的處理邏輯即可,其他子產品均不受任何影響,這也導緻編寫一個插件是變得多麼容易!
總結
總結一下整個運作流程,還是用這兩張圖表示再清楚不過:
Scrapy整體給我的感覺是,雖然它提供的隻是單機版的爬蟲架構,但我們可以通過編寫更多的插件和替換某些元件,來定制化自己的爬蟲,進而來實作更強大的功能,例如分布式、代理排程、并發控制、可視化、監控等等功能,都是非常友善的!