天天看點

python多線程爬蟲架構

最近仿照書上寫了個多線程爬蟲架構,在實作多程序的時候遇到了困難,不過打算開始學scrapy了也就暫時不管多程序的問題了

首先是緩存部分,在每次下載下傳一個html的時候,首先會查詢mongodb資料庫中是否已經有該頁面的緩存,如果沒有,下載下傳頁面,如果有,獲得緩存的頁面

在mongodb中設定一個特殊索引用于删除逾時的緩存(緩存預設儲存30天),由于該類實作了 __getitem__和__setitem__方法,是以可以直接像操作字典一樣操作這個對象

import pickle

import zlib

from datetime import datetime,timedelta

from pymongo import MongoClient

from bson.binary import Binary

class MongoCache:

    def __init__(self,client = None,expires = timedelta(days=30)):

        #如果沒有傳遞MongoClient對象,建立一個預設對象

        if client is None:

            self.client = MongoClient("localhost",12345)

        #建立一個連接配接想資料庫中緩存資料

        self.db = self.client.cache

        self.db.webpage.create_index("timestamp",expireAfterSeconds=expires.total_seconds())

    def __getitem__(self, url):

        '''

        從資料庫中獲得該url的值

        '''

        record = self.db.webpage.find_one({"_id":url})

        print(record)

        if record:

            return pickle.loads(zlib.decompress(record["result"]))

            #return record["result"]

        else:

            raise KeyError(url+"不存在")

    def __setitem__(self, url,result):

        '''

        将資料儲存到資料庫中

        '''

        record = {"result":Binary(zlib.compress(pickle.dumps(result))),

                  "timestamp":datetime.utcnow()}

        #record = {"result":result,"timestamp":datetime.utcnow()}

        self.db.webpage.update({"_id":url},{"$set":record},upsert=True)

然後是實作下載下傳的類,該類首先檢視緩存,如果緩存中已有html并且響應碼正常,則直接從緩存中擷取html,否則下載下傳頁面

Throttle類實作了下載下傳之間的延時功能

import urllib.request

import time

import datetime

import re

import socket

import random

from DiskCache import DiskCache

DEFAULT_AGENT = "wswp"

DEFAULT_DELAY = 5

DEFAULT_RETRIES = 1

DEFAULT_TIMEOUT = 60

