天天看點

CrawlSpiders全站爬取-拉勾網職位資訊

前言

這次我們使用scrapy中的CrawlSpiders爬取拉勾網。CrawlSpiders是Spider的派生類,用于全站爬取。

開始之前,先介紹一個工具——cmder

cmder是一款Windows環境下非常簡潔美觀易用的cmd替代者,它支援了大部分的Linux指令。支援ssh連接配接linux,使用起來非常友善。

下載下傳cmder後,将其路徑添加到path環境變量中,然後就可以運作了。

CrawlSpiders全站爬取-拉勾網職位資訊
CrawlSpiders全站爬取-拉勾網職位資訊

用cmder檢視scrapy中可用的spider模闆,然後建立一個crawlspider。

crawlspider源碼解析

crawlspider定義了一些規則(rule)來提供跟進link的友善的機制,可以說是為全站爬取而生。

rules:

一個包含一個(或多個)Rule對象的集合。 每個Rule對爬取網站的動作定義了特定表現。如果多個Rule比對了相同的連結,則根據他們在本屬性中被定義的順序,第一個會被使用。

使用執行個體如下:

rules = (
        Rule(LinkExtractor(allow=("zhaopin/.*",)), follow=True),
        Rule(LinkExtractor(allow=("gongsi/j\d+.html",)), follow=True),
        Rule(LinkExtractor(allow=r'jobs/\d+.html'), callback='parse_job', follow=True)
    )
           
源碼分析

CrawlSpider的源碼不是很多,就直接在它的源碼加上注釋的方式進行分析:

class CrawlSpider(Spider):

    rules = ()

    def __init__(self, *a, **kw):
        super(CrawlSpider, self).__init__(*a, **kw)
        self._compile_rules()
	
	#由于CrawlSpider繼承自Spider,是以入口仍然是start_requests。在start_requests中發起start_url請求,然後交給回調函數parse處理(也就是以下步驟)
    #1、首先調用parse()方法來處理start_urls中傳回的response對象。
    #2、parse()将這些response對象傳遞給了_parse_response()函數處理,并設定回調函數為parse_start_url()。
    #3、設定了跟進标志位True,即follow=True。
    #4、傳回response。
    def parse(self, response):
        return self._parse_response(response, self.parse_start_url, cb_kwargs={}, follow=True)

    #處理start_url中傳回的response,需要重寫。
    def parse_start_url(self, response):
        return []

    def process_results(self, response, results):
        return results

    def _build_request(self, rule, link):
         #構造Request對象,并将Rule規則中定義的回調函數作為這個Request對象的回調函數。這個‘_build_request’函數在下面調用。
        r = Request(url=link.url, callback=self._response_downloaded)
        r.meta.update(rule=rule, link_text=link.text)
        return r

    #從response中抽取符合任一使用者定義'規則'的連結,并構造成Resquest對象傳回。
    def _requests_to_follow(self, response):
        if not isinstance(response, HtmlResponse):
            return
        seen = set()
        #抽取所有連結,隻要通過任意一個'規則',即表示合法。 
        #周遊所有規則
        for n, rule in enumerate(self._rules):
            links = [lnk for lnk in rule.link_extractor.extract_links(response)
                     if lnk not in seen]
            if links and rule.process_links:
                links = rule.process_links(links)
            #将連結加入seen集合,為每個連結生成Request對象,并設定回調函數為_repsonse_downloaded()。
            for link in links:
                seen.add(link)
                #構造Request對象,并将Rule規則中定義的回調函數作為這個Request對象的回調函數。這個‘_build_request’函數在上面定義。
                r = self._build_request(n, link)
                #對每個Request調用process_request()函數。該函數預設為indentify,即不做任何處理,直接傳回該Request。
                yield rule.process_request(r)

    #處理通過rule提取出的連接配接,并傳回item以及request。
    def _response_downloaded(self, response):
        rule = self._rules[response.meta['rule']]
        return self._parse_response(response, rule.callback, rule.cb_kwargs, rule.follow)

    #解析response對象,使用callback解析處理他,并傳回request或Item對象。
    def _parse_response(self, response, callback, cb_kwargs, follow=True):
        #1、首先判斷是否設定了回調函數。(該回調函數可能是rule中的解析函數,也可能是 parse_start_url函數)  
        #2、如果設定了回調函數(parse_start_url()),那麼首先用parse_start_url()處理response對象,  
        #3、然後再交給process_results處理。傳回cb_res的一個清單。  
        if callback:
            cb_res = callback(response, **cb_kwargs) or ()
            cb_res = self.process_results(response, cb_res)
            for requests_or_item in iterate_spider_output(cb_res):
                yield requests_or_item

        #如果需要跟進,那麼使用定義的Rule規則提取并傳回這些Request對象。
        if follow and self._follow_links:
            #傳回每個Request對象。
            for request_or_item in self._requests_to_follow(response):
                yield request_or_item

    def _compile_rules(self):
        def get_method(method):
            if callable(method):
                return method
            elif isinstance(method, six.string_types):
                return getattr(self, method, None)

        self._rules = [copy.copy(r) for r in self.rules]
        for rule in self._rules:
            rule.callback = get_method(rule.callback)
            rule.process_links = get_method(rule.process_links)
            rule.process_request = get_method(rule.process_request)

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super(CrawlSpider, cls).from_crawler(crawler, *args, **kwargs)
        spider._follow_links = crawler.settings.getbool(
            'CRAWLSPIDER_FOLLOW_LINKS', True)
        return spider

    def set_crawler(self, crawler):
        super(CrawlSpider, self).set_crawler(crawler)
        self._follow_links = crawler.settings.getbool('CRAWLSPIDER_FOLLOW_LINKS', True)
           

