天天看點

【爬蟲】使用多線程、多程序、多協程提升爬蟲速度

本系列為自己學習爬蟲的相關筆記,如有誤,歡迎大家指正

要學習提升爬蟲速度用到的知識,必須先熟悉并發和并行、同步和異步的概

一、并發和并行,同步和異步

并發和并行

并發(concurrency)和并行(parallelism)是兩個相似的概念。并發是指在一個時間段内發生若幹事件的情況,并行是指在同一時刻發生若幹事件的情況。

使用單核CPU和多核CPU來說就是:在使用單核CPU時,多個工作任務是以并發的方式運作的,因為隻有一個CPU,是以各個任務會分别占用CPU的一段時間依次執行。如果在自己分得的時間段沒有完成任務,就會切換到另一個任務,然後在下一次得到CPU使用權的時候再繼續執行,直到完成。在這種情況下,因為各個任務的時間段很短、經常切換,是以給我們的感覺是“同時”進行。在使用多核CPU時,在各個核的任務能夠同時運作,這是真正的同時運作,也就是并行。

同步和異步

同步就是并發或并行的各個任務不是獨自運作的,任務之間有一定的交替順序,可能在運作完一個任務得到結果後,另一個任務才會開始運作。就像接力賽跑一樣,要拿到交接棒之後下一個選手才可以開始跑。

異步則是并發或并行的各個任務可以獨立運作,一個任務的運作不受另一個任務影響,任務之間就像比賽的各個選手在不同的賽道比賽一樣,跑步的速度不受其他賽道選手的影響。

二、多線程爬蟲

多線程爬蟲是以并發的方式執行的。也就是說,多個線程并不能真正的同時執行,而是通過程序的快速切換加快網絡爬蟲速度的。

在Python設計之初,為了資料安全所做的決定設定有GIL(Global Interpreter Lock,全局解釋器鎖)。在Python中,一個線程的執行過程包括擷取GIL、執行代碼直到挂起和釋放GIL。在一個Python程序中,隻有一個GIL,拿不到GIL的線程就不允許進入CPU執行。

正因為如此,在多核CPU上Python的多線程效率也不高。因為每次釋放GIL鎖,線程之間都會進行鎖競争,而切換線程會消耗資源。

雖然如此,但是因為網絡爬蟲是IO密集型,線程能夠有效地提升效率,因為單線程下有IO操作會進行IO等待,是以會造成不必要的時間浪費,而開啟多線程能線上程A等待時自動切換到線程B,可以不浪費CPU的資源,進而提升程式執行的效率。

Python的多線程對于IO密集型代碼比較友好

使用單線程爬蟲

import requests
import time
link_list = []

with open('most.txt') as file:
    file_list = file.readlines()
    for each in file_list:
        link = each.split()[1]
        link = link.replace('\n','')
        link_list.append(link)
    start = time.time()
    for eachone in link_list:
        try:
            r = requests.get(eachone)
            print(r.status_code,eachone)
        except Exception as e:
            print('Error:',e)
    end = time.time()
    print('串行時間:',end-start)
           

使用多線程

在python中使用多線程有兩種方法

1.函數式

調用_thread子產品中的start_new_thread()函數産生新線程

簡單示例

import _thread
import time
# 為線程定義一個函數
def print_time(threadName,delay):
    count = 0
    while count < 3:
        time.sleep(delay)
        count += 1
        print(threadName,time.ctime())

_thread.start_new_thread(print_time,('Thread-1',1))
_thread.start_new_thread(print_time,('Thread-2',2))

#time.sleep(5)
print('Main Finished')
           

這個代碼沒出現自己想要的結果,暫時沒看出來問題在哪。

  • _thread中使用start_new_thread ()函數來産生新線程
  • function表示線程函數
  • args為傳遞給線程函數的參數,它必須是tuple類型

2.類包裝式

調用Threading庫建立線程,從threading.Thread繼承

threading子產品提供了Thread類來處理線程,包括以下方法。

  • run():用以表示線程活動的方法。
  • start():啟動線程活動。
  • join([time]):等待至線程中止。阻塞調用線程直至線程的join()方法被調用為止。
  • isAlive():傳回線程是否是活動的。
  • getName():傳回線程名。
  • setName():設定線程名。

示例:

import threading
import  time

class myThread(threading.Thread):
    def __init__(self,name,delay):
        threading.Thread.__init__(self)
        self.name = name
        self.delay = delay
    def run(self):
        print('Starting:'+self.name)
        print_time(self.name,self.delay)
        print('Exiting:'+self.name)
def print_time(threadName,delay):
    counter = 0;
    while counter<3:
        time.sleep(delay)
        print(threadName,time.ctime())
        counter += 1

threads = []
# 建立新線程
thread1 = myThread('Thread-1',1)
thread2 = myThread('Thread-2',2)
# 開啟新線程
thread1.start()
thread2.start()

# 添加線程到線程清單
threads.append(thread1)
threads.append(thread2)

for t in threads:
    t.join()
print('Exiting Main Thread')
           

2.1簡單的多線程爬蟲

