前言
這次我們使用scrapy中的CrawlSpiders爬取拉勾網。CrawlSpiders是Spider的派生類,用于全站爬取。
開始之前,先介紹一個工具——cmder
cmder是一款Windows環境下非常簡潔美觀易用的cmd替代者,它支援了大部分的Linux指令。支援ssh連接配接linux,使用起來非常友善。
下載下傳cmder後,将其路徑添加到path環境變量中,然後就可以運作了。
用cmder檢視scrapy中可用的spider模闆,然後建立一個crawlspider。
crawlspider源碼解析
crawlspider定義了一些規則(rule)來提供跟進link的友善的機制,可以說是為全站爬取而生。
rules:
一個包含一個(或多個)Rule對象的集合。 每個Rule對爬取網站的動作定義了特定表現。如果多個Rule比對了相同的連結,則根據他們在本屬性中被定義的順序,第一個會被使用。
使用執行個體如下:
rules = (
Rule(LinkExtractor(allow=("zhaopin/.*",)), follow=True),
Rule(LinkExtractor(allow=("gongsi/j\d+.html",)), follow=True),
Rule(LinkExtractor(allow=r'jobs/\d+.html'), callback='parse_job', follow=True)
)
源碼分析
CrawlSpider的源碼不是很多,就直接在它的源碼加上注釋的方式進行分析:
class CrawlSpider(Spider):
rules = ()
def __init__(self, *a, **kw):
super(CrawlSpider, self).__init__(*a, **kw)
self._compile_rules()
#由于CrawlSpider繼承自Spider,是以入口仍然是start_requests。在start_requests中發起start_url請求,然後交給回調函數parse處理(也就是以下步驟)
#1、首先調用parse()方法來處理start_urls中傳回的response對象。
#2、parse()将這些response對象傳遞給了_parse_response()函數處理,并設定回調函數為parse_start_url()。
#3、設定了跟進标志位True,即follow=True。
#4、傳回response。
def parse(self, response):
return self._parse_response(response, self.parse_start_url, cb_kwargs={}, follow=True)
#處理start_url中傳回的response,需要重寫。
def parse_start_url(self, response):
return []
def process_results(self, response, results):
return results
def _build_request(self, rule, link):
#構造Request對象,并将Rule規則中定義的回調函數作為這個Request對象的回調函數。這個‘_build_request’函數在下面調用。
r = Request(url=link.url, callback=self._response_downloaded)
r.meta.update(rule=rule, link_text=link.text)
return r
#從response中抽取符合任一使用者定義'規則'的連結,并構造成Resquest對象傳回。
def _requests_to_follow(self, response):
if not isinstance(response, HtmlResponse):
return
seen = set()
#抽取所有連結,隻要通過任意一個'規則',即表示合法。
#周遊所有規則
for n, rule in enumerate(self._rules):
links = [lnk for lnk in rule.link_extractor.extract_links(response)
if lnk not in seen]
if links and rule.process_links:
links = rule.process_links(links)
#将連結加入seen集合,為每個連結生成Request對象,并設定回調函數為_repsonse_downloaded()。
for link in links:
seen.add(link)
#構造Request對象,并将Rule規則中定義的回調函數作為這個Request對象的回調函數。這個‘_build_request’函數在上面定義。
r = self._build_request(n, link)
#對每個Request調用process_request()函數。該函數預設為indentify,即不做任何處理,直接傳回該Request。
yield rule.process_request(r)
#處理通過rule提取出的連接配接,并傳回item以及request。
def _response_downloaded(self, response):
rule = self._rules[response.meta['rule']]
return self._parse_response(response, rule.callback, rule.cb_kwargs, rule.follow)
#解析response對象,使用callback解析處理他,并傳回request或Item對象。
def _parse_response(self, response, callback, cb_kwargs, follow=True):
#1、首先判斷是否設定了回調函數。(該回調函數可能是rule中的解析函數,也可能是 parse_start_url函數)
#2、如果設定了回調函數(parse_start_url()),那麼首先用parse_start_url()處理response對象,
#3、然後再交給process_results處理。傳回cb_res的一個清單。
if callback:
cb_res = callback(response, **cb_kwargs) or ()
cb_res = self.process_results(response, cb_res)
for requests_or_item in iterate_spider_output(cb_res):
yield requests_or_item
#如果需要跟進,那麼使用定義的Rule規則提取并傳回這些Request對象。
if follow and self._follow_links:
#傳回每個Request對象。
for request_or_item in self._requests_to_follow(response):
yield request_or_item
def _compile_rules(self):
def get_method(method):
if callable(method):
return method
elif isinstance(method, six.string_types):
return getattr(self, method, None)
self._rules = [copy.copy(r) for r in self.rules]
for rule in self._rules:
rule.callback = get_method(rule.callback)
rule.process_links = get_method(rule.process_links)
rule.process_request = get_method(rule.process_request)
@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
spider = super(CrawlSpider, cls).from_crawler(crawler, *args, **kwargs)
spider._follow_links = crawler.settings.getbool(
'CRAWLSPIDER_FOLLOW_LINKS', True)
return spider
def set_crawler(self, crawler):
super(CrawlSpider, self).set_crawler(crawler)
self._follow_links = crawler.settings.getbool('CRAWLSPIDER_FOLLOW_LINKS', True)
運作流程:
(圖檔來源 https://blog.csdn.net/qianxin12345/article/details/77899672)
爬取拉勾
首先定義url規則:
rules = (
Rule(LinkExtractor(allow=("zhaopin/.*",)), follow=True),
Rule(LinkExtractor(allow=("gongsi/j\d+.html",)), follow=True),
Rule(LinkExtractor(allow=r'jobs/\d+.html'), callback='parse_job', follow=True)
)
解析資料項函數:
def parse_job(self, response):
item_loader = JobItemLoader(item=LagouItem(),response=response)
item_loader.add_value('url',response.url)
item_loader.add_value('url_object_id',get_md5(response.url))
item_loader.add_xpath('title','//div[@class="job-name"]/@title')
item_loader.add_xpath('salary', '//dd[@class="job_request"]/p/span[@class="salary"]/text()')
item_loader.add_xpath('job_city', '//dd[@class="job_request"]/p/span[2]/text()')
item_loader.add_xpath('work_years_min', '//dd[@class="job_request"]/p/span[3]/text()')
item_loader.add_xpath('work_years_max', '//dd[@class="job_request"]/p/span[3]/text()')
item_loader.add_xpath('degree_need', '//dd[@class="job_request"]/p/span[4]/text()')
item_loader.add_xpath('job_type', '//dd[@class="job_request"]/p/span[5]/text()')
item_loader.add_xpath('pulish_time', '//p[@class="publish_time"]/text()')
item_loader.add_xpath('tags', '//ul[@class="position-label clearfix"]/li[@class="labels"]/text()')
item_loader.add_xpath('job_advantage', '//dd[@class="job-advantage"]/p/text()')
item_loader.add_xpath('job_desc', '//div[@class="job-detail"]')
item_loader.add_xpath('job_addr', '//div[@class="work_addr"]')
item_loader.add_xpath('company_name', '//dl[@id="job_company"]//h2/text()')
item_loader.add_xpath('company_url', '//ul[@class="c_feature"]//a/@href')
item_loader.add_value('crawl_time', datetime.datetime.now())
job_item = item_loader.load_item()
return job_item
定義items:
from w3lib.html import remove_tags #去除html标簽
class JobItemLoader(ItemLoader):
default_output_processor = TakeFirst()
def remove_line(value):
#去掉斜線
return value.replace('/','')
def handle_pubtime(value):
return value.split(' ')[0]
def remove_place(value):
#去掉空格與換行
return value.replace('\n','').replace('\xa0','').strip()
def year_min(value):
match_re = re.match(".*(\d+)-\d+.*",value)
if match_re:
return match_re.group(1)
else:
return value
def year_max(value):
match_re = re.match(".*\d+-(\d+).*",value)
if match_re:
return match_re.group(1)
else:
return value
def handle_jobaddr(value):
addr_list = value.split("\n")
addr_list = [item.strip() for item in addr_list if item.strip()!="檢視地圖"]
return "".join(addr_list)
class LagouItem(scrapy.Item):
url = scrapy.Field()
url_object_id = scrapy.Field()
title = scrapy.Field()
salary = scrapy.Field()
job_city = scrapy.Field(
input_processor = MapCompose(remove_line)
)
work_years_min = scrapy.Field(
input_processor=MapCompose(remove_line,year_min)
)
work_years_max = scrapy.Field(
input_processor=MapCompose(remove_line,year_max)
)
degree_need = scrapy.Field(
input_processor=MapCompose(remove_line)
)
job_type = scrapy.Field(
input_processor=MapCompose(remove_line)
)
pulish_time = scrapy.Field(
input_processor=MapCompose(handle_pubtime)
)
tags = scrapy.Field(
output_processor=Join(",")
)
job_advantage = scrapy.Field()
job_desc = scrapy.Field(
input_processor=MapCompose(remove_tags,remove_place)
)
job_addr = scrapy.Field(
input_processor=MapCompose(remove_tags,handle_jobaddr)
)
company_url = scrapy.Field()
company_name = scrapy.Field(
input_processor=MapCompose(remove_place)
)
crawl_time = scrapy.Field()
運作一下,立馬被拉勾網判定為爬蟲。
用浏覽器通路拉勾依然沒問題,說明拉勾網沒有封我們的IP,先加個請求頭試試(但是很多人也說要加cookies,我在加與不加之間都嘗試了很多次,最終不加cookies也可以請求成功,很戲劇性):
注意在start_requests與_build_request都要加上headers
headers = {
"HOST": "www.lagou.com",
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"
}
def start_requests(self):
for url in self.start_urls:
yield scrapy.Request(url, headers=self.headers, dont_filter=True,callback=self.parse)
def _build_request(self, rule, link):
r = Request(url=link.url,headers=self.headers, callback=self._response_downloaded, dont_filter=True)
r.meta.update(rule=rule, link_text=link.text)
return r
運作一下,似乎可以拿到資料了。
寫入資料到Mysql:
1.設計資料表:
2.改寫items
class LagouItem(scrapy.Item):
#省略部分代碼。。。。
def get_sql(self):
#便于#根據不同的item 建構不同的sql語句并插入到mysql中
insert_sql = """
insert into lagou_job(url,url_object_id,title,salary,job_city,work_years_min,work_years_max,degree_need,job_type,pulish_time,tags,job_advantage,job_desc,job_addr,company_url,company_name,crawl_time)
VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON DUPLICATE KEY UPDATE salary=VALUES(salary), job_desc=VALUES(job_desc), job_advantage=VALUES(job_advantage),
pulish_time=VALUES(pulish_time),crawl_time=VALUES(crawl_time)
"""
#如果存在主鍵沖突,則更新資料。如果不存在,則插入資料
crawl_time = self["crawl_time"].strftime(SQL_DATETIME_FORMAT)
params = (self["url"], self["url_object_id"], self["title"], self["salary"], self["job_city"], self["work_years_min"], self["work_years_max"], self["degree_need"],self["job_type"],self["pulish_time"],self["tags"],self["job_advantage"],self["job_desc"],self["job_addr"],self["company_url"],self["company_name"], crawl_time)
return insert_sql,params
3.pipelines:
import MySQLdb
import MySQLdb.cursors
from twisted.enterprise import adbapi
class JobspiderPipeline(object):
def process_item(self, item, spider):
return item
class MysqlTwistedPipeline(object):
#采用異步機制寫入mysql
def __init__(self,dbpool):
self.dbpool = dbpool
@classmethod
def from_settings(cls,settings):
dbparms = dict(
host = settings["MYSQL_HOST"],
db = settings["MYSQL_DBNAME"],
user = settings["MYSQL_USER"],
passwd = settings["MYSQL_PASSWORD"],
charset = 'utf8',
cursorclass = MySQLdb.cursors.DictCursor,
use_unicode = True
)
dbpool = adbapi.ConnectionPool("MySQLdb",**dbparms)
return cls(dbpool)
def process_item(self,item,spider):
query = self.dbpool.runInteraction(self.do_insert,item)
query.addErrback(self.handle_error,item,spider)
def handle_error(self,failure,item,spider):
print(failure)
def do_insert(self,cursor,item):
insert_sql,params = item.get_sql()
cursor.execute(insert_sql,params)
現在可以看到資料成功入庫了:
但是依然存在兩個問題:
- itemloader處理空值存在問題:我們需要爬取拉勾網頁上的資訊,但不是所有職位網頁中展示的資訊都一樣,例如有些職位資訊網頁上并沒有附上職位标簽tags,而如果沒有需要在資料庫相應的字段中指派為空。 但是itemloader預設會略過空值,造成資料入庫時KeyError(如下圖所示)
- 爬取了5分鐘左右,拉勾網會出現302重定向
首先解決第一個問題:
我們需要對空清單指派為空字元處理
使用itemloader爬取時,傳回的資料類型是list,再存入item容器前,是支援對資料進行預處理的,即輸入處理器和輸出處理器,可以通過MapCompose這個類來依次對list的元素進行處理,但如果list為空則不會進行處理,這種情況需要重載MapCompose類的__call__方法,如下,給value增加一個空格str“ ”
class MapComposeCustom(MapCompose):
#自定義MapCompose,當value沒元素時傳入" "
def __call__(self, value, loader_context=None):
if not value:
value.append(" ")
values = arg_to_iter(value)
if loader_context:
context = MergeDict(loader_context, self.default_loader_context)
else:
context = self.default_loader_context
wrapped_funcs = [wrap_loader_context(f, context) for f in self.functions]
for func in wrapped_funcs:
next_values = []
for v in values:
next_values += arg_to_iter(func(v))
values = next_values
return values
tags = scrapy.Field(
input_processor=MapComposeCustom(remove_line),
output_processor=Join(",")
)
如此一來,第一個問題就解決了。
我們檢查一下資料庫情況:
tags字段中存在為空的資料,那麼說明我們的第一個問題解決了。
(參考文章:https://blog.csdn.net/m0_37323771/article/details/83211816)
第二個問題:
經過試驗,發現隻要爬取時間超過5分鐘,就會出現302重定向問題,此時我們便無法取到需要的資料。
但是稍微等一段時間,就可以正常爬取了。
這個問題,我還沒解決,先占個坑。。。
學藝不精,不對的地方望大家指正~