
[Python Crawler 4] Concurrent and Parallel Downloading

  • 1 One million websites
    • 1.1 Parsing the Alexa list the straightforward way
    • 1.2 Reusing the crawler code to parse the Alexa list
  • 2 Serial crawler
  • 3 Concurrent and parallel crawlers
    • 3.0 How concurrency and parallelism work
    • 3.1 Multi-threaded crawler
    • 3.2 Multi-process crawler
  • 4 Performance comparison

這篇将介紹使用多線程和多程序這兩種方式并發并行下載下傳網頁,并将它們與串行下載下傳的性能進行比較。

1 One million websites

Alexa, an Amazon subsidiary, publishes a list of the one million most popular websites (http://www.alexa.com/topsites ). The list can also be downloaded directly as a compressed file from http://s3.amazonaws.com/alexa-static/top-1m.csv.zip , which saves us from scraping the Alexa site itself.

Rank  Domain
1 google.com
2 youtube.com
3 facebook.com
4 baidu.com
5 yahoo.com
6 wikipedia.com
7 google.co.in
8 amazon.com
9 qq.com
10 google.co.jp
11 live.com
12 taobao.com

1.1 Parsing the Alexa list the straightforward way

Extracting the data takes four steps:

- download the .zip file;

- extract the CSV file from the .zip archive;

- parse the CSV file;

- iterate over each row of the CSV file and pull out the domain.

# -*- coding: utf-8 -*-

import csv
from zipfile import ZipFile
from StringIO import StringIO
from downloader import Downloader

def alexa():
    D = Downloader()
    zipped_data = D('http://s3.amazonaws.com/alexa-static/top-1m.csv.zip')
    urls = [] # top 1 million URL's will be stored in this list
    with ZipFile(StringIO(zipped_data)) as zf:
        csv_filename = zf.namelist()[0]  # the archive contains a single CSV file
        for _, website in csv.reader(zf.open(csv_filename)):
            urls.append('http://' + website)
    return urls

if __name__ == '__main__':
    print len(alexa())
           

The downloaded compressed data is wrapped in StringIO before being passed to ZipFile, because ZipFile expects a file-like object rather than a string. Since the zip archive contains only a single file, we can simply take the first entry from namelist(). Each domain is then prefixed with the http:// protocol and appended to the list of URLs.
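As an aside, the code in this post targets Python 2. On Python 3 the StringIO module no longer exists; the same idea works with io.BytesIO, since ZipFile only needs a binary file-like object. A minimal sketch, assuming zipped_data holds the downloaded bytes (iter_alexa_urls is just an illustrative name):

import csv
import io
from zipfile import ZipFile

def iter_alexa_urls(zipped_data):
    # ZipFile needs a binary file-like object, so wrap the downloaded bytes
    with ZipFile(io.BytesIO(zipped_data)) as zf:
        csv_filename = zf.namelist()[0]  # the archive contains a single CSV
        with zf.open(csv_filename) as f:
            for _, website in csv.reader(io.TextIOWrapper(f)):
                yield 'http://' + website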

1.2 Reusing the crawler code to parse the Alexa list

To reuse this functionality with the link crawler, we wrap it in the scrape_callback interface.

# -*- coding: utf-8 -*-

import csv
from zipfile import ZipFile
from StringIO import StringIO
from mongo_cache import MongoCache

class AlexaCallback:
    def __init__(self, max_urls=1000):
        self.max_urls = max_urls
        self.seed_url = 'http://s3.amazonaws.com/alexa-static/top-1m.csv.zip'

    def __call__(self, url, html):
        if url == self.seed_url:
            urls = []
            cache = MongoCache()  # used to skip URLs that are already cached
            with ZipFile(StringIO(html)) as zf:
                csv_filename = zf.namelist()[0]  # the archive contains a single CSV file
                for _, website in csv.reader(zf.open(csv_filename)):
                    if 'http://' + website not in cache:
                        urls.append('http://' + website)
                        if len(urls) == self.max_urls:
                            break
            return urls
           

A new max_urls parameter controls how many URLs are taken from the Alexa file. Downloading the full one million pages would take far too long: at roughly one page per second that is about 10^6 seconds, or around 11 days, so here it is limited to 1000 URLs.

2 Serial crawler

# -*- coding: utf-8 -*-

from link_crawler import link_crawler
from mongo_cache import MongoCache
from alexa_cb import AlexaCallback

def main():
    scrape_callback = AlexaCallback()
    cache = MongoCache()
    #cache.clear()
    link_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback, cache=cache, timeout=10, ignore_robots=True)

if __name__ == '__main__':
    main()
           

time python ...

3 Concurrent and parallel crawlers

To speed up page downloads, we extend the serial downloader to download concurrently, using multiple processes and multiple threads, while keeping the delay between requests to the same domain at a minimum of 1 second so that we do not overload servers or get our IP address banned.
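Enforcing that per-domain delay is the job of the Downloader class used throughout this post. A minimal sketch of the idea, assuming the Downloader throttles requests per domain the way the earlier posts in this series describe:

import time
import urlparse

class Throttle:
    """Sleep so that requests to the same domain are at least `delay` seconds apart."""
    def __init__(self, delay):
        self.delay = delay
        self.last_accessed = {}  # maps each domain to the time of its last request

    def wait(self, url):
        domain = urlparse.urlparse(url).netloc
        last = self.last_accessed.get(domain)
        if self.delay > 0 and last is not None:
            sleep_secs = self.delay - (time.time() - last)
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        self.last_accessed[domain] = time.time()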

3.0 How concurrency and parallelism work

Parallelism requires multiple processors or cores: several programs or processes genuinely run at the same instant. Concurrency, by contrast, happens on a single processor: at any moment the processor runs only one process, but it switches between processes so quickly that, seen from the outside, many programs appear to run simultaneously, even though at the micro level the processor is still working serially. Likewise, within a single process, execution switches between threads, each running a different part of the program. This means that while one thread is waiting for a page to download, the process can switch to another thread and do useful work instead of wasting processor time. So, to make full use of the machine and download data as quickly as possible, we spread the downloads across multiple processes and threads.
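A note on why threads help at all here: CPython's Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, but the GIL is released while a thread blocks on network I/O, so other threads can issue their own requests in the meantime. A minimal sketch (the URL is a placeholder):

import threading
import urllib2

def fetch(url):
    # the GIL is released while this call blocks on the network
    urllib2.urlopen(url, timeout=10).read()

urls = ['http://example.com/'] * 4  # placeholder URLs
threads = [threading.Thread(target=fetch, args=(url,)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()  # the four downloads overlap instead of running back to back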

3.1 Multi-threaded crawler

We can modify the queue-based link crawler code from the first post so that the crawl loop, process_queue(), is started in multiple threads, letting the links be downloaded concurrently.

import time
import threading
import urlparse
from downloader import Downloader

SLEEP_TIME = 1

def threaded_crawler(seed_url, delay=5, cache=None, scrape_callback=None, user_agent='Wu_Being', proxies=None, num_retries=1, max_threads=10, timeout=60):
    """Crawl this website in multiple threads
    """
    # the queue of URL's that still need to be crawled
    #crawl_queue = Queue.deque([seed_url])
    crawl_queue = [seed_url]
    # the URL's that have been seen 
    seen = set([seed_url])
    D = Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies, num_retries=num_retries, timeout=timeout)

    def process_queue():
        while True:
            try:
                url = crawl_queue.pop()
            except IndexError:
                # crawl queue is empty
                break
            else:
                html = D(url)
                if scrape_callback:
                    try:
                        links = scrape_callback(url, html) or []
                    except Exception as e:
                        print 'Error in callback for: {}: {}'.format(url, e)
                    else:
                        for link in links:
                            link = normalize(seed_url, link)
                            # check whether already crawled this link
                            if link not in seen:
                                seen.add(link)
                                # add this new link to queue
                                crawl_queue.append(link)

    # wait for all download threads to finish
    threads = []
    while threads or crawl_queue:
        # the crawl is still active
        for thread in threads:
            if not thread.is_alive():
                # remove the stopped threads
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue:
            # can start some more threads
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True) # set daemon so main thread can exit when receives ctrl-c
            thread.start()
            threads.append(thread)
        # all threads have been processed
        # sleep temporarily so CPU can focus execution on other threads
        time.sleep(SLEEP_TIME)

def normalize(seed_url, link):
    """Normalize this URL by removing hash and adding domain
    """
    link, _ = urlparse.urldefrag(link) # remove hash to avoid duplicates
    return urlparse.urljoin(seed_url, link)
           

The loop above keeps creating new threads until the thread pool threads reaches its maximum size. During the crawl, a thread stops early if there are currently no more URLs left in the queue to crawl.

For example, suppose there are two threads and two URLs left to download. When the first thread finishes its download the queue is empty, so that thread exits. The second thread finishes a little later but discovers another URL to download. The thread loop then notices that there is still a URL to crawl and that the number of threads is below the maximum, so it creates a new download thread.

# -*- coding: utf-8 -*-

import sys
from threaded_crawler import threaded_crawler
from mongo_cache import MongoCache
from alexa_cb import AlexaCallback

def main(max_threads):
    scrape_callback = AlexaCallback()
    cache = MongoCache()
    #cache.clear()
    threaded_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback, cache=cache, max_threads=max_threads, timeout=10)

if __name__ == '__main__':
    max_threads = int(sys.argv[1])
    main(max_threads)
           

$ time python 3threaded_test.py 5

Running with 5 threads made downloading nearly 5 times faster than the serial version.

3.2 Multi-process crawler

On a machine with a multi-core CPU we can go further and start multiple processes.

# -*- coding: utf-8 -*-
import sys
from process_crawler import process_crawler
from mongo_cache import MongoCache
from alexa_cb import AlexaCallback

def main(max_threads):
    scrape_callback = AlexaCallback()
    cache = MongoCache()
    cache.clear()
    process_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback, cache=cache, max_threads=max_threads, timeout=10)  # process_crawler instead of threaded_crawler

if __name__ == '__main__':
    max_threads = int(sys.argv[1])
    main(max_threads)
           

The code below first finds the number of CPU cores and then starts that many processes, with each process running the multi-threaded crawler. Up to now the crawl queue has been stored in local memory, so no other process can help work through it. To fix this, the crawl queue is moved into MongoDB. Storing the queue separately means that even crawlers running on different servers can collaborate on the same crawl. For a more robust queue you could use a dedicated message-passing tool such as Celery; here we implement the queue on top of MongoDB. threaded_crawler needs the following changes:

- replace the built-in queue with the new MongoDB-backed queue, MongoQueue;

- drop the seen variable, because the queue implementation itself now handles duplicate URLs;

- call the complete() method once a URL has been processed, to record that it was parsed successfully.

import time
import urlparse
import threading
import multiprocessing
from mongo_cache import MongoCache
from mongo_queue import MongoQueue
from downloader import Downloader

SLEEP_TIME = 1

### process_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback, cache=cache, max_threads=max_threads, timeout=10)
def process_crawler(args, **kwargs):    # args: the seed URL; kwargs: keyword arguments forwarded to threaded_crawler
    num_cpus = multiprocessing.cpu_count()
    #pool = multiprocessing.Pool(processes=num_cpus)
    print 'Starting {} processes...'.format(num_cpus)
    processes = []
    for i in range(num_cpus):
        p = multiprocessing.Process(target=threaded_crawler, args=[args], kwargs=kwargs)  # each process runs the multi-threaded crawler
        #parsed = pool.apply_async(threaded_link_crawler, args, kwargs)
        p.start()
        processes.append(p)
    # wait for processes to complete
    for p in processes:
        p.join()

def threaded_crawler(seed_url, delay=5, cache=None, scrape_callback=None, user_agent='wu_being', proxies=None, num_retries=1, max_threads=10, timeout=60):
    """Crawl using multiple threads
    """
    # the queue of URL's that still need to be crawled
    crawl_queue = MongoQueue()  # the MongoDB-backed queue replaces the in-memory list
    crawl_queue.clear()
    crawl_queue.push(seed_url)
    D = Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies, num_retries=num_retries, timeout=timeout)

    def process_queue():
        while True:
            # MongoQueue.pop() raises KeyError when there are no outstanding URLs
            try:
                url = crawl_queue.pop()
            except KeyError:
                # currently no urls to process
                break
            else:
                html = D(url)
                if scrape_callback:
                    try:
                        links = scrape_callback(url, html) or []
                    except Exception as e:
                        print 'Error in callback for: {}: {}'.format(url, e)
                    else:
                        for link in links:
                            # add each new link to the shared MongoDB-backed queue
                            crawl_queue.push(normalize(seed_url, link))
                # record that this URL has been processed successfully
                crawl_queue.complete(url)

    # wait for all download threads to finish
    threads = []
    while threads or crawl_queue:  # crawl_queue is truthy while any URL is not yet COMPLETE
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue.peek():  # peek() returns an outstanding URL, if any
            # can start some more threads
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True) # set daemon so main thread can exit when receives ctrl-c
            thread.start()
            threads.append(thread)
        time.sleep(SLEEP_TIME)

def normalize(seed_url, link):
    """Normalize this URL by removing hash and adding domain
    """
    link, _ = urlparse.urldefrag(link) # remove hash to avoid duplicates
    return urlparse.urljoin(seed_url, link)
           

MongoQueue defines three states for a download:

- OUTSTANDING: the URL has just been added to the queue;

- PROCESSING: the URL has been popped from the queue for downloading;

- COMPLETE: the download has finished.

Some URLs will be popped but never completed, for example when the thread processing them is terminated. To handle this, the class takes a timeout parameter, which defaults to 300 seconds. In the repair() method, any URL whose processing has taken longer than this timeout is assumed to have failed, and its state is reset to OUTSTANDING so that it can be processed again.

from datetime import datetime, timedelta
from pymongo import MongoClient, errors

class MongoQueue:
    """
    >>> timeout = 1
    >>> url = 'http://example.webscraping.com'
    >>> q = MongoQueue(timeout=timeout)
    >>> q.clear() # ensure empty queue
    >>> q.push(url) # add test URL
    >>> q.peek() == q.pop() == url # pop back this URL
    True
    >>> q.repair() # immediate repair will do nothing
    >>> q.pop() # another pop on the now-empty queue raises KeyError
    Traceback (most recent call last):
        ...
    KeyError
    >>> q.peek()
    >>> import time; time.sleep(timeout) # wait for timeout
    >>> q.repair() # now repair will release the URL
    Released: http://example.webscraping.com
    >>> q.pop() == url # pop URL again
    True
    >>> bool(q) # queue is still active while outstanding
    True
    >>> q.complete(url) # complete this URL
    >>> bool(q) # queue is finished once all URLs are complete
    False
    """

    # possible states of a download
    OUTSTANDING, PROCESSING, COMPLETE = range(3)

    def __init__(self, client=None, timeout=300):
        """
        host: the host to connect to MongoDB
        port: the port to connect to MongoDB
        timeout: the number of seconds to allow for a timeout
        """
        self.client = MongoClient() if client is None else client
        self.db = self.client.cache
        self.timeout = timeout

    def __nonzero__(self):
        """Returns True if there are more jobs to process
        """
        record = self.db.crawl_queue.find_one(
            {'status': {'$ne': self.COMPLETE}} 
        )
        return True if record else False

    def push(self, url):
        """Add new URL to queue if does not exist
        """
        try:
            self.db.crawl_queue.insert({'_id': url, 'status': self.OUTSTANDING})
        except errors.DuplicateKeyError as e:
            pass # this is already in the queue

    def pop(self):
        """Get an outstanding URL from the queue and set its status to processing.
        If the queue is empty a KeyError exception is raised.
        """
        record = self.db.crawl_queue.find_and_modify(
            query={'status': self.OUTSTANDING}, 
            update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
        )
        if record:
            return record['_id']
        else:
            self.repair()
            raise KeyError()

    def peek(self):
        record = self.db.crawl_queue.find_one({'status': self.OUTSTANDING})
        if record:
            return record['_id']

    def complete(self, url):
        self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

    def repair(self):
        """Release stalled jobs
        """
        record = self.db.crawl_queue.find_and_modify(
            query={
                'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
                'status': {'$ne': self.COMPLETE}
            },
            update={'$set': {'status': self.OUTSTANDING}}
        )
        if record:
            print 'Released:', record['_id']

    def clear(self):
        self.db.crawl_queue.drop()
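Note that insert(), update(), and find_and_modify() belong to the legacy PyMongo API that this Python 2 code was written against; they have been removed in newer PyMongo releases. As a rough, untested sketch, pop() could be rewritten for PyMongo 3.x with find_one_and_update() (shown as it would sit inside the MongoQueue class):

    def pop(self):
        """Atomically take an outstanding URL and mark it as processing."""
        record = self.db.crawl_queue.find_one_and_update(
            {'status': self.OUTSTANDING},
            {'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
        )
        if record:
            return record['_id']
        self.repair()
        raise KeyError()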
           

4 Performance comparison

Script         Threads  Processes  Time  Ratio to serial
Serial         1        1
Threaded       5        1
Threaded       10       1
Threaded       20       1
Multi-process  5        2
Multi-process  10       2
Multi-process  20       2

Note also that download bandwidth is finite, so eventually adding more threads will not make downloading any faster. To get better performance still, the crawler needs to be deployed across multiple servers in a distributed fashion, with every server pointing at the same MongoDB queue instance.
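A minimal sketch of that setup (the hostname is a placeholder, and it assumes the crawler is adapted to use this queue instead of constructing MongoQueue() with the default local connection):

from pymongo import MongoClient
from mongo_queue import MongoQueue

# every crawl server connects to the same MongoDB host, so they all share one crawl queue
client = MongoClient('mongo.example.com', 27017)
crawl_queue = MongoQueue(client=client)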

Wu_Being blog notice: you are welcome to repost from this blog; please credit the original article and link back to it. Thanks!

[Python Crawler Series] "[Python Crawler 4] Concurrent and Parallel Downloading": http://blog.csdn.net/u014134180/article/details/55506994

GitHub code for the Python crawler series: https://github.com/1040003585/WebScrapingWithPython