import threading
import requests
import time

link_list = []

with open('most.txt','r') as file:
    file_list = file.readlines();
    for eachone in file_list:
        link = eachone.split()[1]
        link = link.replace('\n','')
        link_list.append(link)

start = time.time()

class myThread(threading.Thread):
    def __init__(self,name,link_range):
        threading.Thread.__init__(self)
        self.name = name
        self.link_range = link_range

    def run(self):
        print('starting '+self.name)
        crawler(self.name,self.link_range)
        print('exiting '+self.name)

def crawler(threadName,link_range):
    for i in range(link_range[0],link_range[1]+1):
        try:
            r = requests.get(link_list[i],timeout=20)
            print(threadName,r.status_code,link_list[i])
        except Exception as e:
            print(threadName,'Error: ' ,e)

link_range_list = [(0,200),(201,400),(401,600),(601,800),(801,1000)]
thread_list = []

# 建立新線程
for i in range(1,6):
    thread = myThread('Thread-'+str(i),link_range_list[i-1])
    thread.start()
    thread_list.append(thread)

# 等待所有線程完成
for thread in thread_list: 
    thread.join() #thread.join()方法等待各個線程執行完畢

end = time.time()

print('簡單多線程爬蟲的總時間為:',end-start)
print('Exiting Main Thread')
           

這個代碼還是有改進的餘地,比如說某一個線程中的200個已經結束了,那麼就還剩4個線程。到最後就可能變成單線程了。

2.2 queue多線程爬蟲

import threading
import  time
import requests
import queue as Queue


link_list = []
with open('most.txt','r') as file:
    file_list = file.readlines()
    for eachone in file_list:
        link = eachone.split()[1]
        link = link.replace('\n','')
        link_list.append(link)

start = time.time()

class myThread(threading.Thread):
    def __init__(self,threadName,q):
        threading.Thread.__init__(self)
        self.name = threadName
        self.q=q
    def run(self):
        print('starting '+self.name)
        while True:
            try:
                crawler(self.name,self.q)
            except:
                break
        print('exit '+self.name)

def crawler(threadName,q):
    url = q.get(timeout=2)
    try:
        r = requests.get(url,timeout=20)
        print(q.qsize(),threadName,r.status_code,url)
    except Exception as e:
        print(q.qsize(), threadName, url, 'Error:',e)
thread_list = ['Thread-1','Thread-2','Thread-3','Thread-4','Thread-5']
workQueue = Queue.Queue(1000)
threads = []
# 建立新線程
for tName in thread_list:
    thread = myThread(tName,workQueue)
    thread.start()
    threads.append(thread)
#填充隊列
for url in link_list:
    workQueue.put(url)

# 等待所有線程完成
for t in threads:
    t.join()

end = time.time()
print('Queue多線程爬蟲的總時間:'+end-start)
print('Exiting Main Thread')

           

Python的多線程爬蟲隻能運作在單核上,各個線程以并發的方法異步運作。由于GIL(Global Interpreter Lock,全局解釋器鎖)的存在,多線程爬蟲并不能充分地發揮多核CPU的資源。

三、多程序爬蟲

多程序爬蟲則可以利用CPU的多核,程序數取決于計算機CPU的處理器個數。由于運作在不同的核上,各個程序的運作是并行的。在Python中,如果我們要用多程序,就需要用到multiprocessing這個庫。

使用multiprocess庫的兩種方法

當程序數量大于CPU的核心數量時,等待運作的程序會等到其他程序運作完畢讓出核心為止。是以,如果CPU是單核,就無法進行多程序并行。在使用多程序爬蟲之前,我們需要先了解計算機CPU的核心數量

from multiprocessing import cpu_count
print(cpu_count())
           

1.使用Process + Queue的多程序爬蟲

from multiprocessing import cpu_count
from multiprocessing import Process,Queue
import time
import requests
link_list = []
print(cpu_count())

with open('most.txt','r') as file:
    file_list = file.readlines()
    for eachfile in file_list:
        link = eachfile.split()[1]
        link = link.replace('\n','')
        link_list.append(link)

start = time.time()
class MyProcess(Process):
    def __init__(self,q):
        Process.__init__(self)
        self.q = q

    def run(self):
        print('starting:' ,self.pid)
        while not  self.q.empty():
            crawler(self.q)
        print('exit:',self.pid)



def crawler(q):
    url = q.get(timeout = 2)
    try:
        r = requests.get(url,timeout=20)
        print(q.qsize(),r.status_code,url)
    except Exception as e:
        print(q.qsize(),url,'Error:',e)

if __name__ =='__main__':
    ProcessNames = ['Process-1','Process-2','Process-3','Process-4','Process-5']
    workQueue = Queue(1000)
    # 填充隊列
    for url in link_list:
        workQueue.put(url)
    for i in range(0,3):
        p = MyProcess(workQueue)
        p.daemon = True #如果将daemon設定為True,當父程序結束後,子程序就會自動被終止。
        p.start()
        p.join()
    end = time.time()
    print('Process + Queue多程序爬蟲的總時間為:',end-start)

    print('Main process Ended!')
           

