- 1 One million websites
- 1.1 Parsing the Alexa list the straightforward way
- 1.2 Reusing the crawler code to parse the Alexa list
- 2 Sequential crawler
- 3 Concurrent and parallel crawlers
- 3.0 How concurrency and parallelism work
- 3.1 Multithreaded crawler
- 3.2 Multiprocess crawler
- 4 Performance comparison
This post covers downloading web pages concurrently and in parallel using multithreading and multiprocessing, and compares their performance with sequential downloading.
1 One million websites
Alexa, an Amazon subsidiary, publishes a list of the one million most popular websites (http://www.alexa.com/topsites). We can also download a compressed copy of this list directly from http://s3.amazonaws.com/alexa-static/top-1m.csv.zip, so there is no need to scrape Alexa's own site for the data.
Rank | Domain |
---|---|
1 | google.com |
2 | youtube.com |
3 | facebook.com |
4 | baidu.com |
5 | yahoo.com |
6 | wikipedia.com |
7 | google.co.in |
8 | amazon.com |
9 | qq.com |
10 | google.co.jp |
11 | live.com |
12 | taobao.com |
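The downloaded zip archive contains a single CSV file with one rank,domain pair per line, which is what the parsing code below iterates over, for example:

```
1,google.com
2,youtube.com
3,facebook.com
...
```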
1.1 Parsing the Alexa list the straightforward way
Extracting the data takes four steps:

- download the .zip file;
- extract the CSV file from the .zip archive;
- parse the CSV file;
- iterate over every row of the CSV file and extract the domain.
```python
# -*- coding: utf-8 -*-
import csv
from zipfile import ZipFile
from StringIO import StringIO
from downloader import Downloader


def alexa():
    D = Downloader()
    zipped_data = D('http://s3.amazonaws.com/alexa-static/top-1m.csv.zip')
    urls = []  # top 1 million URL's will be stored in this list
    with ZipFile(StringIO(zipped_data)) as zf:
        csv_filename = zf.namelist()[0]
        for _, website in csv.reader(zf.open(csv_filename)):
            urls.append('http://' + website)
    return urls


if __name__ == '__main__':
    print len(alexa())
```
The downloaded compressed data is wrapped with StringIO before being passed to ZipFile, because ZipFile expects a file-like interface rather than a string. Since the zip archive contains only a single file, we simply take the first entry in its name list. The http:// scheme is then prepended to each domain and the result appended to the list of URLs.
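As a minimal illustration of why the wrapper is needed (reading the archive from a local file here is only for demonstration):

```python
from StringIO import StringIO
from zipfile import ZipFile

zipped_data = open('top-1m.csv.zip', 'rb').read()  # raw bytes, e.g. as returned by the downloader
# passing zipped_data directly would be interpreted as a file name, not as file content
zf = ZipFile(StringIO(zipped_data))  # StringIO wraps the bytes in a file-like interface
print zf.namelist()                  # the single CSV entry inside the archive, e.g. ['top-1m.csv']
```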
1.2 Reusing the crawler code to parse the Alexa list
To reuse this functionality in our crawler, we need to adapt it to the scrape_callback interface.
```python
# -*- coding: utf-8 -*-
import csv
from zipfile import ZipFile
from StringIO import StringIO
from mongo_cache import MongoCache


class AlexaCallback:
    def __init__(self, max_urls=1000):
        self.max_urls = max_urls
        self.seed_url = 'http://s3.amazonaws.com/alexa-static/top-1m.csv.zip'

    def __call__(self, url, html):
        if url == self.seed_url:
            urls = []
            cache = MongoCache()  # needed for the duplicate check below
            with ZipFile(StringIO(html)) as zf:
                csv_filename = zf.namelist()[0]
                for _, website in csv.reader(zf.open(csv_filename)):
                    if 'http://' + website not in cache:
                        urls.append('http://' + website)
                        if len(urls) == self.max_urls:
                            break
            return urls
```
A new input parameter, max_urls, sets how many URLs are extracted from the Alexa file. Actually downloading one million web pages would take about 11 days, so here it is set to just 1000 URLs.
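The 11-day figure follows from assuming roughly one second per download:

```python
>>> 1000000 / 86400.0   # one million pages at about one second each, expressed in days
11.574074074074074
```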
2 Sequential crawler
```python
# -*- coding: utf-8 -*-
from link_crawler import link_crawler
from mongo_cache import MongoCache
from alexa_cb import AlexaCallback


def main():
    scrape_callback = AlexaCallback()
    cache = MongoCache()
    #cache.clear()
    # the timeout value was missing in the original listing; 10 seconds is assumed here
    link_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback, cache=cache, timeout=10, ignore_robots=True)


if __name__ == '__main__':
    main()
```
Timing the sequential crawl with the `time` command:

```bash
time python ...
```
3 Concurrent and parallel crawlers
To speed up page downloads we extend the sequential downloader into a concurrent one using multiple processes and multiple threads, while keeping the delay flag at a minimum interval of 1 second between requests to the same domain, so that we do not overload servers or get our IP address banned.
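The per-domain delay is enforced inside the downloader; as a rough sketch (not the actual Downloader implementation), such a throttle can be built like this:

```python
import time
import urlparse
from datetime import datetime


class Throttle:
    """Sleep so that requests to the same domain are at least `delay` seconds apart."""
    def __init__(self, delay):
        self.delay = delay   # minimum seconds between requests to the same domain
        self.domains = {}    # domain -> timestamp of the last request

    def wait(self, url):
        domain = urlparse.urlparse(url).netloc
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            sleep_secs = self.delay - (datetime.now() - last_accessed).total_seconds()
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        self.domains[domain] = datetime.now()


# each worker would call this before every download, e.g.:
# throttle = Throttle(delay=1)
# throttle.wait('http://example.com/page')
```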
3.0 How concurrency and parallelism work
Parallelism refers to multiple processors or cores genuinely running several programs or processes at the same time. Concurrency refers to a single processor: at any instant the processor executes only one process, but it switches rapidly between processes, so at a macro level the programs appear to run simultaneously even though, at a micro level, the single processor still works serially. Likewise, within one process, execution switches between threads, each thread running a different part of the program. This means that while one thread is waiting for a page to download, the process can switch to another thread and avoid wasting processor time. So, to make full use of all the resources in the machine and download data as fast as possible, we need to spread the downloads across multiple processes and threads.
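As a small illustration of why this helps for I/O-bound work, here is a sketch that downloads two pages with two threads and then sequentially; the example URLs and the use of urllib2 are only for the demonstration, not part of the crawler:

```python
import time
import urllib2
import threading

urls = ['http://example.com', 'http://example.org']  # placeholder URLs

def fetch(url):
    # most of the time in this call is spent waiting on the network
    urllib2.urlopen(url).read()

# threaded: the two downloads overlap their waiting time
start = time.time()
threads = [threading.Thread(target=fetch, args=(u,)) for u in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
print 'threaded:   {:.2f}s'.format(time.time() - start)   # roughly the slowest single download

# sequential: the waits add up
start = time.time()
for u in urls:
    fetch(u)
print 'sequential: {:.2f}s'.format(time.time() - start)   # roughly the sum of both downloads
```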
3.1 Multithreaded crawler
We can modify the queue-based link crawler from the first post in this series so that the crawl loop, process_queue(), is started in multiple threads, letting the links be downloaded concurrently.
```python
import time
import threading
import urlparse
from downloader import Downloader

SLEEP_TIME = 1  # seconds to pause between thread-status checks (value assumed)


# the default values for delay, num_retries, max_threads and timeout were missing
# in the original listing and are assumed here
def threaded_crawler(seed_url, delay=5, cache=None, scrape_callback=None, user_agent='Wu_Being', proxies=None, num_retries=1, max_threads=10, timeout=60):
    """Crawl this website in multiple threads
    """
    # the queue of URL's that still need to be crawled
    #crawl_queue = Queue.deque([seed_url])
    crawl_queue = [seed_url]
    # the URL's that have been seen
    seen = set([seed_url])
    D = Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies, num_retries=num_retries, timeout=timeout)

    def process_queue():
        while True:
            try:
                url = crawl_queue.pop()
            except IndexError:
                # crawl queue is empty
                break
            else:
                html = D(url)
                if scrape_callback:
                    try:
                        links = scrape_callback(url, html) or []
                    except Exception as e:
                        print 'Error in callback for: {}: {}'.format(url, e)
                    else:
                        for link in links:
                            link = normalize(seed_url, link)
                            # check whether already crawled this link
                            if link not in seen:
                                seen.add(link)
                                # add this new link to queue
                                crawl_queue.append(link)

    # wait for all download threads to finish
    threads = []
    while threads or crawl_queue:
        # the crawl is still active
        for thread in threads:
            if not thread.is_alive():
                # remove the stopped threads
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue:
            # can start some more threads
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True)  # set daemon so main thread can exit when receives ctrl-c
            thread.start()
            threads.append(thread)
        # all threads have been processed
        # sleep temporarily so CPU can focus execution on other threads
        time.sleep(SLEEP_TIME)


def normalize(seed_url, link):
    """Normalize this URL by removing hash and adding domain
    """
    link, _ = urlparse.urldefrag(link)  # remove hash to avoid duplicates
    return urlparse.urljoin(seed_url, link)
```
The loop in the code above keeps creating threads until the thread pool `threads` reaches its maximum size. During crawling, a thread exits early when the queue currently holds no more URLs to crawl.

For example, suppose there are two threads and two URLs waiting to be downloaded. When the first thread finishes its download the crawl queue is empty, so that thread exits. The second thread finishes its download a little later but discovers another URL to download. The thread loop then notices that there are still URLs to download and that the number of threads has not reached the maximum, so it creates a new download thread.
```python
# -*- coding: utf-8 -*-
import sys
from threaded_crawler import threaded_crawler
from mongo_cache import MongoCache
from alexa_cb import AlexaCallback


def main(max_threads):
    scrape_callback = AlexaCallback()
    cache = MongoCache()
    #cache.clear()
    # the timeout value was missing in the original listing; 10 seconds is assumed here
    threaded_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback, cache=cache, max_threads=max_threads, timeout=10)


if __name__ == '__main__':
    max_threads = int(sys.argv[1])  # number of threads is passed on the command line
    main(max_threads)
```
```bash
$ time python 3threaded_test.py 5
```
The run above uses 5 threads, so it downloads almost 5 times faster than the sequential version.
3.2 Multiprocess crawler
With a multi-core CPU we can go further and start multiple processes.
```python
# -*- coding: utf-8 -*-
import sys
from process_crawler import process_crawler
from mongo_cache import MongoCache
from alexa_cb import AlexaCallback


def main(max_threads):
    scrape_callback = AlexaCallback()
    cache = MongoCache()
    cache.clear()
    process_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback, cache=cache, max_threads=max_threads, timeout=10)


if __name__ == '__main__':
    max_threads = int(sys.argv[1])  # number of threads per process, passed on the command line
    main(max_threads)
```
The code below first determines the number of CPU cores and starts that many processes, each of which then runs the multithreaded crawler. The crawl queue used so far lives in local memory, so other processes cannot work on the same crawl. To solve this, the crawl queue is moved into MongoDB. Storing the queue separately means that crawlers, even ones on different servers, can collaborate on the same crawl. A more robust queue could be used, such as the dedicated message-passing tool Celery; here we implement the queue on top of MongoDB. The following changes are needed in `threaded_crawler`:

- replace the built-in queue with the new MongoDB-backed queue, `MongoQueue`;
- drop the `seen` variable, because the queue handles duplicate URLs internally;
- call the `complete()` method once a URL has finished processing, to record that it was parsed successfully.
```python
import time
import urlparse
import threading
import multiprocessing
from mongo_cache import MongoCache
from mongo_queue import MongoQueue
from downloader import Downloader

SLEEP_TIME = 1  # seconds to pause between thread-status checks (value assumed)


# called as: process_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback, cache=cache, max_threads=max_threads, timeout=10)
def process_crawler(args, **kwargs):
    # args: the seed URL; kwargs: keyword arguments forwarded to threaded_crawler
    num_cpus = multiprocessing.cpu_count()
    #pool = multiprocessing.Pool(processes=num_cpus)
    print 'Starting {} processes...'.format(num_cpus)
    processes = []
    for i in range(num_cpus):
        # start one threaded crawler per CPU core
        p = multiprocessing.Process(target=threaded_crawler, args=[args], kwargs=kwargs)
        #parsed = pool.apply_async(threaded_link_crawler, args, kwargs)
        p.start()
        processes.append(p)
    # wait for processes to complete
    for p in processes:
        p.join()


# the default values for delay, num_retries, max_threads and timeout were missing
# in the original listing and are assumed here
def threaded_crawler(seed_url, delay=5, cache=None, scrape_callback=None, user_agent='wu_being', proxies=None, num_retries=1, max_threads=10, timeout=60):
    """Crawl using multiple threads
    """
    # the queue of URL's that still need to be crawled is now shared through MongoDB
    crawl_queue = MongoQueue()
    crawl_queue.clear()
    crawl_queue.push(seed_url)
    D = Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies, num_retries=num_retries, timeout=timeout)

    def process_queue():
        while True:
            # keep track that are processing url
            try:
                url = crawl_queue.pop()
            except KeyError:
                # currently no urls to process
                break
            else:
                html = D(url)
                if scrape_callback:
                    try:
                        links = scrape_callback(url, html) or []
                    except Exception as e:
                        print 'Error in callback for: {}: {}'.format(url, e)
                    else:
                        for link in links:
                            # add this new link to queue; the queue itself ignores duplicates
                            crawl_queue.push(normalize(seed_url, link))
                crawl_queue.complete(url)  # record that this URL has been processed

    # wait for all download threads to finish
    threads = []
    while threads or crawl_queue:
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue.peek():  # peek() checks for outstanding URLs
            # can start some more threads
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True)  # set daemon so main thread can exit when receives ctrl-c
            thread.start()
            threads.append(thread)
        time.sleep(SLEEP_TIME)


def normalize(seed_url, link):
    """Normalize this URL by removing hash and adding domain
    """
    link, _ = urlparse.urldefrag(link)  # remove hash to avoid duplicates
    return urlparse.urljoin(seed_url, link)
```
`MongoQueue` defines three states for a download:

- `OUTSTANDING`: a new URL has been added;
- `PROCESSING`: a URL has been taken off the queue and is about to be downloaded;
- `COMPLETE`: the download has finished.

A thread may be terminated while it is still processing a URL, which would leave that URL stuck in the PROCESSING state and never finished. To handle this, the class takes a `timeout` parameter, 300 seconds by default. In the `repair()` method, if a URL has been processing for longer than this timeout we assume something went wrong and reset its status to `OUTSTANDING`, so that it can be processed again.
```python
from datetime import datetime, timedelta
from pymongo import MongoClient, errors


class MongoQueue:
    """
    >>> timeout = 1
    >>> url = 'http://example.webscraping.com'
    >>> q = MongoQueue(timeout=timeout)
    >>> q.clear() # ensure empty queue
    >>> q.push(url) # add test URL
    >>> q.peek() == q.pop() == url # pop back this URL
    True
    >>> q.repair() # immediate repair will do nothing
    >>> q.pop() # another pop should be empty
    >>> q.peek()
    >>> import time; time.sleep(timeout) # wait for timeout
    >>> q.repair() # now repair will release URL
    Released: http://example.webscraping.com
    >>> q.pop() == url # pop URL again
    True
    >>> bool(q) # queue is still active while outstanding
    True
    >>> q.complete(url) # complete this URL
    >>> bool(q) # queue is now complete
    False
    """
    # possible states of a download
    OUTSTANDING, PROCESSING, COMPLETE = range(3)

    def __init__(self, client=None, timeout=300):
        """
        client: MongoDB client to use; a default local connection is created if None
        timeout: the number of seconds to allow for processing before a job is released
        """
        self.client = MongoClient() if client is None else client
        self.db = self.client.cache
        self.timeout = timeout

    def __nonzero__(self):
        """Returns True if there are more jobs to process
        """
        record = self.db.crawl_queue.find_one(
            {'status': {'$ne': self.COMPLETE}}
        )
        return True if record else False

    def push(self, url):
        """Add new URL to queue if does not exist
        """
        try:
            self.db.crawl_queue.insert({'_id': url, 'status': self.OUTSTANDING})
        except errors.DuplicateKeyError as e:
            pass  # this is already in the queue

    def pop(self):
        """Get an outstanding URL from the queue and set its status to processing.
        If the queue is empty a KeyError exception is raised.
        """
        record = self.db.crawl_queue.find_and_modify(
            query={'status': self.OUTSTANDING},
            update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
        )
        if record:
            return record['_id']
        else:
            self.repair()
            raise KeyError()

    def peek(self):
        record = self.db.crawl_queue.find_one({'status': self.OUTSTANDING})
        if record:
            return record['_id']

    def complete(self, url):
        self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

    def repair(self):
        """Release stalled jobs
        """
        record = self.db.crawl_queue.find_and_modify(
            query={
                'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
                'status': {'$ne': self.COMPLETE}
            },
            update={'$set': {'status': self.OUTSTANDING}}
        )
        if record:
            print 'Released:', record['_id']

    def clear(self):
        self.db.crawl_queue.drop()
```
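The docstring above doubles as a test. Assuming the class is saved as mongo_queue.py (as the earlier import suggests) and a MongoDB server is running locally, it can be exercised with Python's doctest runner:

```bash
python -m doctest mongo_queue.py -v
```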
4 Performance comparison
Script | Threads | Processes | Time | Ratio to sequential time |
---|---|---|---|---|
Sequential | 1 | 1 | | |
Multithreaded | 5 | 1 | | |
Multithreaded | 10 | 1 | | |
Multithreaded | 20 | 1 | | |
Multiprocess | 5 | 2 | | |
Multiprocess | 10 | 2 | | |
Multiprocess | 20 | 2 | | |
In addition, download bandwidth is limited, so at some point adding more threads no longer speeds up downloading. To get an even better-performing crawler, it would need to be deployed in a distributed fashion across multiple servers, with all servers pointing at the same MongoDB queue instance.
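For example, each server could build its queue against a shared MongoDB instance by passing a common client into MongoQueue; the hostname below is a placeholder, and threaded_crawler would need a small change to accept and pass such a client through:

```python
from pymongo import MongoClient
from mongo_queue import MongoQueue

# every crawl server connects to the same MongoDB host,
# so they all share one crawl_queue collection
client = MongoClient('mongodb://shared-queue-host:27017')  # placeholder hostname
crawl_queue = MongoQueue(client=client, timeout=300)
```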
Wu_Being blog notice: you are welcome to repost articles from this blog; please credit the original post and link. Thank you!
[Python crawler series] "[Python Crawler 4] Concurrent and parallel downloading" http://blog.csdn.net/u014134180/article/details/55506994
GitHub code for the Python crawler series: https://github.com/1040003585/WebScrapingWithPython