天天看點

聊聊并發(八)——Fork/Join架構介紹

fork/join架構是java7提供了的一個用于并行執行任務的架構, 是一個把大任務分割成若幹個小任務,最終彙總每個小任務結果後得到大任務結果的架構。

我們再通過fork和join這兩個單詞來了解下fork/join架構,fork就是把一個大任務切分為若幹子任務并行的執行,join就是合并這些子任務的執行結果,最後得到這個大任務的結果。比如計算1+2+。。+10000,可以分割成10個子任務,每個子任務分别對1000個數進行求和,最終彙總這10個子任務的結果。fork/join的運作流程圖如下:

聊聊并發(八)——Fork/Join架構介紹

工作竊取(work-stealing)算法是指某個線程從其他隊列裡竊取任務來執行。工作竊取的運作流程圖如下:

聊聊并發(八)——Fork/Join架構介紹

那麼為什麼需要使用工作竊取算法呢?假如我們需要做一個比較大的任務,我們可以把這個任務分割為若幹互不依賴的子任務,為了減少線程間的競争,于是把這些子任務分别放到不同的隊列裡,并為每個隊列建立一個單獨的線程來執行隊列裡的任務,線程和隊列一一對應,比如a線程負責處理a隊列裡的任務。但是有的線程會先把自己隊列裡的任務幹完,而其他線程對應的隊列裡還有任務等待處理。幹完活的線程與其等着,不如去幫其他線程幹活,于是它就去其他線程的隊列裡竊取一個任務來執行。而在這時它們會通路同一個隊列,是以為了減少竊取任務線程和被竊取任務線程之間的競争,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

工作竊取算法的優點是充分利用線程進行并行計算,并減少了線程間的競争,其缺點是在某些情況下還是存在競争,比如雙端隊列裡隻有一個任務時。并且消耗了更多的系統資源,比如建立多個線程和多個雙端隊列。

我們已經很清楚fork/join架構的需求了,那麼我們可以思考一下,如果讓我們來設計一個fork/join架構,該如何設計?這個思考有助于你了解fork/join架構的設計。

第一步分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,是以還需要不停的分割,直到分割出的子任務足夠小。

第二步執行任務并合并結果。分割的子任務分别放在雙端隊列裡,然後幾個啟動線程分别從雙端隊列裡擷取任務執行。子任務執行完的結果都統一放在一個隊列裡,啟動一個線程從隊列裡拿資料,然後合并這些資料。

fork/join使用兩個類來完成以上兩件事情:

forkjointask:我們要使用forkjoin架構,必須首先建立一個forkjoin任務。它提供在任務中執行fork()和join()操作的機制,通常情況下我們不需要直接繼承forkjointask類,而隻需要繼承它的子類,fork/join架構提供了以下兩個子類:

recursiveaction:用于沒有傳回結果的任務。

recursivetask :用于有傳回結果的任務。

forkjoinpool :forkjointask需要通過forkjoinpool來執行,任務分割出的子任務會添加到目前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列裡暫時沒有任務時,它會随機從其他工作線程的隊列的尾部擷取一個任務。

讓我們通過一個簡單的需求來使用下fork/join架構,需求是:計算1+2+3+4的結果。

使用fork/join架構首先要考慮到的是如何分割任務,如果我們希望每個子任務最多執行兩個數的相加,那麼我們設定分割的門檻值是2,由于是4個數字相加,是以fork/join架構會把這個任務fork成兩個子任務,子任務一負責計算1+2,子任務二負責計算3+4,然後再join兩個子任務的結果。

因為是有結果的任務,是以必須繼承recursivetask,實作代碼如下:

<code>001</code>

<code>packagefj;</code>

<code>002</code>

<code>003</code>

<code>importjava.util.concurrent.executionexception;</code>

<code>004</code>

<code>005</code>

<code>importjava.util.concurrent.forkjoinpool;</code>

<code>006</code>

<code>007</code>

<code>importjava.util.concurrent.future;</code>

<code>008</code>

<code>009</code>

<code>importjava.util.concurrent.recursivetask;</code>

<code>010</code>

<code>011</code>