class Downloader:

    '''

    用于下載下傳html的類,可以傳入的參數有

    proxies;代理ip清單,會随機的在清單中抽取代理ip進行下載下傳

    delay:下載下傳同一域名的等待時間,預設一秒

    user_agent:主機名,預設python

    num_retries:下載下傳失敗重新下載下傳次數

    timeout;下載下傳逾時時間

    cache:緩存方式

    '''

    def __init__(self,proxies=None,delay = DEFAULT_DELAY,user_agent = DEFAULT_AGENT,num_retries = DEFAULT_RETRIES,timeout = DEFAULT_TIMEOUT,opener = None,cache=None):

        #設定逾時時間

        socket.setdefaulttimeout(timeout)

        self.throttle = Throttle(delay)

        self.user_agent = user_agent

        self.proxies = proxies

        self.num_retries = num_retries

        self.opener = opener

        self.cache = cache

    def   __call__(self,url):

        '''

        帶有緩存功能的下載下傳方法,通過類對象可以直接調用

        '''

        print(self.user_agent)

        print("開始下載下傳"+url)

        result = None

        if self.cache:

            try:

                #從緩存中擷取url對應的資料

                result = self.cache[url]

                print("測試代碼4")

            except KeyError:

                #如果獲得KeyError異常,跳過

                pass

            else:

                #如果是未成功下載下傳的網頁,重新下載下傳

                if result["code"]:

                    if self.num_retries > 0 and 500<result["code"]<600:

                        result = None

        # 如果頁面不存在,下載下傳該頁面

        if result is None:

            #延遲默時間

            self.throttle.wait(url)

            if self.proxies:

                #如果有代理IP,從代理IP清單中随機抽取一個代理IP

                proxy = random.choice(self.proxies)

            else:

                proxy = None

            #構造請求頭

            headers = {"User-agent":self.user_agent}

            #下載下傳頁面

            result = self.download(url,headers,proxy = proxy,num_retries = self.num_retries)

            '''

            file = open("f:\\bilibili.html","wb")

            file.write(result["html"])

            file.close()

        '''

            if self.cache:

                #如果有緩存方式,緩存網頁

                self.cache[url] = result

        print(url,"頁面下載下傳完成")

        return result["html"]

    def download(self,url,headers,proxy,num_retries,data=None):

        '''

        用于下載下傳一個頁面,傳回頁面和與之對應的狀态碼

        '''

        #建構請求

        request = urllib.request.Request(url,data,headers or {})

        request.add_header("Cookie","finger=7360d3c2; UM_distinctid=15c59703db998-0f42b4b61afaa1-5393662-100200-15c59703dbcc1d; pgv_pvi=653650944; fts=1496149148; sid=bgsv74pg; buvid3=56812A21-4322-4C70-BF18-E6D646EA78694004infoc; CNZZDATA2724999=cnzz_eid%3D214248390-1496147515-https%253A%252F%252Fwww.baidu.com%252F%26ntime%3D1496805293")

        request.add_header("Upgrade-Insecure-Requests","1")

        opener = self.opener or urllib.request.build_opener()

        if proxy:

            #如果有代理IP,使用代理IP

            opener = urllib.request.build_opener(urllib.request.ProxyHandler(proxy))

        try:

            #下載下傳網頁

            response = opener.open(request)

            print("code是",response.code)

            html = response.read().decode()

            code = response.code

        except Exception as e:

            print("下載下傳出現錯誤",str(e))

            html = ''

            if hasattr(e,"code"):

                code =e.code

                if num_retries > 0 and 500<code<600:

                    #如果錯誤不是未找到網頁,則重新下載下傳num_retries次

                    return self.download(url,headers,proxy,num_retries-1,data)

            else:

                code = None

        print(html)

        return {"html":html,"code":code}

class Throttle:

    '''

    按照延時,請求,代理IP等下載下傳網頁,處理網頁中的link的類

    '''

    def __init__(self, delay):

        self.delay = delay

        self.domains = {}

    def wait(self, url):

        '''

        每下載下傳一個html之間暫停的時間

        '''

        # 獲得域名

        domain = urllib.parse.urlparse(url).netloc

        # 獲得上次通路此域名的時間

        las_accessed = self.domains.get(domain)

        if self.delay > 0 and las_accessed is not None:

            # 計算需要強制暫停的時間 = 要求的間隔時間 - (現在的時間 - 上次通路的時間)

            sleep_secs = self.delay - (datetime.datetime.now() - las_accessed).seconds

            if sleep_secs > 0:

                time.sleep(sleep_secs)

        # 存儲此次通路域名的時間

        self.domains[domain] = datetime.datetime.now()

然後是實作爬蟲功能的類

import time

import threading

import re

import urllib.parse

import datetime

from bs4 import BeautifulSoup

from Downloader import Downloader

from MongoCache import MongoCache

SLEEP_TIME = 1

def get_links(html):

    '''

    獲得一個頁面上的所有連結

    '''

    bs = BeautifulSoup(html, "lxml")

    link_labels = bs.find_all("a")

    # for link in link_labels:

    return [link_label.get('href', "default") for link_label in link_labels]

def same_domain(url1, url2):

    '''

    判斷域名書否相同

    '''

    return urllib.parse.urlparse(url1).netloc == urllib.parse.urlparse(url2).netloc

def normalize(seed_url, link):

    '''

    用于将絕對路徑轉換為相對路徑

    '''

    link, no_need = urllib.parse.urldefrag(link)

    return urllib.parse.urljoin(seed_url, link)