運作流程:

(圖檔來源 https://blog.csdn.net/qianxin12345/article/details/77899672)

CrawlSpiders全站爬取-拉勾網職位資訊
CrawlSpiders全站爬取-拉勾網職位資訊

爬取拉勾

首先定義url規則:

rules = (
   Rule(LinkExtractor(allow=("zhaopin/.*",)), follow=True),
   Rule(LinkExtractor(allow=("gongsi/j\d+.html",)), follow=True),
   Rule(LinkExtractor(allow=r'jobs/\d+.html'), callback='parse_job', follow=True)
)
           

解析資料項函數:

def parse_job(self, response):
     item_loader = JobItemLoader(item=LagouItem(),response=response)

     item_loader.add_value('url',response.url)
     item_loader.add_value('url_object_id',get_md5(response.url))
     item_loader.add_xpath('title','//div[@class="job-name"]/@title')
     item_loader.add_xpath('salary', '//dd[@class="job_request"]/p/span[@class="salary"]/text()')
     item_loader.add_xpath('job_city', '//dd[@class="job_request"]/p/span[2]/text()')
     item_loader.add_xpath('work_years_min', '//dd[@class="job_request"]/p/span[3]/text()')
     item_loader.add_xpath('work_years_max', '//dd[@class="job_request"]/p/span[3]/text()')
     item_loader.add_xpath('degree_need', '//dd[@class="job_request"]/p/span[4]/text()')
     item_loader.add_xpath('job_type', '//dd[@class="job_request"]/p/span[5]/text()')
     item_loader.add_xpath('pulish_time', '//p[@class="publish_time"]/text()')
     item_loader.add_xpath('tags', '//ul[@class="position-label clearfix"]/li[@class="labels"]/text()')
     item_loader.add_xpath('job_advantage', '//dd[@class="job-advantage"]/p/text()')
     item_loader.add_xpath('job_desc', '//div[@class="job-detail"]')
     item_loader.add_xpath('job_addr', '//div[@class="work_addr"]')
     item_loader.add_xpath('company_name', '//dl[@id="job_company"]//h2/text()')
     item_loader.add_xpath('company_url', '//ul[@class="c_feature"]//a/@href')
     item_loader.add_value('crawl_time', datetime.datetime.now())

     job_item = item_loader.load_item()
     return job_item
           

定義items:

from w3lib.html import remove_tags #去除html标簽

class JobItemLoader(ItemLoader):
    default_output_processor = TakeFirst()

def remove_line(value):
    #去掉斜線
    return value.replace('/','')

def handle_pubtime(value):
    return value.split(' ')[0]

def remove_place(value):
    #去掉空格與換行
    return value.replace('\n','').replace('\xa0','').strip()

def year_min(value):
    match_re = re.match(".*(\d+)-\d+.*",value)
    if match_re:
        return match_re.group(1)
    else:
        return value

def year_max(value):
    match_re = re.match(".*\d+-(\d+).*",value)
    if match_re:
        return match_re.group(1)
    else:
        return value

def handle_jobaddr(value):
    addr_list = value.split("\n")
    addr_list = [item.strip() for item in addr_list if item.strip()!="檢視地圖"]
    return "".join(addr_list)

class LagouItem(scrapy.Item):
    url = scrapy.Field()
    url_object_id = scrapy.Field()
    title = scrapy.Field()
    salary = scrapy.Field()
    job_city = scrapy.Field(
        input_processor = MapCompose(remove_line)
    )
    work_years_min = scrapy.Field(
        input_processor=MapCompose(remove_line,year_min)
    )
    work_years_max = scrapy.Field(
        input_processor=MapCompose(remove_line,year_max)
    )
    degree_need = scrapy.Field(
        input_processor=MapCompose(remove_line)
    )
    job_type = scrapy.Field(
        input_processor=MapCompose(remove_line)
    )

    pulish_time = scrapy.Field(
        input_processor=MapCompose(handle_pubtime)
    )
    tags = scrapy.Field(
        output_processor=Join(",")
    )
    job_advantage = scrapy.Field()
    job_desc = scrapy.Field(
        input_processor=MapCompose(remove_tags,remove_place)
    )
    job_addr = scrapy.Field(
        input_processor=MapCompose(remove_tags,handle_jobaddr)
    )
    company_url = scrapy.Field()
    company_name = scrapy.Field(
        input_processor=MapCompose(remove_place)
    )
    crawl_time = scrapy.Field()
           

運作一下,立馬被拉勾網判定為爬蟲。

CrawlSpiders全站爬取-拉勾網職位資訊

用浏覽器通路拉勾依然沒問題,說明拉勾網沒有封我們的IP,先加個請求頭試試(但是很多人也說要加cookies,我在加與不加之間都嘗試了很多次,最終不加cookies也可以請求成功,很戲劇性):

注意在start_requests與_build_request都要加上headers

headers = {
      "HOST": "www.lagou.com",
      "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"
  }

def start_requests(self):
      for url in self.start_urls:
          yield scrapy.Request(url, headers=self.headers, dont_filter=True,callback=self.parse)

def _build_request(self, rule, link):
    r = Request(url=link.url,headers=self.headers, callback=self._response_downloaded, dont_filter=True)
    r.meta.update(rule=rule, link_text=link.text)
    return r
           

運作一下,似乎可以拿到資料了。

CrawlSpiders全站爬取-拉勾網職位資訊

寫入資料到Mysql:

1.設計資料表:

CrawlSpiders全站爬取-拉勾網職位資訊

2.改寫items

class LagouItem(scrapy.Item):
	#省略部分代碼。。。。
	
	def get_sql(self):
	     #便于#根據不同的item 建構不同的sql語句并插入到mysql中
	     insert_sql = """
	            insert into lagou_job(url,url_object_id,title,salary,job_city,work_years_min,work_years_max,degree_need,job_type,pulish_time,tags,job_advantage,job_desc,job_addr,company_url,company_name,crawl_time)
	            VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
	            ON DUPLICATE KEY UPDATE salary=VALUES(salary), job_desc=VALUES(job_desc), job_advantage=VALUES(job_advantage),
	           pulish_time=VALUES(pulish_time),crawl_time=VALUES(crawl_time)
	        """
	     #如果存在主鍵沖突,則更新資料。如果不存在,則插入資料
	
	     crawl_time = self["crawl_time"].strftime(SQL_DATETIME_FORMAT)
	
	     params = (self["url"], self["url_object_id"], self["title"], self["salary"], self["job_city"], self["work_years_min"], self["work_years_max"], self["degree_need"],self["job_type"],self["pulish_time"],self["tags"],self["job_advantage"],self["job_desc"],self["job_addr"],self["company_url"],self["company_name"], crawl_time)
	     return insert_sql,params
           

3.pipelines:

import MySQLdb
import MySQLdb.cursors
from twisted.enterprise import adbapi


class JobspiderPipeline(object):
    def process_item(self, item, spider):
        return item

class MysqlTwistedPipeline(object):
    #采用異步機制寫入mysql
    def __init__(self,dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls,settings):
        dbparms = dict(
            host = settings["MYSQL_HOST"],
            db = settings["MYSQL_DBNAME"],
            user = settings["MYSQL_USER"],
            passwd = settings["MYSQL_PASSWORD"],
            charset = 'utf8',
            cursorclass = MySQLdb.cursors.DictCursor,
            use_unicode = True
        )
        dbpool = adbapi.ConnectionPool("MySQLdb",**dbparms)
        return cls(dbpool)

    def process_item(self,item,spider):
        query = self.dbpool.runInteraction(self.do_insert,item)
        query.addErrback(self.handle_error,item,spider)

    def handle_error(self,failure,item,spider):
        print(failure)

    def do_insert(self,cursor,item):
        insert_sql,params = item.get_sql()
        cursor.execute(insert_sql,params)
           

現在可以看到資料成功入庫了:

CrawlSpiders全站爬取-拉勾網職位資訊

但是依然存在兩個問題:

  1. itemloader處理空值存在問題:我們需要爬取拉勾網頁上的資訊,但不是所有職位網頁中展示的資訊都一樣,例如有些職位資訊網頁上并沒有附上職位标簽tags,而如果沒有需要在資料庫相應的字段中指派為空。 但是itemloader預設會略過空值,造成資料入庫時KeyError(如下圖所示)
    CrawlSpiders全站爬取-拉勾網職位資訊
  2. 爬取了5分鐘左右,拉勾網會出現302重定向
    CrawlSpiders全站爬取-拉勾網職位資訊

首先解決第一個問題:

我們需要對空清單指派為空字元處理

使用itemloader爬取時,傳回的資料類型是list,再存入item容器前,是支援對資料進行預處理的,即輸入處理器和輸出處理器,可以通過MapCompose這個類來依次對list的元素進行處理,但如果list為空則不會進行處理,這種情況需要重載MapCompose類的__call__方法,如下,給value增加一個空格str“ ”

class MapComposeCustom(MapCompose):
    #自定義MapCompose,當value沒元素時傳入" "
    def __call__(self, value, loader_context=None):
        if not value:
            value.append(" ")
        values = arg_to_iter(value)
        if loader_context:
            context = MergeDict(loader_context, self.default_loader_context)
        else:
            context = self.default_loader_context
        wrapped_funcs = [wrap_loader_context(f, context) for f in self.functions]
        for func in wrapped_funcs:
            next_values = []
            for v in values:
                next_values += arg_to_iter(func(v))
            values = next_values
        return values
           
tags = scrapy.Field(
   input_processor=MapComposeCustom(remove_line),
   output_processor=Join(",")
)
           

如此一來,第一個問題就解決了。

我們檢查一下資料庫情況:

CrawlSpiders全站爬取-拉勾網職位資訊

tags字段中存在為空的資料,那麼說明我們的第一個問題解決了。

(參考文章:https://blog.csdn.net/m0_37323771/article/details/83211816)

第二個問題:

經過試驗,發現隻要爬取時間超過5分鐘,就會出現302重定向問題,此時我們便無法取到需要的資料。

但是稍微等一段時間,就可以正常爬取了。

這個問題,我還沒解決,先占個坑。。。

學藝不精,不對的地方望大家指正~

繼續閱讀