2.使用Pool + Queue的多程序爬蟲

from multiprocessing import Pool,Manager
import time
import requests
link_list = []
with open('most.txt','r') as file:
    file_list = file.readlines()
    for each in file_list:
        link = each.split()[1].replace('\n','')
        link_list.append(link)

start = time.time()
def crawler(q,index):
    Process_id = 'Process-'+str(index)
    while not  q.empty():
        url = q.get(timeout=2)
        try:
            r = requests.get(url,timeout=20)
            print(Process_id,q.qsize(),r.status_code,url)
        except Exception as e:
            print(Process_id,q.qsize(),url,'Error:',e)

if __name__ == '__main__':
    manager = Manager()
    workQueue = manager.Queue(1000)
    # 填充隊列
    for url in link_list:
        workQueue.put(url)
    pool = Pool(processes=3) #使用Pool(processes=3)建立線程池的最大值為3
    for i in range(4):
        pool.apply_async(crawler,args=(workQueue,i)) # 建立子程序 這裡采用的是非阻塞方法
    print('Start process')
    pool.close()
    pool.join()
    end = time.time()
    print('Pool +Queue多程序爬蟲的總時間為:',end-start)
    print('Main process End')
           

四、多協程(Coroutine)爬蟲

使用協程的好處:

  • 協程像一種在程式級别模拟系統級别的程序,由于是單線程,并且少了上下文切換,是以相對來說系統消耗很少
  • 協程友善切換控制流,這就簡化了程式設計模型。協程能保留上一次調用時的狀态(所有局部狀态的一個特定組合),每次過程重入時,就相當于進入了上一次調用的狀态。
  • 協程的高擴充性和高并發性,一個CPU支援上萬協程都不是問題,是以很适合用于高并發處理。

協程的缺點:

  • 協程的本質是一個單線程,不能同時使用單個CPU的多核,需要和程序配合才能運作在多CPU上
  • 有長時間阻塞的IO操作時不要用協程,因為可能會阻塞整個程式

在python的協程中可以使用gevent庫

pip install gevent
import gevent

from gevent.queue import Queue,Empty
import time

#把下面有可能有IO操作的單獨坐上标記
"""
以下兩行,可以實作爬蟲的并發能力,如果沒有這兩句的話,整個抓取過程就會變成
依次抓取gevent庫中的monkey能把可能有IO操作的單獨做上标記,将IO變成可以異步執行的函數
"""
from gevent import monkey
monkey.patch_all()#将IO轉為異步執行的函數
import requests
link_list = []

with open('most.txt','r') as file:
    file_list = file.readlines()
    for each in file_list:
        link_list.append(each.split()[1].replace('\n',''))

start = time.time()

def crawler(index):
    Process_id = 'Process-'+str(index)
    while not workQueue.empty():
        url = workQueue.get(timeout=2)
        try:
            r = requests.get(url,timeout=20)
            print(Process_id,workQueue.qsize(),r.status_code,url)
        except Exception as e:
            print(Process_id, workQueue.qsize(),  url,'Error',e)

def boss():
    for url in link_list:
        workQueue.put_nowait(url)
if __name__ =='__main__':
    workQueue = Queue(1000)
    gevent.spawn(boss).join() # 将隊列中加入的内容整合到gevent中
    '''
    下面4行是建立多協程爬蟲的程式
    '''
    jobs = []
    for i in range(10): #建立10個協程
        jobs.append(gevent.spawn(crawler,i))
    gevent.joinall(jobs)
    end = time.time()
    print('多協程爬蟲的總時間為:',end-start)
    print('Main Ended')
           

五、總結

  1. 并發(concurrency)和并行(parallelism):并發是指在一個時間段發生若幹事件的情況。并行是指在同一時刻發生若幹事件的情況
  2. 同步是指并發或并行的各個任務不是獨自運作的,任務之間有一定的交替順序,可能在執行完一個任務并得到結果後,另一個任務才會開始運作。
  3. 異步則是并發或并行的各個任務可以獨立運作,一個任務的運作不受另一個影響。
  4. 多線程的方式:
    【爬蟲】使用多線程、多程式、多協程提升爬蟲速度
    程式的執行是在不同線程之間切換的。當一個線程等待網頁下載下傳時,程序可以切換到其他線程執行。
  5. 多程序的方式
    【爬蟲】使用多線程、多程式、多協程提升爬蟲速度
    程式的執行是并行、異步的,多個線程可以在同一時刻發生若幹事件。
  6. 多協程的執行方式
    【爬蟲】使用多線程、多程式、多協程提升爬蟲速度
    協程是一種使用者态的輕量級線程,在程式級别來模拟系統級别用的程序。在一個程序中,一個線程通過程式的模拟方法實作高并發。
    微信搜一搜【梓莘】或掃描下方二維碼交個朋友共同進步。文章持續更新中。目前在整理爬蟲相關學習筆記,期待後續更多的更新
    【爬蟲】使用多線程、多程式、多協程提升爬蟲速度