天天看點

Java Worker 設計模式Worker模式

異步執行一些任務,有傳回或無傳回結果

有些時候想執行一些異步任務,如異步網絡通信、daemon任務,但又不想去管理這任務的生命周。這個時候可以使用worker模式,它會幫您管理與執行任務,并能非常友善地擷取結果

很多人可能為覺得這與executor很像,但executor是多線程的,它的作用更像是一個規劃中心。而worker則隻是個搬運工,它自己本身隻有一個線程的。每個worker有自己的任務處理邏輯,為了實作這個目的,有兩種方式

1. 建立一個抽象的abstractworker,不同邏輯的worker對其進行不同的實作;

2. 對worker新增一個taskprocessor不同的任務傳入不同的processor即可。

第二種方式worker的角色可以很友善地改變,而且可以随時更換processor,可以了解成可”刷機”的worker

^ ^。這裡我們使用第二種方式來介紹此模式的整體結構。

詳細介紹一下幾個角色:

configurableworker:顧名思義這個就是真正幹活的worker了。要實作自我生命周期管理,需要實作runable,這樣其才能以單獨的線程運作,需要注意的是work最好以daemon線程的方式運作。worker裡面還包括幾個其它成員:taskqueue,一個阻塞性質的queue,一般blockingarraylist就可以了,這樣任務是fifo(先進先出)的,如果要考慮任務的優先級,則可以考慮使用priorityblockingqueue;listeners,根據事件進行劃分的事件監聽者,以便于當一個任務完成的時候進行處理,需要注意的是,為了較高效地進行listener周遊,這裡我推薦使用copyonwritearraylist,免得每次都複制。其對應的方法有addlistener、addtask等配套方法,這個都不多說了,更詳細的可以看後面的示例代碼。

workertask:實際上這是一個抽象的工内容,其包括基本的id與,task的id是worker生成的,相當于遞wtte後的一個執回,當資料執行完了的時候需要使用這個id來取結果。而後面真正實作的實體task則包含任務處理時需要的資料。

processor:為了實作可”刷機”的worker,我們将處理邏輯與worker分開來,processor的本職工作很簡單,隻需要加工傳入的task資料即可,加工完成後觸發fireevent(workerevent.task_complete)事件,之後通過future的get即可得到最終的資料。

另外再說一點,對于addtask,可以有一個overload的方法,即在輸入task的同時,傳入一個rejectpolice,這樣可以在size過大的時候做出拒絕操作,有效避免被撐死。

這種設計能自動處理任務,并能根據任務的優先級自動調節任務的執行順序,一個完全獨立的thread,你完全可以将其了解成一專門負責幹某種活的”機器人”。它可以用于處理一些定時、請求量固定均勻且對實時性要求不是太高的任務,如日志記錄,資料分析等。當然,如果想提高任務處理的資料,可以生成多個worker,就相當于雇傭更多的人來為你幹活,非常直覺的。當然這樣一來,誰來維護這worker便成了一個問題,另外就目前這種設計下worker之間是沒有通信與協同的,這些都是改進點。

那麼對于多個worker,有什麼組織方式呢?這裡我介紹三種,算是抛磚引玉:

就像生産工廠中的房間上的流水線勞工一樣,将任務切分成幾個小塊,每個worker負責自己的一部分,以提高整體的生産、産出效率,如下圖:

Java Worker 設計模式Worker模式

假設完成任務 t 需要的時間為:w(t)=n,那麼将任務分解成m份,流水線式的執行,每小份需要的時間便為 w(t/m)=n/m,那麼執行1000條任務的時間,單個為1000n,流水線長度為l,則用這種方式所用的時間為(1000-1)*(m-l+1)*n/m+n

其中l<m,由此可見,流水線的worker越多、任務越細分,工作的效率将越高。這種主方式的問題在于,如果一個worker出現問題,那麼整個流水線就将停止工作。而且任務的優先級不能動态調用,必須事先告知。

這是一個有q1、q2...qn個多重流水線方式,從高到低分别代碼不同的優先級,高優先級的worker要多于低優先級的,一般是2的倍數,即q4有16個worker、q3有8個,後面類推。任務根據預先估計好的優先級進入,如果任務在某步的執行過長,直接踢到下一級,讓出最快的資源。

顯然這種方式的好處就在于可以動态地調整任務的優級,及時做出反應。當然,為了實作更好的高度,我們可以在低級裡增加一個閥值,使得放偶然放入低級的task可以有複活的機會^

^。

流水線雖然有一定的并行性,但總體來說仍然是串行的,因為隻要有一個節點出了問題,那都是緻命的錯誤。mapreduce是google率先實作的一個分布式算法,有非常好的并行執行效率。

隻要我們将map與reduce都改成worker就行了,如mapworker與reduceworker。這樣,可以看見,map的過程是完全并行的,當然這樣就需要在map與reduce上的配置設定與資料組合上稍稍下一點功夫了。

這裡我們實作一個pageurlminingworker,對給定的url,打開頁面後,采取所有的url,并反回結果進行彙總輸出。由于時間有限,這裡我隻實作了單worker與mapreduce worker集兩種方式,有興趣的同學可以實作其它類型,如多級回報隊列。注意!我這裡隻是向大家展示這種設計模式,url

抓取的效率不在本次考慮之列。

y軸為抓取x軸url個數所用的時間

我們可以看到,worker模式組合是非常靈活的,它真的就像一個活生生的勞工,任你調配。使用worker,我們可以更友善地實作更複雜的結構。