本系列為自己學習爬蟲的相關筆記,如有誤,歡迎大家指正
要學習提升爬蟲速度用到的知識,必須先熟悉并發和并行、同步和異步的概
一、并發和并行,同步和異步
并發和并行
并發(concurrency)和并行(parallelism)是兩個相似的概念。并發是指在一個時間段内發生若幹事件的情況,并行是指在同一時刻發生若幹事件的情況。
使用單核CPU和多核CPU來說就是:在使用單核CPU時,多個工作任務是以并發的方式運作的,因為隻有一個CPU,是以各個任務會分别占用CPU的一段時間依次執行。如果在自己分得的時間段沒有完成任務,就會切換到另一個任務,然後在下一次得到CPU使用權的時候再繼續執行,直到完成。在這種情況下,因為各個任務的時間段很短、經常切換,是以給我們的感覺是“同時”進行。在使用多核CPU時,在各個核的任務能夠同時運作,這是真正的同時運作,也就是并行。
同步和異步
同步就是并發或并行的各個任務不是獨自運作的,任務之間有一定的交替順序,可能在運作完一個任務得到結果後,另一個任務才會開始運作。就像接力賽跑一樣,要拿到交接棒之後下一個選手才可以開始跑。
異步則是并發或并行的各個任務可以獨立運作,一個任務的運作不受另一個任務影響,任務之間就像比賽的各個選手在不同的賽道比賽一樣,跑步的速度不受其他賽道選手的影響。
二、多線程爬蟲
多線程爬蟲是以并發的方式執行的。也就是說,多個線程并不能真正的同時執行,而是通過程序的快速切換加快網絡爬蟲速度的。
在Python設計之初,為了資料安全所做的決定設定有GIL(Global Interpreter Lock,全局解釋器鎖)。在Python中,一個線程的執行過程包括擷取GIL、執行代碼直到挂起和釋放GIL。在一個Python程序中,隻有一個GIL,拿不到GIL的線程就不允許進入CPU執行。
正因為如此,在多核CPU上Python的多線程效率也不高。因為每次釋放GIL鎖,線程之間都會進行鎖競争,而切換線程會消耗資源。
雖然如此,但是因為網絡爬蟲是IO密集型,線程能夠有效地提升效率,因為單線程下有IO操作會進行IO等待,是以會造成不必要的時間浪費,而開啟多線程能線上程A等待時自動切換到線程B,可以不浪費CPU的資源,進而提升程式執行的效率。
Python的多線程對于IO密集型代碼比較友好
使用單線程爬蟲
import requests
import time
link_list = []
with open('most.txt') as file:
file_list = file.readlines()
for each in file_list:
link = each.split()[1]
link = link.replace('\n','')
link_list.append(link)
start = time.time()
for eachone in link_list:
try:
r = requests.get(eachone)
print(r.status_code,eachone)
except Exception as e:
print('Error:',e)
end = time.time()
print('串行時間:',end-start)
使用多線程
在python中使用多線程有兩種方法
1.函數式
調用_thread子產品中的start_new_thread()函數産生新線程
簡單示例
import _thread
import time
# 為線程定義一個函數
def print_time(threadName,delay):
count = 0
while count < 3:
time.sleep(delay)
count += 1
print(threadName,time.ctime())
_thread.start_new_thread(print_time,('Thread-1',1))
_thread.start_new_thread(print_time,('Thread-2',2))
#time.sleep(5)
print('Main Finished')
這個代碼沒出現自己想要的結果,暫時沒看出來問題在哪。
- _thread中使用start_new_thread ()函數來産生新線程
- function表示線程函數
- args為傳遞給線程函數的參數,它必須是tuple類型
2.類包裝式
調用Threading庫建立線程,從threading.Thread繼承
threading子產品提供了Thread類來處理線程,包括以下方法。
- run():用以表示線程活動的方法。
- start():啟動線程活動。
- join([time]):等待至線程中止。阻塞調用線程直至線程的join()方法被調用為止。
- isAlive():傳回線程是否是活動的。
- getName():傳回線程名。
- setName():設定線程名。
示例:
import threading
import time
class myThread(threading.Thread):
def __init__(self,name,delay):
threading.Thread.__init__(self)
self.name = name
self.delay = delay
def run(self):
print('Starting:'+self.name)
print_time(self.name,self.delay)
print('Exiting:'+self.name)
def print_time(threadName,delay):
counter = 0;
while counter<3:
time.sleep(delay)
print(threadName,time.ctime())
counter += 1
threads = []
# 建立新線程
thread1 = myThread('Thread-1',1)
thread2 = myThread('Thread-2',2)
# 開啟新線程
thread1.start()
thread2.start()
# 添加線程到線程清單
threads.append(thread1)
threads.append(thread2)
for t in threads:
t.join()
print('Exiting Main Thread')
2.1簡單的多線程爬蟲
import threading
import requests
import time
link_list = []
with open('most.txt','r') as file:
file_list = file.readlines();
for eachone in file_list:
link = eachone.split()[1]
link = link.replace('\n','')
link_list.append(link)
start = time.time()
class myThread(threading.Thread):
def __init__(self,name,link_range):
threading.Thread.__init__(self)
self.name = name
self.link_range = link_range
def run(self):
print('starting '+self.name)
crawler(self.name,self.link_range)
print('exiting '+self.name)
def crawler(threadName,link_range):
for i in range(link_range[0],link_range[1]+1):
try:
r = requests.get(link_list[i],timeout=20)
print(threadName,r.status_code,link_list[i])
except Exception as e:
print(threadName,'Error: ' ,e)
link_range_list = [(0,200),(201,400),(401,600),(601,800),(801,1000)]
thread_list = []
# 建立新線程
for i in range(1,6):
thread = myThread('Thread-'+str(i),link_range_list[i-1])
thread.start()
thread_list.append(thread)
# 等待所有線程完成
for thread in thread_list:
thread.join() #thread.join()方法等待各個線程執行完畢
end = time.time()
print('簡單多線程爬蟲的總時間為:',end-start)
print('Exiting Main Thread')
這個代碼還是有改進的餘地,比如說某一個線程中的200個已經結束了,那麼就還剩4個線程。到最後就可能變成單線程了。
2.2 queue多線程爬蟲
import threading
import time
import requests
import queue as Queue
link_list = []
with open('most.txt','r') as file:
file_list = file.readlines()
for eachone in file_list:
link = eachone.split()[1]
link = link.replace('\n','')
link_list.append(link)
start = time.time()
class myThread(threading.Thread):
def __init__(self,threadName,q):
threading.Thread.__init__(self)
self.name = threadName
self.q=q
def run(self):
print('starting '+self.name)
while True:
try:
crawler(self.name,self.q)
except:
break
print('exit '+self.name)
def crawler(threadName,q):
url = q.get(timeout=2)
try:
r = requests.get(url,timeout=20)
print(q.qsize(),threadName,r.status_code,url)
except Exception as e:
print(q.qsize(), threadName, url, 'Error:',e)
thread_list = ['Thread-1','Thread-2','Thread-3','Thread-4','Thread-5']
workQueue = Queue.Queue(1000)
threads = []
# 建立新線程
for tName in thread_list:
thread = myThread(tName,workQueue)
thread.start()
threads.append(thread)
#填充隊列
for url in link_list:
workQueue.put(url)
# 等待所有線程完成
for t in threads:
t.join()
end = time.time()
print('Queue多線程爬蟲的總時間:'+end-start)
print('Exiting Main Thread')
Python的多線程爬蟲隻能運作在單核上,各個線程以并發的方法異步運作。由于GIL(Global Interpreter Lock,全局解釋器鎖)的存在,多線程爬蟲并不能充分地發揮多核CPU的資源。
三、多程序爬蟲
多程序爬蟲則可以利用CPU的多核,程序數取決于計算機CPU的處理器個數。由于運作在不同的核上,各個程序的運作是并行的。在Python中,如果我們要用多程序,就需要用到multiprocessing這個庫。
使用multiprocess庫的兩種方法
當程序數量大于CPU的核心數量時,等待運作的程序會等到其他程序運作完畢讓出核心為止。是以,如果CPU是單核,就無法進行多程序并行。在使用多程序爬蟲之前,我們需要先了解計算機CPU的核心數量
from multiprocessing import cpu_count
print(cpu_count())
1.使用Process + Queue的多程序爬蟲
from multiprocessing import cpu_count
from multiprocessing import Process,Queue
import time
import requests
link_list = []
print(cpu_count())
with open('most.txt','r') as file:
file_list = file.readlines()
for eachfile in file_list:
link = eachfile.split()[1]
link = link.replace('\n','')
link_list.append(link)
start = time.time()
class MyProcess(Process):
def __init__(self,q):
Process.__init__(self)
self.q = q
def run(self):
print('starting:' ,self.pid)
while not self.q.empty():
crawler(self.q)
print('exit:',self.pid)
def crawler(q):
url = q.get(timeout = 2)
try:
r = requests.get(url,timeout=20)
print(q.qsize(),r.status_code,url)
except Exception as e:
print(q.qsize(),url,'Error:',e)
if __name__ =='__main__':
ProcessNames = ['Process-1','Process-2','Process-3','Process-4','Process-5']
workQueue = Queue(1000)
# 填充隊列
for url in link_list:
workQueue.put(url)
for i in range(0,3):
p = MyProcess(workQueue)
p.daemon = True #如果将daemon設定為True,當父程序結束後,子程序就會自動被終止。
p.start()
p.join()
end = time.time()
print('Process + Queue多程序爬蟲的總時間為:',end-start)
print('Main process Ended!')
2.使用Pool + Queue的多程序爬蟲
from multiprocessing import Pool,Manager
import time
import requests
link_list = []
with open('most.txt','r') as file:
file_list = file.readlines()
for each in file_list:
link = each.split()[1].replace('\n','')
link_list.append(link)
start = time.time()
def crawler(q,index):
Process_id = 'Process-'+str(index)
while not q.empty():
url = q.get(timeout=2)
try:
r = requests.get(url,timeout=20)
print(Process_id,q.qsize(),r.status_code,url)
except Exception as e:
print(Process_id,q.qsize(),url,'Error:',e)
if __name__ == '__main__':
manager = Manager()
workQueue = manager.Queue(1000)
# 填充隊列
for url in link_list:
workQueue.put(url)
pool = Pool(processes=3) #使用Pool(processes=3)建立線程池的最大值為3
for i in range(4):
pool.apply_async(crawler,args=(workQueue,i)) # 建立子程序 這裡采用的是非阻塞方法
print('Start process')
pool.close()
pool.join()
end = time.time()
print('Pool +Queue多程序爬蟲的總時間為:',end-start)
print('Main process End')
四、多協程(Coroutine)爬蟲
使用協程的好處:
- 協程像一種在程式級别模拟系統級别的程序,由于是單線程,并且少了上下文切換,是以相對來說系統消耗很少
- 協程友善切換控制流,這就簡化了程式設計模型。協程能保留上一次調用時的狀态(所有局部狀态的一個特定組合),每次過程重入時,就相當于進入了上一次調用的狀态。
- 協程的高擴充性和高并發性,一個CPU支援上萬協程都不是問題,是以很适合用于高并發處理。
協程的缺點:
- 協程的本質是一個單線程,不能同時使用單個CPU的多核,需要和程序配合才能運作在多CPU上
- 有長時間阻塞的IO操作時不要用協程,因為可能會阻塞整個程式
在python的協程中可以使用gevent庫
pip install gevent
import gevent
from gevent.queue import Queue,Empty
import time
#把下面有可能有IO操作的單獨坐上标記
"""
以下兩行,可以實作爬蟲的并發能力,如果沒有這兩句的話,整個抓取過程就會變成
依次抓取gevent庫中的monkey能把可能有IO操作的單獨做上标記,将IO變成可以異步執行的函數
"""
from gevent import monkey
monkey.patch_all()#将IO轉為異步執行的函數
import requests
link_list = []
with open('most.txt','r') as file:
file_list = file.readlines()
for each in file_list:
link_list.append(each.split()[1].replace('\n',''))
start = time.time()
def crawler(index):
Process_id = 'Process-'+str(index)
while not workQueue.empty():
url = workQueue.get(timeout=2)
try:
r = requests.get(url,timeout=20)
print(Process_id,workQueue.qsize(),r.status_code,url)
except Exception as e:
print(Process_id, workQueue.qsize(), url,'Error',e)
def boss():
for url in link_list:
workQueue.put_nowait(url)
if __name__ =='__main__':
workQueue = Queue(1000)
gevent.spawn(boss).join() # 将隊列中加入的内容整合到gevent中
'''
下面4行是建立多協程爬蟲的程式
'''
jobs = []
for i in range(10): #建立10個協程
jobs.append(gevent.spawn(crawler,i))
gevent.joinall(jobs)
end = time.time()
print('多協程爬蟲的總時間為:',end-start)
print('Main Ended')
五、總結
- 并發(concurrency)和并行(parallelism):并發是指在一個時間段發生若幹事件的情況。并行是指在同一時刻發生若幹事件的情況
- 同步是指并發或并行的各個任務不是獨自運作的,任務之間有一定的交替順序,可能在執行完一個任務并得到結果後,另一個任務才會開始運作。
- 異步則是并發或并行的各個任務可以獨立運作,一個任務的運作不受另一個影響。
- 多線程的方式: 程式的執行是在不同線程之間切換的。當一個線程等待網頁下載下傳時,程序可以切換到其他線程執行。
【爬蟲】使用多線程、多程式、多協程提升爬蟲速度 - 多程序的方式 程式的執行是并行、異步的,多個線程可以在同一時刻發生若幹事件。
【爬蟲】使用多線程、多程式、多協程提升爬蟲速度 - 多協程的執行方式 協程是一種使用者态的輕量級線程,在程式級别來模拟系統級别用的程序。在一個程序中,一個線程通過程式的模拟方法實作高并發。
【爬蟲】使用多線程、多程式、多協程提升爬蟲速度 微信搜一搜【梓莘】或掃描下方二維碼交個朋友共同進步。文章持續更新中。目前在整理爬蟲相關學習筆記,期待後續更多的更新
【爬蟲】使用多線程、多程式、多協程提升爬蟲速度