天天看點

python多線程爬蟲之多線程之間的通信——隊列之生産與消費模式

在上上篇文章講多線程的時候附上了一則爬取騰訊招聘的多線程爬蟲,也用到了隊列,剛剛我做了寫改動,就再寫一篇文章把代碼貼出來吧:

class Tencent(Thread):     #生産者線程,專門發送請求,然後将請求結果放入隊列
    
    def __init__(self,queue,t):
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36'}
        self.queue = queue   #傳入任務隊列
        self.t = t
        super(Tencent, self).__init__()

    def run(self):
        while True:
            if self.queue.empty():   #如果任務隊列為空,則結束循環
                break
            print(f'這是{self.t}線程')
            try:
                q = self.queue.get(block=False)  #當從隊列中無法取到值,則報錯,不會阻塞,這是為了防止多線程中有漏網之魚,比如兩個線程幾乎同時判斷隊列中還有一個任務,于是二人都通過了驗證
                url = 'https://careers.tencent.com/tencentcareer/api/post/Query?keyword=python&pageIndex=%s&pageSize=10&language=zh-cn&area=cn'%q
                response = self.get_response(url)
                response_queue.put(response)
            except Exception as e:
                pass
            print(f'{self.t}線程結束了')

    def get_response(self,url):    #請求的方法
        response = requests.get(url=url, headers=self.headers)
        response = response.content.decode()
        return response






class GetTencent(Thread):  #消費者線程,從隊列中擷取請求,然後進行資料處理并存入檔案
    def __init__(self,t2,lock): 
        self.t2 = t2
        self.lock = lock  #加了把鎖,用于存入檔案時使用,防止資料存儲混亂
        super().__init__()

    def run(self):
        while True:
            if not response_queue.empty(): #判斷是否隊列中還有請求,有則進行提取請求
                print(f'這是{self.t2}線程')
                try:
                    response = response_queue.get(block=False) #與上方做法同理
                    self.load_str(response)
                except Exception as e:
                    pass
                print(f'{self.t2}線程結束了')
            elif response_queue.empty() and not status: #判斷是否生産的線程全部死亡,并且隊列内無請求對象了,如果是,則結束循環,結束消費
                break
    def load_str(self, response):  #資料處理的方法
        response = response.replace(r'\n', '')
        response = response.replace(r'\r', '')
        response = json.loads(response)
        data_list = response['Data']['Posts']
        for data in data_list:
            RecruitPostName = data['RecruitPostName']
            CountryName = data['CountryName']
            LocationName = data['LocationName']
            Responsibility = data['Responsibility']
            LastUpdateTime = data['LastUpdateTime']
            with self.lock:
                with open('tencent.txt', 'a', encoding='utf-8') as f:
                    f.write(
                        RecruitPostName + ',' + CountryName + ',' + LocationName + ',' + Responsibility + ',' + LastUpdateTime + '\n')


response_queue = Queue()  #申明一個全局的隊列用于存放請求
status = True  #實作申明一個生産的線程的狀态,如果結束,則調為False
if __name__ == '__main__':
    q = Queue()  #往隊列中放任務,讓生産者線程去取
    for i in range(1,11):
        q.put(i)

    t_list = ['q1','q2','q3']
    t2_list = ['c1','c2','c3']
    list1 = []
    list2 = []
    start_time = time.time()
    print(f'開始時間{start_time}')
    for t in t_list:  #啟動三個生産者線程
        tencent = Tencent(q,t)
        tencent.start()
        list1.append(tencent)

    lock = Lock()  #執行個體化一把鎖
    for t2 in t2_list:  #啟用三個消費者線程
        get_tencent = GetTencent(t2,lock)
        get_tencent.start()
        list2.append(get_tencent)



    for j in list1:  #用主線程檢測是否生産者線程全部死亡
        j.join()
    status = False  #執行到這一步證明生産者線程都死亡了,改變狀态
    for j in list2:  #主線程阻塞,直到所有請求都被消費者線程處理完畢
        j.join()
    print(f'耗費時間:{time.time()-start_time}')  #列印總耗時
           

看過我之前那篇文章的朋友應該有疑問,為什麼要改成這樣寫,沒有為什麼,隻為了友善你加深隊列與生産消費關系的了解

ok,看下列印結果

開始時間1567078647.517759
這是q1線程
這是q2線程
這是q3線程
q3線程結束了這是c3線程
這是c2線程
這是c1線程

c3線程結束了
c1線程結束了
這是q3線程
q2線程結束了
這是q2線程
這是c3線程
q1線程結束了
這是q1線程
這是c1線程
c3線程結束了
c2線程結束了
c1線程結束了
q3線程結束了這是c2線程這是c3線程


這是q3線程這是c1線程

c2線程結束了c3線程結束了

q2線程結束了
這是c2線程
這是q2線程
q1線程結束了這是c3線程
這是q1線程

c2線程結束了
c1線程結束了
c3線程結束了
q3線程結束了這是c3線程
這是c2線程
這是q3線程這是c1線程


c1線程結束了
c3線程結束了
c2線程結束了
q2線程結束了這是c1線程

這是c3線程這是c2線程

c2線程結束了c3線程結束了

q1線程結束了這是c3線程
這是c2線程

c2線程結束了
q3線程結束了
這是c2線程
c3線程結束了
c1線程結束了
c2線程結束了
耗費時間:1.2140693664550781
           

嗯嗯,差不多了,就先分享到這裡了