
Scraping Zhihu with Scrapy: a simple walkthrough

Overview: 1) simulate the login to obtain cookies (what matters is each cookie's 'name' and 'value'); 2) define the items plus a get_insert(self) method used by the pipeline; 3) write the parsing functions to extract the content the business needs; 4) insert the data into MySQL asynchronously.

1. Simulate the login first; the main goal is to obtain cookies, in particular each cookie's 'name' and 'value'

Method 1: log in the conventional way with requests to obtain a CookieJar, convert it to a dict with requests.utils.dict_from_cookiejar(cookiejar), and pass that dict to scrapy.FormRequest (remember to pass headers as well) to complete the simulated login. The callback then hands control back to start_urls, and parse() and friends extract the data. A sketch of the assumed spider skeleton follows.
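The snippets in this section reference self.headers, self.session and self.start_urls without showing where they come from (a few later snippets write self.header for the same dict). A minimal sketch of the assumed spider skeleton; the spider name, domain list and User-Agent string are placeholders, not taken from the original post:

import re
import os
import time
import json
import datetime
from urllib.parse import urljoin

import requests
import scrapy
from PIL import Image


class ZhihuSpider(scrapy.Spider):
    name = 'zhihu'                                   # assumed spider name
    allowed_domains = ['www.zhihu.com']
    start_urls = ['https://www.zhihu.com/']
    headers = {
        'HOST': 'www.zhihu.com',
        'Referer': 'https://www.zhihu.com',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',   # any real browser UA works
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.session = requests.session()            # shared requests session used by get_xsrf()/login()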

def get_xsrf(self):
    '''_xsrf is a dynamically changing parameter'''
    index_url = 'https://www.zhihu.com'
    # fetch the _xsrf token needed for the login request
    index_page = self.session.get(index_url, headers=self.headers)
    html = index_page.text
    pattern = r'name="_xsrf" value="(.*?)"'
    # re.findall returns a list here
    _xsrf = re.findall(pattern, html)
    return _xsrf[0]

# fetch the captcha image
def get_captcha(self):
    t = str(int(time.time() * 1000))
    captcha_url = 'https://www.zhihu.com/captcha.gif?r=' + t + "&type=login"
    r = self.session.get(captcha_url, headers=self.headers)
    with open('captcha.jpg', 'wb') as f:
        f.write(r.content)
    # display the captcha with pillow's Image
    # if pillow is not installed, open captcha.jpg next to the source code and type it in manually
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except:
        print(u'Please open captcha.jpg under %s and enter it manually' % os.path.abspath('captcha.jpg'))
    captcha = input("please input the captcha\n>")
    return captcha

def login(self, secret='xxxxx', account='xxxxx'):
    _xsrf = self.get_xsrf()
    self.headers["X-Xsrftoken"] = _xsrf
    self.headers["X-Requested-With"] = "XMLHttpRequest"
    post_url = 'https://www.zhihu.com/login/phone_num'
    # the account entered here is assumed to be a phone number
    postdata = {
        '_xsrf': _xsrf,
        'password': secret,
        'phone_num': account
    }
    # first try to log in without a captcha
    login_page = self.session.post(post_url, data=postdata, headers=self.headers)
    login_code = login_page.json()
    if login_code['r'] == 1:
        # login without a captcha failed,
        # retry with a captcha
        postdata["captcha"] = self.get_captcha()
        login_page = self.session.post(post_url, data=postdata, headers=self.headers)
        login_code = login_page.json()
        print(login_code['msg'])
    return self.session.cookies
    # the cookies could also be saved to a file,
    # so the next run can log in from the cookie file without entering account and password
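
# A hedged sketch of that cookie persistence (assumption: self.session.cookies
# was created as an http.cookiejar.LWPCookieJar with a filename, which the
# original snippet does not show):
def save_cookies(self):
    # write the logged-in cookies to disk so the next run can skip login()
    self.session.cookies.save(ignore_discard=True, ignore_expires=True)

def load_cookies(self):
    # reload previously saved cookies; returns True when the cookie file exists
    try:
        self.session.cookies.load(ignore_discard=True, ignore_expires=True)
        return True
    except OSError:
        return False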

def start_requests(self):  # override start_requests to perform the login first
    # log in via the requests session; remember to pass headers
    cookiesjar = self.login()   # returned by "return self.session.cookies"; a CookieJar cannot be passed to scrapy.FormRequest directly
    cookiesDict = requests.utils.dict_from_cookiejar(cookiesjar)    # the plain dict can be passed as cookies=

    return [scrapy.FormRequest('https://www.zhihu.com/', cookies=cookiesDict, headers=self.headers, callback=self.after_login)]

def after_login(self, response):  # hand control back to the original start_urls
    for url in self.start_urls:
        yield scrapy.Request(url, dont_filter=True, headers=self.headers)  # parse() is called by default
           

Method 2: use selenium to perform the login and collect the cookies, normalize them with a custom set_cookies function, and pass the result to scrapy.FormRequest (again, remember the headers). The callback then hands control back to start_urls, and parse() and friends extract the data.

def login(self, name, passwd):
    url = 'https://www.zhihu.com/#signin'
    # Chrome, PhantomJS, etc. also work here; if the driver is not on PATH, point to its location explicitly
    driver = webdriver.Firefox()
    driver.set_window_size(1200, 1200)
    driver.get(url)
    print('start logging in')
    use_passwd_login = driver.find_element_by_class_name('signin-switch-password').click()
    login_button = driver.find_element_by_class_name('active').click()
    name_field = driver.find_element_by_name('account')
    name_field.send_keys(name)
    passwd_field = driver.find_element_by_name('password')
    passwd_field.send_keys(passwd)
    auto_login = driver.find_element_by_xpath('//button[contains(@class,"sign-button")]').click()
    time.sleep(10)
    return driver.get_cookies()

def set_cookies(self, drive_cookies):
    # normalize the selenium cookies into a plain {name: value} dict
    dict_cookies = {}
    for each in drive_cookies:
        dict_cookies[each['name']] = each['value']
    return dict_cookies

def start_requests(self):  # override start_requests to perform the login first
    # log in via selenium; remember to pass headers
    login_name = 'xxxxx'
    login_passwd = 'xxxxxx'
    cookies = self.set_cookies(self.login(login_name, login_passwd))   # normalize the selenium cookies before handing them to scrapy

    return [scrapy.FormRequest('https://www.zhihu.com/', cookies=cookies, headers=self.headers, callback=self.after_login)]

def after_login(self, response):  # hand control back to the original start_urls
    for url in self.start_urls:
        yield scrapy.Request(url, dont_filter=True, headers=self.headers)  # parse() is called by default
           

Method 3 (the most direct and simplest): copy the cookies straight from the browser and pass them to scrapy.FormRequest (again, remember the headers). The callback then hands control back to start_urls, and parse() and friends extract the data.

cookies = {
    'xxxx': 'xxxx',
    'xxxx': 'xxxx',
    'xxxx': 'xxxx',
}

def start_requests(self):  # override start_requests to perform the login first
    # send the request with the browser cookies; remember to pass headers
    return [scrapy.FormRequest('https://www.zhihu.com/', headers=self.header, cookies=self.cookies, callback=self.after_login)]

def after_login(self, response):  # hand control back to the original start_urls
    for url in self.start_urls:
        yield scrapy.Request(url, dont_filter=True, headers=self.header)  # parse() is called by default
           

2. Define the items, plus a get_insert(self) method used for data handling in the pipeline

class zhihu_Loader(ItemLoader):
    # default output processor for every field loaded through this ItemLoader
    default_output_processor = TakeFirst()

class ZhihuAnswerItem(scrapy.Item):
    author_name = scrapy.Field()
    author_id = scrapy.Field()
    answer_content = scrapy.Field(
        input_processor=MapCompose(soup)
    )
    answer_url = scrapy.Field()
    question_id = scrapy.Field()
    answer_parise_num = scrapy.Field()
    answer_comments_num = scrapy.Field()
    answer_creat_time = scrapy.Field(
        input_processor=MapCompose(timestamp_covert_to_datetime)
    )
    answer_update_time = scrapy.Field(
        input_processor=MapCompose(timestamp_covert_to_datetime)
    )
    answer_crawl_time = scrapy.Field()

    def get_insert(self):
        # ON DUPLICATE KEY UPDATE lets a re-crawl refresh an answer's content, vote count,
        # comment count and update time instead of failing on the primary-key conflict
        insert_sql = '''
            insert into answer_database(author_name, author_id, answer_content, answer_url, question_id,
                answer_parise_num, answer_comments_num, answer_creat_time, answer_update_time, answer_crawl_time)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
            ON DUPLICATE KEY UPDATE answer_content=VALUES(answer_content), answer_parise_num=VALUES(answer_parise_num),
                answer_comments_num=VALUES(answer_comments_num), answer_update_time=VALUES(answer_update_time)
        '''
        params = (self['author_name'], self['author_id'], self['answer_content'], self['answer_url'],
                  self['question_id'], self['answer_parise_num'], self['answer_comments_num'],
                  self['answer_creat_time'], self['answer_update_time'], self['answer_crawl_time'])
        return insert_sql, params

class ZhihuQuestionItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    question_id = scrapy.Field()
    question_url = scrapy.Field()
    question_title = scrapy.Field()
    topic = scrapy.Field(
        output_processor=list_to_str     # override the default output_processor
    )
    answer_num = scrapy.Field(
        input_processor=MapCompose(get_num)
    )
    comment_num = scrapy.Field(
        input_processor=MapCompose(get_num)
    )
    focus_num = scrapy.Field()
    watch_num = scrapy.Field()
    content = scrapy.Field()

    def get_insert(self):
        insert_sql = '''
            insert into question_database(question_id, question_url, title, topic, answer_num,
                comment_num, focus_num, watch_num, content)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
        '''
        params = (self['question_id'], self['question_url'], self['question_title'], self['topic'], self['answer_num'],
                  self['comment_num'], self['focus_num'], self['watch_num'], self['content'])
        return insert_sql, params
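
The Field definitions above reference a few processors (soup, timestamp_covert_to_datetime, get_num, list_to_str) whose implementations are not shown in the post. Hedged sketches of what they plausibly look like, inferred only from their names and how the fields use them:

# items.py helpers -- the post never shows these, so everything below is an inferred sketch
import re
import datetime
from bs4 import BeautifulSoup          # assumption: soup() strips HTML via BeautifulSoup
from scrapy.loader import ItemLoader
from scrapy.loader.processors import MapCompose, TakeFirst

def soup(value):
    # strip the HTML tags from the answer content, keeping plain text
    return BeautifulSoup(value, 'html.parser').get_text()

def timestamp_covert_to_datetime(value):
    # turn a unix timestamp (int or numeric string) into a datetime
    return datetime.datetime.fromtimestamp(int(value))

def get_num(value):
    # pull the first integer out of strings like "1,234 个回答"; 0 when nothing matches
    match = re.search(r'(\d+)', value.replace(',', ''))
    return int(match.group(1)) if match else 0

def list_to_str(values):
    # join the extracted topic strings into a single comma-separated string
    return ','.join(v.strip() for v in values if v.strip())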
           

3. Write the parsing functions to extract the content the business needs

This breaks down into three parts:

1) parse() parses the page and collects the question URLs, then hands each one to question_parser via the callback. (An open question: why do I only ever get about three valid question URLs each time? The rest seem to be loaded dynamically by Zhihu's JS; the network panel shows a POST "batch" request with a request payload that apparently loads more URLs, but its Content-Type is application/x-protobuf, so the payload looks like gibberish for now; something to dig into later.)

def parse(self, response):  # parse() is the default callback
    all_urls = response.xpath('.//@href').extract()
    # all_urls = response.css('a::attr(href)').extract()
    all_urls = [urljoin(response.url, url) for url in all_urls]
    # filter out javascript: and other useless urls
    # all_urls = filter(lambda x: True if 'https' in x else False, all_urls)
    # keep only question urls
    for url in all_urls:
        print(url)
        match_obj = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", url)
        if match_obj:
            requests_url = match_obj.group(1)   # the question page url
            question_id = match_obj.group(2)    # the question id
            yield scrapy.Request(requests_url, headers=self.header, meta={'question_id': question_id}, callback=self.question_parser)
           

2) question_parser receives the question URL and pulls the needed fields into the item: declare the loader with item_loader = zhihu_Loader(item=ZhihuQuestionItem(), response=response), load the values with question_item = item_loader.load_item(), and do not forget to yield question_item at the end. Finally it builds answer_json_url and hands it to answer_parser via the callback.

def question_parser(self, response):
    item_loader = zhihu_Loader(item=ZhihuQuestionItem(), response=response)  # response is the scrapy response
    item_loader.add_value("question_id", response.meta['question_id'])
    item_loader.add_value("question_url", response.url)
    item_loader.add_xpath("question_title", '//h1[@class="QuestionHeader-title"]/text()')  # post-processed in the scrapy.Field definition
    item_loader.add_xpath("topic", '//div[@class="QuestionHeader-topics"]//text()')        # "//" grabs all descendant text nodes; the multiple topics are joined in the scrapy.Field definition
    item_loader.add_xpath("answer_num", '//a[@class="QuestionMainAction"]//text()')        # cleaned up in the scrapy.Field definition; with fewer than 3 answers this node is missing and the number can be 0
    item_loader.add_xpath("comment_num", '//button [@class="Button Button--plain"]/text()')
    item_loader.add_xpath("watch_num", '//div[@class="NumberBoard-item"]/div[2]/text()')
    item_loader.add_xpath("focus_num", '//button[@class="Button NumberBoard-item Button--plain"]/div[2]/text()')
    item_loader.add_xpath("content", '//div[@class="QuestionRichText QuestionRichText--expandable QuestionRichText--collapsed"]/div//text()')
    question_item = item_loader.load_item()
           

A look at the network traffic revealed the JSON endpoint that returns a question's answers, so the code builds answer_json_url directly and passes it to answer_parser:

    answer_json_url = "https://www.zhihu.com/api/v4/questions/{0}/answers?include=data[*].is_normal,admin_closed_com" \
                      "ment,reward_info,is_collapsed,annotation_action,annotation_detail,collapse_reason,is_sticky,collapsed_by" \
                      ",suggest_edit,comment_count,can_comment,content,editable_content,voteup_count,reshipment_settings,comment_permi" \
                      "ssion,created_time,updated_time,review_info,question,excerpt,relationship.is_authorized,is_author,voting,is_thanked,is" \
                      "_nothelp,upvoted_followees;data[*].mark_infos[*].url;data[*].author.follower_count,badge[?(type=best_answerer)].topics&offset" \
                      "={1}&limit={2}&sort_by=default".format(response.meta['question_id'], 0, 20)    # note the use of format()
    yield scrapy.Request(answer_json_url, headers=self.header, callback=self.answer_parser)
    yield question_item
           

3) answer_parser is similar to question_parser but does not use an ItemLoader, because each JSON response carries 20 answers. paging['is_end'] tells whether more pages remain; when it is False, the function yields another scrapy.Request on paging['next'], recursing until all answers are fetched.

def answer_parser(self, response):
    # the answers arrive as json
    answer_json = json.loads(response.text)
    # is_start marks the first page, is_end marks the last page of the json results
    is_start = answer_json['paging']['is_start']
    is_end = answer_json['paging']['is_end']
    next_url = answer_json['paging']['next']

    # build an item for every answer
    for answer in answer_json['data']:
        answer_item = ZhihuAnswerItem()
        answer_item['author_name'] = answer['author']['name']
        answer_item['author_id'] = answer['author']['id']
        answer_item['answer_content'] = answer['content']
        answer_item['answer_url'] = answer['url']
        answer_item['question_id'] = answer['question']['id']
        answer_item['answer_parise_num'] = answer['voteup_count']
        answer_item['answer_comments_num'] = answer['comment_count']
        answer_item['answer_creat_time'] = datetime.datetime.fromtimestamp(answer['created_time'])   # timestamp to datetime
        answer_item['answer_update_time'] = datetime.datetime.fromtimestamp(answer['updated_time'])  # timestamp to datetime
        answer_item['answer_crawl_time'] = datetime.date.today()
        yield answer_item
    if not is_end:
        yield scrapy.Request(next_url, headers=self.header, callback=self.answer_parser)
           

4. Insert the data into MySQL asynchronously

1) Method 1: a do_insert(self, cursor, item) that calls item.get_insert(), which keeps the pipeline generic and highly configurable.

class MysqlTwistedPipline_getSQLfunc(object):
    # insert into the database asynchronously
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            port=settings["MYSQL_PORT"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            db=settings["MYSQL_DB"],
            use_unicode=True,
            charset=settings["MYSQL_CHARSET"],
        )
        dbpool = adbapi.ConnectionPool("pymysql", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        # use twisted to make the mysql insert asynchronous
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error)

    def handle_error(self, failure):
        # handle exceptions raised by the asynchronous insert
        print(failure)

    def do_insert(self, cursor, item):  # note: don't enable two pipelines that both do the insert, or only one will run
        # the sql itself lives in the item's get_insert()
        insert_sql, params = item.get_insert()
        cursor.execute(insert_sql, params)
           

2) Method 2: hard-code the SQL in the pipeline itself; simpler, but far less configurable.

class MysqlTwistedPipline(object):
    # insert into the database asynchronously
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            port=settings["MYSQL_PORT"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            db=settings["MYSQL_DB"],
            use_unicode=True,
            charset=settings["MYSQL_CHARSET"],
        )
        dbpool = adbapi.ConnectionPool("pymysql", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        # use twisted to make the mysql insert asynchronous
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error)

    def handle_error(self, failure):
        # handle exceptions raised by the asynchronous insert
        print(failure)

    def do_insert(self, cursor, item):  # note: don't enable two pipelines that both do the insert, or only one will run
        # the actual insert, hard-coded per item class
        if item.__class__.__name__ == 'ZhihuAnswerItem':
            insert_sql = '''
                insert into answer_database(author_name, author_id, answer_content, answer_url, question_id,
                    answer_parise_num, answer_comments_num, answer_creat_time, answer_update_time, answer_crawl_time)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
            '''
            cursor.execute(insert_sql, (item['author_name'], item['author_id'], item['answer_content'], item['answer_url'],
                                        item['question_id'], item['answer_parise_num'], item['answer_comments_num'],
                                        item['answer_creat_time'], item['answer_update_time'], item['answer_crawl_time']))

        if item.__class__.__name__ == 'ZhihuQuestionItem':  # runs when the incoming item is a ZhihuQuestionItem
            insert_sql = '''
                insert into question_database(question_id, question_url, title, topic, answer_num,
                    comment_num, focus_num, watch_num, content)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
            '''
            cursor.execute(insert_sql, (
                item['question_id'], item['question_url'], item['question_title'], item['topic'], item['answer_num'],
                item['comment_num'], item['focus_num'], item['watch_num'], item['content']))
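
Both pipelines read their connection parameters from settings.py via from_settings() and must be registered in ITEM_PIPELINES. A minimal sketch of the settings they expect; the values and the module path are placeholders/assumptions, not taken from the original post:

# settings.py (excerpt) -- adjust the values to your own database
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'xxxxx'
MYSQL_DB = 'zhihu'
MYSQL_CHARSET = 'utf8mb4'

# register exactly one of the two pipelines (see the note in do_insert above)
ITEM_PIPELINES = {
    'zhihu_project.pipelines.MysqlTwistedPipline_getSQLfunc': 300,   # module path is an assumption
}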
           