def threader_crawler(seed_url,resource_regiex=None,link_regiex = ".*",delay=5,cache=None,download_source_callback=None,user_agent="wswp",proxies=None, num_retries=1, max_threads=10, timeout=60,max_url=500):

    downloaded = []

    crawl_queue = [seed_url]

    seen = set([seed_url])

    D = Downloader(cache = cache,delay = delay,user_agent=user_agent,proxies=proxies,num_retries=num_retries,timeout=timeout)

    print(user_agent)

    def process_queue():

        while True:

            links = []

            try:

                url = crawl_queue.pop()

            except IndexError:

                break

            else:

                html = D(url)

                downloaded.append(url)

                if download_source_callback:

                    if resource_regiex and re.match(resource_regiex,url):

                        download_source_callback(url,html)

                links.extend([link for link in get_links(html) if re.match(link_regiex,link)])

                for link in links:

                    link = normalize(seed_url, link)

                    if link not in seen:

                        seen.add(link)

                        if same_domain(seed_url,link):

                            crawl_queue.append(link)

                print("已經發現的總網頁數目為",len(seen))

                print("已經下載下傳過的網頁數目為",len(downloaded))

                print("還沒有周遊過的網頁數目為",len(crawl_queue))

    threads=[]

    while threads or crawl_queue:

        if len(downloaded) == max_url:

            return

        for thread in threads:

            if not thread.is_alive():

                threads.remove(thread)

        while len(threads) < max_threads and crawl_queue:

            print("線程數量為", len(threads))

            thread = threading.Thread(target=process_queue)

            thread.setDaemon(True)

            thread.start()

            print("線程數量為", len(threads))

            threads.append(thread)

def main():

    starttime = datetime.datetime.now()

    threader_crawler("http://www.xicidaili.com/",max_threads=1,max_url=10,user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36")

    endtime = datetime.datetime.now()

    print("花費時間",(endtime-starttime).total_seconds())

if __name__ == "__main__":

    main()

經過測試,多線程爬蟲速度要遠遠高于單個線程爬取,簡單測試結果如下

開啟30個線程爬取一百個網站用時31秒,平均一個用時0.31秒

開啟10個線程爬取一百個網頁用時69秒,平均一個用時0.69秒

開啟1 個線程爬取一百個網站用時774秒,平均一個用時7.74秒

順便實作了一個測試用的資源下載下傳類,用于将電影天堂的所有資源頁的電影儲存到資料庫

from lxml import etree
from pymongo import MongoClient
import urllib.request
import re

class download_source_callback:
    def __init__(self,client=None):
        if client:
            self.client = client
        else:
            self.client = MongoClient("localhost",12345)
        self.db = self.client.cache


    def __call__(self,url,html):
        title_regiex = "<title>(.*?)</title>"
        class_regiex = "類  别(.*?)<"
        director_regiex = ".*導  演(.*?)<"
        content_regiex = "簡  介(.*?)<br /><br />◎"
        imdb_regiex = "IMDb評分&nbsp;(.*?)<"
        douban_regiex = "豆瓣評分(.*?)<"
        html = html.decode("gbk","ignore")
        m = re.search(title_regiex,html)
        if m:
            title = m.group(1)
        else:
            title = None
        m = re.search(class_regiex,html)
        if m:
            class_name = m.group(1)
        else:
            class_name = None
        m = re.search(content_regiex,html)
        if m:
            text = m.group(1).replace("<br />","")
            content = text
        else:
            content = None
        m = re.search(douban_regiex,html)
        if m:
            douban = m.group(1)
        else:
            douban = None
        m = re.search(imdb_regiex,html)
        if m:
            imdb = m.group(1)
        else:
            imdb = None
        print(title,class_name,content,douban,imdb)
        move = {
            "name":title,
            "class":class_name,
            "introduce":content,
            "douban":douban,
            "imdb":imdb
        }
        self.db.moves.update({"_id":title},{"$set":move},upsert=True)
        print("成功儲存一部電影"+title)



if __name__ == "__main__":
    html= open("f:\資源.txt").read()

    a = download_source_callback()
    a("http://www.dytt8.net/html/gndy/jddy/20170529/54099.html",html)