天天看點

聊聊并發(十)生産者消費者模式

在并發程式設計中使用生産者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生産線程和消費線程的工作能力來提高程式的整體處理資料的速度。

線上程世界裡,生産者就是生産資料的線程,消費者就是消費資料的線程。在多線程開發當中,如果生産者處理速度很快,而消費者處理速度很慢,那麼生産者就必須等待消費者處理完,才能繼續生産資料。同樣的道理,如果消費者的處理能力大于生産者,那麼消費者就必須等待生産者。為了解決這種生産消費能力不均衡的問題,是以便有了生産者和消費者模式。

生産者消費者模式是通過一個容器來解決生産者和消費者的強耦合問題。生産者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,是以生産者生産完資料之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生産者要資料,而是直接從阻塞隊列裡取,阻塞隊列就相當于一個緩沖區,平衡了生産者和消費者的處理能力。

這個阻塞隊列就是用來給生産者和消費者解耦的。縱觀大多數設計模式,都會找一個第三者出來進行解耦,如工廠模式的第三者是工廠類,模闆模式的第三者是模闆類。在學習一些設計模式的過程中,如果先找到這個模式的第三者,能幫助我們快速熟悉一個設計模式。

我和同僚一起利用業餘時間開發的yuna工具中使用了生産者和消費者模式。首先我先介紹下yuna工具,在阿裡巴巴很多同僚都喜歡通過郵件分享技術文章,因為通過郵件分享很友善,同學們在網上看到好的技術文章,複制粘貼發送就完成了分享,但是我們發現技術文章不能沉澱下來,對于新來的同學看不到以前分享的技術文章,大家也很難找到以前分享過的技術文章。為了解決這問題,我們開發了yuna工具。yuna取名自我非常喜歡的一款rpg遊戲”最終幻想”中女主角的名字。

首先我們申請了一個專門用來收集分享郵件的郵箱,比如[email protected],同學将分享的文章發送到這個郵箱,讓同學們每次都抄送到這個郵箱肯定很麻煩,是以我們的做法是将這個郵箱位址放在部門郵件清單裡,是以分享的同學隻需要象以前一樣向整個部門分享文章就行,yuna工具通過讀取郵件伺服器裡該郵箱的郵件,把所有分享的郵件下載下傳下來,包括郵件的附件,圖檔,和郵件回複,我們可能會從這個郵箱裡下載下傳到一些非分享的文章,是以我們要求分享的郵件标題必須帶有一個關鍵字,比如[内貿技術分享],下載下傳完郵件之後,通過confluence的web service接口,把文章插入到confluence裡,這樣新同僚就可以在confluence裡看以前分享過的文章,并且yuna工具還可以自動把文章進行分類和歸檔。

為了快速上線該功能,當時我們花了三天業餘時間快速開發了yuna1.0版本。在1.0版本中我并沒有使用生産者消費模式,而是使用單線程來處理,因為當時隻需要處理我們一個部門的郵件,是以單線程明顯夠用,整個過程是串行執行的。在一個線程裡,程式先抽取全部的郵件,轉化為文章對象,然後添加全部的文章,最後删除抽取過的郵件。代碼如下:

yuna工具在推廣後,越來越多的部門使用這個工具,處理的時間越來越慢,yuna是每隔5分鐘進行一次抽取的,而當郵件多的時候一次處理可能就花了幾分鐘,于是我在yuna2.0版本裡使用了生産者消費者模式來處理郵件,首先生産者線程按一定的規則去郵件系統裡抽取郵件,然後存放在阻塞隊列裡,消費者從阻塞隊列裡取出文章後插入到conflunce裡。代碼如下:

使用了生産者和消費者模式後,郵件的整體處理速度比以前要快了很多。

在多核時代,多線程并發處理速度比單線程處理速度更快,是以我們可以使用多個線程來生産資料,同樣可以使用多個消費線程來消費資料。而更複雜的情況是,消費者消費的資料,有可能需要繼續處理,于是消費者處理完資料之後,它又要作為生産者把資料放在新的隊列裡,交給其他消費者繼續處理。如下圖:

聊聊并發(十)生産者消費者模式

我們在一個長連接配接伺服器中使用了這種模式,生産者1負責将所有用戶端發送的消息存放在阻塞隊列1裡,消費者1從隊列裡讀消息,然後通過消息id進行hash得到n個隊列中的一個,然後根據編号将消息存放在到不同的隊列裡,每個阻塞隊列會配置設定一個線程來消費阻塞隊列裡的資料。如果消費者2無法消費消息,就将消息再抛回到阻塞隊列1中,交給其他消費者處理。

以下是消息總隊列的代碼;

啟動一個消息分發線程。在這個線程裡子隊列自動去總隊列裡擷取消息。

使用hash算法擷取一個子隊列。

使用的時候我們隻需要往總隊列裡發消息。

java中的線程池類其實就是一種生産者和消費者模式的實作方式,但是我覺得其實作方式更加高明。生産者把任務丢給線程池,線程池建立線程并處理任務,如果将要運作的任務數大于線程池的基本線程數就把任務扔到阻塞隊列裡,這種做法比隻使用一個阻塞隊列來實作生産者和消費者模式顯然要高明很多,因為消費者能夠處理直接就處理掉了,這樣速度更快,而生産者先存,消費者再取這種方式顯然慢一些。

我們的系統也可以使用線程池來實作多生産者消費者模式。比如建立n個不同規模的java線程池來處理不同性質的任務,比如線程池1将資料讀到記憶體之後,交給線程池2裡的線程繼續處理壓縮資料。線程池1主要處理io密集型任務,線程池2主要處理cpu密集型任務。

本章講解了生産者消費者模式,并給出了執行個體。讀者可以在平時的工作中思考下哪些場景可以使用生産者消費者模式,我相信這種場景應該非常之多,特别是需要處理任務時間比較長的場景,比如上傳附件并處理,使用者把檔案上傳到系統後,系統把檔案丢到隊列裡,然後立刻傳回告訴使用者上傳成功,最後消費者再去隊列裡取出檔案處理。比如調用一個遠端接口查詢資料,如果遠端服務接口查詢時需要幾十秒的時間,那麼它可以提供一個申請查詢的接口,這個接口把要申請查詢任務放資料庫中,然後該接口立刻傳回。然後伺服器端用線程輪詢并擷取申請任務進行處理,處理完之後發消息給調用方,讓調用方再來調用另外一個接口拿資料。