I recently wrote a multi-threaded crawler framework following the book's example. I ran into trouble implementing the multi-process version, but since I'm about to start learning Scrapy, I've set the multiprocessing problem aside for now.
First, the caching part. Every time an HTML page is about to be downloaded, the crawler first checks whether MongoDB already holds a cached copy of that page. If not, the page is downloaded; if it does, the cached page is used.
A special TTL index in MongoDB removes expired cache entries (entries are kept for 30 days by default). Because the class implements __getitem__ and __setitem__, the object can be used just like a dictionary.
import pickle
import zlib
from datetime import datetime, timedelta
from pymongo import MongoClient
from bson.binary import Binary

class MongoCache:
    def __init__(self, client=None, expires=timedelta(days=30)):
        # If no MongoClient object is passed in, create a default one
        if client is None:
            self.client = MongoClient("localhost", 12345)
        else:
            self.client = client
        # Use the "cache" database on this connection to store cached pages
        self.db = self.client.cache
        # TTL index: MongoDB automatically deletes documents whose timestamp
        # is older than `expires`
        self.db.webpage.create_index("timestamp", expireAfterSeconds=expires.total_seconds())

    def __getitem__(self, url):
        '''
        Fetch the value stored for this url from the database
        '''
        record = self.db.webpage.find_one({"_id": url})
        if record:
            # Stored results are pickled and zlib-compressed, so reverse both steps
            return pickle.loads(zlib.decompress(record["result"]))
        else:
            raise KeyError(url + " does not exist")

    def __setitem__(self, url, result):
        '''
        Save the result for this url into the database
        '''
        record = {"result": Binary(zlib.compress(pickle.dumps(result))),
                  "timestamp": datetime.utcnow()}
        self.db.webpage.update({"_id": url}, {"$set": record}, upsert=True)
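Because MongoCache behaves like a dictionary, it can be exercised on its own before wiring it into the downloader. A minimal usage sketch, assuming a MongoDB instance is listening on localhost:12345 (the same address hard-coded above):

from MongoCache import MongoCache

cache = MongoCache()
# Store a fake download result, then read it back through __getitem__
cache["http://example.com"] = {"html": b"<html></html>", "code": 200}
print(cache["http://example.com"]["code"])  # -> 200
try:
    cache["http://not-cached.example.com"]
except KeyError:
    # Uncached urls raise KeyError, which the downloader treats as a cache miss
    print("cache miss")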
Next is the class that performs the downloads. It checks the cache first: if the cache already holds the HTML for the url and the response code looks fine, the HTML is taken straight from the cache; otherwise the page is downloaded.
The Throttle class enforces a delay between downloads.
import urllib.request
import urllib.parse
import time
import datetime
import socket
import random

DEFAULT_AGENT = "wswp"
DEFAULT_DELAY = 5
DEFAULT_RETRIES = 1
DEFAULT_TIMEOUT = 60
class Downloader:
    '''
    Class for downloading HTML. Parameters:
    proxies: list of proxy IPs; one is picked at random for each download
    delay: wait time between downloads from the same domain, default 5 seconds
    user_agent: user-agent string, default "wswp"
    num_retries: number of times to retry a failed download
    timeout: download timeout in seconds
    opener: optional urllib opener to use instead of building one
    cache: cache object (e.g. MongoCache)
    '''
    def __init__(self, proxies=None, delay=DEFAULT_DELAY, user_agent=DEFAULT_AGENT,
                 num_retries=DEFAULT_RETRIES, timeout=DEFAULT_TIMEOUT, opener=None, cache=None):
        # Set the global socket timeout
        socket.setdefaulttimeout(timeout)
        self.throttle = Throttle(delay)
        self.user_agent = user_agent
        self.proxies = proxies
        self.num_retries = num_retries
        self.opener = opener
        self.cache = cache

    def __call__(self, url):
        '''
        Cache-aware download, invoked by calling the instance directly
        '''
        print("Starting download of " + url)
        result = None
        if self.cache:
            try:
                # Look the url up in the cache
                result = self.cache[url]
            except KeyError:
                # Not cached yet, ignore and download below
                pass
            else:
                # If the cached attempt failed with a server error, download again
                if result["code"]:
                    if self.num_retries > 0 and 500 < result["code"] < 600:
                        result = None
        # Nothing usable in the cache, so download the page
        if result is None:
            # Respect the per-domain delay
            self.throttle.wait(url)
            if self.proxies:
                # Pick a random proxy IP from the list
                proxy = random.choice(self.proxies)
            else:
                proxy = None
            # Build the request headers
            headers = {"User-agent": self.user_agent}
            # Download the page
            result = self.download(url, headers, proxy=proxy, num_retries=self.num_retries)
            if self.cache:
                # If a cache was configured, store the fresh result
                self.cache[url] = result
        print(url, "download finished")
        return result["html"]
    def download(self, url, headers, proxy, num_retries, data=None):
        '''
        Download a single page and return the page together with its status code
        '''
        # Build the request
        request = urllib.request.Request(url, data, headers or {})
        request.add_header("Cookie","finger=7360d3c2; UM_distinctid=15c59703db998-0f42b4b61afaa1-5393662-100200-15c59703dbcc1d; pgv_pvi=653650944; fts=1496149148; sid=bgsv74pg; buvid3=56812A21-4322-4C70-BF18-E6D646EA78694004infoc; CNZZDATA2724999=cnzz_eid%3D214248390-1496147515-https%253A%252F%252Fwww.baidu.com%252F%26ntime%3D1496805293")
        request.add_header("Upgrade-Insecure-Requests", "1")
        opener = self.opener or urllib.request.build_opener()
        if proxy:
            # If a proxy IP was given, route the request through it
            opener = urllib.request.build_opener(urllib.request.ProxyHandler(proxy))
        try:
            # Download the page
            response = opener.open(request)
            # Keep the raw bytes so callers can decode with the right charset
            html = response.read()
            code = response.code
        except Exception as e:
            print("Download error:", str(e))
            html = b''
            if hasattr(e, "code"):
                code = e.code
                if num_retries > 0 and 500 < code < 600:
                    # Retry server errors (5xx) up to num_retries times
                    return self.download(url, headers, proxy, num_retries - 1, data)
            else:
                code = None
        return {"html": html, "code": code}
class Throttle:
    '''
    Enforces a minimum delay between consecutive downloads from the same domain
    '''
    def __init__(self, delay):
        self.delay = delay
        # Timestamp of the last request to each domain
        self.domains = {}

    def wait(self, url):
        '''
        Sleep before downloading if the same domain was requested too recently
        '''
        # Get the domain
        domain = urllib.parse.urlparse(url).netloc
        # Get the time this domain was last accessed
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            # Time to pause = required interval - (now - last access time)
            sleep_secs = self.delay - (datetime.datetime.now() - last_accessed).seconds
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        # Record the time of this access
        self.domains[domain] = datetime.datetime.now()
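A quick way to exercise Downloader together with MongoCache, as a minimal sketch only (it assumes both modules are importable and MongoDB is running on localhost:12345; the target URL is just an example):

from Downloader import Downloader
from MongoCache import MongoCache

# The first call downloads over the network; the second call for the same url
# should be served from the MongoDB cache without waiting for the throttle.
D = Downloader(delay=5, user_agent="wswp", cache=MongoCache())
html = D("http://example.com")
html_again = D("http://example.com")
print(len(html), len(html_again))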
Next comes the class that implements the crawler itself.
import time
import threading
import re
import urllib.parse
import datetime
from bs4 import BeautifulSoup
from Downloader import Downloader
from MongoCache import MongoCache
SLEEP_TIME = 1
def get_links(html):
    '''
    Return all links found on a page
    '''
    bs = BeautifulSoup(html, "lxml")
    link_labels = bs.find_all("a")
    return [link_label.get('href', "default") for link_label in link_labels]

def same_domain(url1, url2):
    '''
    Check whether two urls belong to the same domain
    '''
    return urllib.parse.urlparse(url1).netloc == urllib.parse.urlparse(url2).netloc

def normalize(seed_url, link):
    '''
    Strip the fragment from a link and resolve it against seed_url into an absolute url
    '''
    link, no_need = urllib.parse.urldefrag(link)
    return urllib.parse.urljoin(seed_url, link)
def threader_crawler(seed_url, resource_regiex=None, link_regiex=".*", delay=5, cache=None,
                     download_source_callback=None, user_agent="wswp", proxies=None,
                     num_retries=1, max_threads=10, timeout=60, max_url=500):
    downloaded = []
    crawl_queue = [seed_url]
    seen = set([seed_url])
    D = Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies,
                   num_retries=num_retries, timeout=timeout)

    def process_queue():
        while True:
            links = []
            try:
                url = crawl_queue.pop()
            except IndexError:
                # Queue is empty, let this thread finish
                break
            else:
                html = D(url)
                downloaded.append(url)
                if download_source_callback:
                    # Pages matching the resource pattern are handed to the callback
                    if resource_regiex and re.match(resource_regiex, url):
                        download_source_callback(url, html)
                # Collect the links that match the crawl pattern
                links.extend([link for link in get_links(html) if re.match(link_regiex, link)])
                for link in links:
                    link = normalize(seed_url, link)
                    if link not in seen:
                        seen.add(link)
                        if same_domain(seed_url, link):
                            crawl_queue.append(link)
                print("Total pages discovered:", len(seen))
                print("Pages downloaded so far:", len(downloaded))
                print("Pages still queued:", len(crawl_queue))

    threads = []
    while threads or crawl_queue:
        if len(downloaded) >= max_url:
            return
        # Drop threads that have finished
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        # Top up the pool while there is still work queued
        while len(threads) < max_threads and crawl_queue:
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
            print("Number of threads:", len(threads))
        # Yield the CPU to the worker threads before checking again
        time.sleep(SLEEP_TIME)
def main():
    starttime = datetime.datetime.now()
    threader_crawler("http://www.xicidaili.com/", max_threads=1, max_url=10,
                     user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36")
    endtime = datetime.datetime.now()
    print("Elapsed time:", (endtime - starttime).total_seconds())

if __name__ == "__main__":
    main()
In my tests the multi-threaded crawler is far faster than crawling with a single thread. Rough results:
30 threads, 100 pages: 31 seconds, 0.31 s per page on average
10 threads, 100 pages: 69 seconds, 0.69 s per page on average
1 thread, 100 pages: 774 seconds, 7.74 s per page on average
I also put together a resource-download callback class for testing, which saves the movie information from each resource page of 電影天堂 (dytt8.net) into the database.
from pymongo import MongoClient
import re

class download_source_callback:
    def __init__(self, client=None):
        # Reuse an existing MongoClient if one is passed in, otherwise create one
        if client:
            self.client = client
        else:
            self.client = MongoClient("localhost", 12345)
        self.db = self.client.cache
    def __call__(self, url, html):
        # Regexes for the fields on a dytt8 resource page (the page is GBK-encoded)
        title_regiex = "<title>(.*?)</title>"
        class_regiex = "類 别(.*?)<"
        director_regiex = ".*導 演(.*?)<"
        content_regiex = "簡 介(.*?)<br /><br />◎"
        imdb_regiex = "IMDb評分 (.*?)<"
        douban_regiex = "豆瓣評分(.*?)<"
        html = html.decode("gbk", "ignore")
        m = re.search(title_regiex, html)
        title = m.group(1) if m else None
        m = re.search(class_regiex, html)
        class_name = m.group(1) if m else None
        m = re.search(content_regiex, html)
        content = m.group(1).replace("<br />", "") if m else None
        m = re.search(douban_regiex, html)
        douban = m.group(1) if m else None
        m = re.search(imdb_regiex, html)
        imdb = m.group(1) if m else None
        print(title, class_name, content, douban, imdb)
        move = {
            "name": title,
            "class": class_name,
            "introduce": content,
            "douban": douban,
            "imdb": imdb
        }
        self.db.moves.update({"_id": title}, {"$set": move}, upsert=True)
        print("Saved one movie: " + title)
if __name__ == "__main__":
    # Quick local test against a saved resource page, read as bytes because
    # __call__ expects the raw GBK-encoded page
    html = open("f:\資源.txt", "rb").read()
    a = download_source_callback()
    a("http://www.dytt8.net/html/gndy/jddy/20170529/54099.html", html)