<code>publicclasscounttaskextendsrecursivetask {</code>

<code>012</code>

<code>013</code>

<code>       </code><code>privatestaticfinalintthreshold= </code><code>2</code><code>;</code><code>//門檻值</code>

<code>014</code>

<code>015</code>

<code>       </code><code>privateintstart;</code>

<code>016</code>

<code>017</code>

<code>       </code><code>privateintend;</code>

<code>018</code>

<code>019</code>

<code>       </code><code>publiccounttask(intstart,intend) {</code>

<code>020</code>

<code>021</code>

<code>                   </code><code>this</code><code>.start= start;</code>

<code>022</code>

<code>023</code>

<code>                   </code><code>this</code><code>.end= end;</code>

<code>024</code>

<code>025</code>

<code>        </code><code>}</code>

<code>026</code>

<code>027</code>

<code>       </code><code>@override</code>

<code>028</code>

<code>029</code>

<code>       </code><code>protectedinteger compute() {</code>

<code>030</code>

<code>031</code>

<code>                   </code><code>intsum = </code><code>0</code><code>;</code>

<code>032</code>

<code>033</code>

<code>                   </code><code>//如果任務足夠小就計算任務</code>

<code>034</code>

<code>035</code>

<code>                   </code><code>booleancancompute = (end-start) &lt;=threshold;</code>

<code>036</code>

<code>037</code>

<code>                   </code><code>if</code><code>(cancompute) {</code>

<code>038</code>

<code>039</code>

<code>                              </code><code>for</code><code>(inti =start; i &lt;=end; i++) {</code>

<code>040</code>

<code>041</code>

<code>                                           </code><code>sum += i;</code>

<code>042</code>

<code>043</code>

<code>                               </code><code>}</code>

<code>044</code>

<code>045</code>

<code>                    </code><code>}</code><code>else</code><code>{</code>

<code>046</code>

<code>047</code>

<code>                              </code><code>//如果任務大于閥值,就分裂成兩個子任務計算</code>

<code>048</code>

<code>049</code>

<code>                              </code><code>intmiddle = (start+end) / </code><code>2</code><code>;</code>

<code>050</code>

<code>051</code>

<code>                               </code><code>counttask lefttask =newcounttask(start, middle);</code>

<code>052</code>

<code>053</code>

<code>                               </code><code>counttask righttask =newcounttask(middle + </code><code>1</code><code>,end);</code>

<code>054</code>

<code>055</code>

<code>                              </code><code>//執行子任務</code>

<code>056</code>

<code>057</code>

<code>                               </code><code>lefttask.fork();</code>

<code>058</code>

<code>059</code>

<code>                               </code><code>righttask.fork();</code>

<code>060</code>

<code>061</code>

<code>                              </code><code>//等待子任務執行完,并得到其結果</code>

<code>062</code>

<code>063</code>

<code>                              </code><code>intleftresult=lefttask.join();</code>

<code>064</code>

<code>065</code>

<code>                              </code><code>intrightresult=righttask.join();</code>

<code>066</code>

<code>067</code>

<code>                              </code><code>//合并子任務</code>

<code>068</code>

<code>069</code>

<code>                               </code><code>sum = leftresult  + rightresult;</code>

<code>070</code>

<code>071</code>

<code>                    </code><code>}</code>

<code>072</code>

<code>073</code>

<code>                   </code><code>returnsum;</code>

<code>074</code>

<code>075</code>

<code>076</code>

<code>077</code>

<code>       </code><code>publicstaticvoidmain(string[] args) {</code>

<code>078</code>

<code>079</code>

<code>                    </code><code>forkjoinpool forkjoinpool =newforkjoinpool();</code>

<code>080</code>

<code>081</code>

<code>                   </code><code>//生成一個計算任務,負責計算1+2+3+4</code>

<code>082</code>

<code>083</code>

<code>                    </code><code>counttask task =newcounttask(</code><code>1</code><code>, </code><code>4</code><code>);</code>

<code>084</code>

<code>085</code>

<code>                   </code><code>//執行一個任務</code>

<code>086</code>

<code>087</code>

<code>                    </code><code>future result = forkjoinpool.submit(task);</code>

<code>088</code>

<code>089</code>

<code>                   </code><code>try</code><code>{</code>

<code>090</code>

<code>091</code>

<code>                               </code><code>system.out.println(result.get());</code>

<code>092</code>

<code>093</code>

<code>                    </code><code>}</code><code>catch</code><code>(interruptedexception e) {</code>

<code>094</code>

<code>095</code>

<code>                    </code><code>}</code><code>catch</code><code>(executionexception e) {</code>

<code>096</code>

<code>097</code>

<code>098</code>

<code>099</code>

<code>100</code>

<code>101</code>

<code>}</code>

通過這個例子讓我們再來進一步了解forkjointask,forkjointask與一般的任務的主要差別在于它需要實作compute方法,在這個方法裡,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在調用fork方法時,又會進入compute方法,看看目前子任務是否需要繼續分割成孫任務,如果不需要繼續分割,則執行目前子任務并傳回結果。使用join方法會等待子任務執行完并得到其結果。

forkjointask在執行的時候可能會抛出異常,但是我們沒辦法在主線程裡直接捕獲異常,是以forkjointask提供了iscompletedabnormally()方法來檢查任務是否已經抛出異常或已經被取消了,并且可以通過forkjointask的getexception方法擷取異常。使用如下代碼:

getexception方法傳回throwable對象,如果任務被取消了則傳回cancellationexception。如果任務沒有完成或者沒有抛出異常則傳回null。

forkjoinpool由forkjointask數組和forkjoinworkerthread數組組成,forkjointask數組負責存放程式送出給forkjoinpool的任務,而forkjoinworkerthread數組負責執行這些任務。

forkjointask的fork方法實作原理。當我們調用forkjointask的fork方法時,程式會調用forkjoinworkerthread的pushtask方法異步的執行這個任務,然後立即傳回結果。代碼如下:

<code>1</code>

<code>public</code> <code>final</code> <code>forkjointask fork() {</code>

<code>2</code>

<code>        </code><code>((forkjoinworkerthread) thread.currentthread())</code>

<code>3</code>

<code>            </code><code>.pushtask(</code><code>this</code><code>);</code>

<code>4</code>

<code>        </code><code>return</code> <code>this</code><code>;</code>

<code>5</code>

pushtask方法把目前任務存放在forkjointask 數組queue裡。然後再調用forkjoinpool的signalwork()方法喚醒或建立一個工作線程來執行任務。代碼如下:

<code>01</code>

<code>final</code> <code>void</code> <code>pushtask(forkjointask t) {</code>

<code>02</code>

<code>        </code><code>forkjointask[] q; </code><code>int</code> <code>s, m;</code>

<code>03</code>

<code>        </code><code>if</code> <code>((q = queue) != </code><code>null</code><code>) {    </code><code>// ignore if queue removed</code>

<code>04</code>

<code>            </code><code>long</code> <code>u = (((s = queuetop) &amp; (m = q.length - </code><code>1</code><code>)) &lt;&lt; ashift) + abase;</code>

<code>05</code>

<code>            </code><code>unsafe.putorderedobject(q, u, t);</code>

<code>06</code>

<code>            </code><code>queuetop = s + </code><code>1</code><code>;         </code><code>// or use putorderedint</code>

<code>07</code>

<code>            </code><code>if</code> <code>((s -= queuebase) &lt;= </code><code>2</code><code>)</code>

<code>08</code>

<code>                </code><code>pool.signalwork();</code>

<code>09</code>

<code>    </code><code>else</code> <code>if</code> <code>(s == m)</code>

<code>10</code>

<code>                </code><code>growqueue();</code>

<code>11</code>

<code>12</code>

<code>    </code><code>}</code>

forkjointask的join方法實作原理。join方法的主要作用是阻塞目前線程并等待擷取結果。讓我們一起看看forkjointask的join方法的實作,代碼如下:

<code>public</code> <code>final</code> <code>v join() {</code>

<code>        </code><code>if</code> <code>(dojoin() != normal)</code>

<code>            </code><code>return</code> <code>reportresult();</code>

<code>        </code><code>else</code>

<code>            </code><code>return</code> <code>getrawresult();</code>

<code>private</code> <code>v reportresult() {</code>

<code>        </code><code>int</code> <code>s; throwable ex;</code>

<code>        </code><code>if</code> <code>((s = status) == cancelled)</code>

<code>            </code><code>throw</code> <code>new</code> <code>cancellationexception();</code>

<code>if</code> <code>(s == exceptional &amp;&amp; (ex = getthrowableexception()) != </code><code>null</code><code>)</code>

<code>            </code><code>unsafe.throwexception(ex);</code>

<code>13</code>

<code>        </code><code>return</code> <code>getrawresult();</code>

<code>14</code>

首先,它調用了dojoin()方法,通過dojoin()方法得到目前任務的狀态來判斷傳回什麼結果,任務狀态有四種:已完成(normal),被取消(cancelled),信号(signal)和出現異常(exceptional)。

如果任務狀态是已完成,則直接傳回任務結果。

如果任務狀态是被取消,則直接抛出cancellationexception。

如果任務狀态是抛出異常,則直接抛出對應的異常。

讓我們再來分析下dojoin()方法的實作代碼:

<code>private</code> <code>int</code> <code>dojoin() {</code>

<code>        </code><code>thread t; forkjoinworkerthread w; </code><code>int</code> <code>s; </code><code>boolean</code> <code>completed;</code>

<code>        </code><code>if</code> <code>((t = thread.currentthread()) </code><code>instanceof</code> <code>forkjoinworkerthread) {</code>

<code>            </code><code>if</code> <code>((s = status) &lt; </code><code>0</code><code>)</code>

<code> </code><code>return</code> <code>s;</code>

<code>            </code><code>if</code> <code>((w = (forkjoinworkerthread)t).unpushtask(</code><code>this</code><code>)) {</code>

<code>                </code><code>try</code> <code>{</code>

<code>                    </code><code>completed = exec();</code>

<code>                </code><code>} </code><code>catch</code> <code>(throwable rex) {</code>

<code>                    </code><code>return</code> <code>setexceptionalcompletion(rex);</code>

<code>                </code><code>}</code>

<code>                </code><code>if</code> <code>(completed)</code>

<code>                    </code><code>return</code> <code>setcompletion(normal);</code>

<code>            </code><code>}</code>

<code>15</code>

<code>            </code><code>return</code> <code>w.jointask(</code><code>this</code><code>);</code>

<code>16</code>

<code>17</code>

<code>18</code>

<code>            </code><code>return</code> <code>externalawaitdone();</code>

<code>19</code>

在dojoin()方法裡,首先通過檢視任務的狀态,看任務是否已經執行完了,如果執行完了,則直接傳回任務狀态,如果沒有執行完,則從任務數組裡取出任務并執行。如果任務順利執行完成了,則設定任務狀态為normal,如果出現異常,則紀錄異常,并将任務狀态設定為exceptional。

jdk1.7源碼

http://ifeve.com/fork-join-5/

繼續閱讀