天天看點

《Java 7并發程式設計實戰手冊》第五章Fork/Join架構

《Java 7并發程式設計實戰手冊》第五章Fork/Join架構

本章内容包含:

建立fork/join線程池

合并任務的結果

異步運作任務

在任務中抛出異常

取消任務

通常,使用java來開發一個簡單的并發應用程式時,會建立一些runnable對象,然後建立對應的thread 對象來控制程式中這些線程的建立、執行以及線程的狀态。自從java 5開始引入了executor和executorservice接口以及實作這兩個接口的類(比如threadpoolexecutor)之後,使得java在并發支援上得到了進一步的提升。

執行器架構(executor framework)将任務的建立和執行進行了分離,通過這個架構,隻需要實作runnable接口的對象和使用executor對象,然後将runnable對象發送給執行器。執行器再負責運作這些任務所需要的線程,包括線程的建立,線程的管理以及線程的結束。

java 7則又更進了一步,它包括了executorservice接口的另一種實作,用來解決特殊類型的問題,它就是fork/join架構,有時也稱分解/合并架構。

fork/join架構是用來解決能夠通過分治技術(divide and conquer technique)将問題拆分成小任務的問題。在一個任務中,先檢查将要解決的問題的大小,如果大于一個設定的大小,那就将問題拆分成可以通過架構來執行的小任務。如果問題的大小比設定的大小要小,就可以直接在任務裡解決這個問題,然後,根據需要傳回任務的結果。下面的圖形總結了這個原理。

《Java 7并發程式設計實戰手冊》第五章Fork/Join架構

沒有固定的公式來決定問題的參考大小(reference size),進而決定一個任務是需要進行拆分或不需要拆分,拆分與否仍是依賴于任務本身的特性。可以使用在任務中将要處理的元素的數目和任務執行所需要的時間來決定參考大小。測試不同的參考大小來決定解決問題最好的一個方案,将forkjoinpool類看作一個特殊的 executor 執行器類型。這個架構基于以下兩種操作。

分解(fork)操作:當需要将一個任務拆分成更小的多個任務時,在架構中執行這些任務;

合并(join)操作:當一個主任務等待其建立的多個子任務的完成執行。

fork/join架構和執行器架構(executor framework)主要的差別在于工作竊取算法(work-stealing algorithm)。與執行器架構不同,使用join操作讓一個主任務等待它所建立的子任務的完成,執行這個任務的線程稱之為工作者線程(worker thread)。工作者線程尋找其他仍未被執行的任務,然後開始執行。通過這種方式,這些線程在運作時擁有所有的優點,進而提升應用程式的性能。

為了達到這個目标,通過fork/join架構執行的任務有以下限制。

任務隻能使用fork()和join() 操作當作同步機制。如果使用其他的同步機制,工作者線程就不能執行其他任務,當然這些任務是在同步操作裡時。比如,如果在fork/join 架構中将一個任務休眠,正在執行這個任務的工作者線程在休眠期内不能執行另一個任務。

任務不能執行i/o操作,比如檔案資料的讀取與寫入。

任務不能抛出非運作時異常(checked exception),必須在代碼中處理掉這些異常。

fork/join<b>架構</b>的核心是由下列兩個類組成的。

forkjoinpool:這個類實作了executorservice接口和工作竊取算法(work-stealing algorithm)。它管理工作者線程,并提供任務的狀态資訊,以及任務的執行資訊。

forkjointask:這個類是一個将在forkjoinpool中執行的任務的基類。

fork/join架構提供了在一個任務裡執行fork()和join()操作的機制和控制任務狀态的方法。通常,為了實作fork/join任務,需要實作一個以下兩個類之一的子類。

recursiveaction:用于任務沒有傳回結果的場景。

recursivetask:用于任務有傳回結果的場景。

本章接下來将展示如何利用fork/join架構高效地工作。

在本節,我們将學習如何使用fork/join架構的基本元素。它包括:

 建立用來執行任務的forkjoinpool對象;

建立即将線上程池中被執行的任務forkjointask子類。

本範例中即将使用的fork/join架構的主要特性如下:

采用預設的構造器建立forkjoinpool對象;

在任務中将使用javaapi文檔推薦的結構。

<code>01</code>

<code>if</code> <code>(problem size &gt;</code><code>default</code> <code>size){</code>

<code>02</code>

<code>03</code>

<code>tasks=divide(task);</code>

<code>04</code>

<code>05</code>

<code>execute(tasks);</code>

<code>06</code>

<code>07</code>

<code>}</code><code>else</code> <code>{</code>

<code>08</code>

<code>09</code>

<code>resolve problem using another algorithm;</code>

<code>10</code>

<code>11</code>

<code>}</code>

我們将以同步的方式執行任務。當一個主任務執行兩個或更多的子任務時,這個主任務将等待子任務的完成。用這種方法,執行主任務的線程,稱之為工作者線程(worker thread),它将尋找其他的子任務來執行,并在子任務執行的時間裡利用所有的線程優勢。

如果将要實作的任務沒有傳回任何結果,那麼,采用recursiveaction類作為實作任務的基類。

準備工作

本節的範例是在eclipseide裡完成的。無論你使用eclipse還是其他的ide(比如netbeans),都可以打開這個ide并且建立一個新的java工程。

範例實作

在本節,我們将實作一項更新産品價格的任務。最初的任務将負責更新清單中的所有元素。我們使用10來作為參考大小(referencesize),如果一個任務需要更新大于10個元素,它會将這個清單分解成為兩部分,然後分别建立兩個任務用來更新各自部分的産品價格。

按照接下來的步驟實作本節的範例。

1.建立一個名為product的類,用來存儲産品的名稱和價格。

<code>1</code>

<code>public</code> <code>class</code> <code>product {</code>

2.聲明一個名為name的私有string屬性,一個名為price的私有double屬性。

<code>private</code> <code>string name;</code>

<code>2</code>

<code>3</code>

<code>private</code> <code>double</code> <code>price;</code>

3.實作兩個屬性各自的設值與取值方法。

<code>public</code> <code>string getname() {</code>

<code>return</code> <code>name;</code>

<code>public</code> <code>void</code> <code>setname(string name) {</code>

<code>this</code><code>.name = name;</code>

<code>12</code>

<code>13</code>

<code>public</code> <code>double</code> <code>getprice() {</code>

<code>14</code>

<code>15</code>

<code>return</code> <code>price;</code>

<code>16</code>

<code>17</code>

<code>18</code>

<code>19</code>

<code>public</code> <code>void</code> <code>setprice(</code><code>double</code> <code>price) {</code>

<code>20</code>

<code>21</code>

<code>this</code><code>.price = price;</code>

<code>22</code>

<code>23</code>

4.建立一個名為productlistgenerator的類,用來生成一個随機産品清單。

<code>public</code> <code>class</code> <code>productlistgenerator {</code>

5.實作generate()方法。接收一個表示清單大小的int參數,并傳回一個生成産品的list&lt;product&gt;清單。

<code>public</code> <code>list&lt;product&gt; generate (</code><code>int</code> <code>size) {</code>

6.建立傳回産品清單的對象ret。

<code>list&lt;product&gt; ret=</code><code>new</code> <code>arraylist&lt;product&gt;();</code>

7.生成産品清單,給所有的産品配置設定相同的價格,比如可以檢查程式是否運作良好的數字10。

<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;size; i++){</code>

<code>product product=</code><code>new</code> <code>product();</code>

<code>product.setname(</code><code>"product "</code><code>+i);</code>

<code>product.setprice(</code><code>10</code><code>);</code>

<code>ret.add(product);</code>

<code>return</code> <code>ret;</code>

8.建立一個名為task的類,并繼承recursiveaction類。

<code>public</code> <code>class</code> <code>task</code><code>extends</code> <code>recursiveaction {</code>

9.聲明這個類的serialversionuid屬性。這個元素是必需的,因為recursiveaction的父類forkjointask實作了serializable接口。

<code>private</code> <code>static</code> <code>final</code> <code>long</code> <code>serialversionuid = 1l;</code>

10.聲明一個名為products私有的list&lt;product&gt;屬性。

<code>private</code> <code>list&lt;product&gt; products;</code>

11.聲明兩個私有的int屬性,分别命名為first和last。這兩個屬性将決定任務執行時對産品的分塊。

<code>private</code> <code>int</code> <code>first;</code>

<code>private</code> <code>int</code> <code>last;</code>

12.聲明一個名為increment的私有double屬性,用來存儲産品價格的增加額。

<code>private</code> <code>double</code> <code>increment;</code>

13.實作類的構造器,用來初始化類的這些屬性。

<code>public</code> <code>task (list&lt;product&gt; products,</code><code>int</code> <code>first,</code><code>int</code> <code>last,</code><code>double</code>

<code>increment) {</code>

<code>this</code><code>.products=products;</code>

<code>this</code><code>.first=first;</code>

<code>this</code><code>.last=last;</code>

<code>this</code><code>.increment=increment;</code>

14.實作compute()方法,實作任務的執行邏輯。

<code>@override</code>

<code>protected</code> <code>void</code> <code>compute() {</code>

15.如果last和first屬性值的差異小于10(一個任務隻能更新少于10件産品的價格),則調用updateprices()方法增加這些産品的價格。

<code>if</code> <code>(last-first&lt;</code><code>10</code><code>) {</code>

<code>updateprices();</code>

16.如果last和first屬性值的差異大于或等于10,就建立兩個新的task對象,一個處理前一半的産品,另一個處理後一半的産品,然後調用forkjoinpool的invokeall()方法來執行這兩個新的任務。

<code>int</code> <code>middle=(last+first)/</code><code>2</code><code>;</code>

<code>system.out.printf("task: pending tasks:</code>

<code>%s\n",getqueuedtaskcount());</code>

<code>task t1=</code><code>new</code> <code>task(products, first,middle+</code><code>1</code><code>, increment);</code>

<code>task t2=</code><code>new</code> <code>task(products, middle+</code><code>1</code><code>,last, increment);</code>

<code>invokeall(t1, t2);</code>

17.實作updateprices()方法。這個方法用來更新在産品清單中處于first和last屬性之間的産品。

<code>private</code> <code>void</code> <code>updateprices() {</code>

<code>for</code> <code>(</code><code>int</code> <code>i=first; i&lt;last; i++){</code>

<code>product product=products.get(i);</code>

<code>product.setprice(product.getprice()*(</code><code>1</code><code>+increment));</code>

18.實作範例的主類,建立main主類,并實作main()方法。

<code>public</code> <code>class</code> <code>main {</code>

<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

19.使用productlistgenerator類建立一個有10,000個産品的清單

<code>productlistgenerator generator=</code><code>new</code> <code>productlistgenerator();</code>

<code>list&lt;product&gt; products=generator.generate(</code><code>10000</code><code>);</code>

20.建立一個新的task對象用來更新清單中的所有産品。參數first為0,參數last為産品清單的大小,即10,000。

<code>task task=</code><code>new</code> <code>task(products,</code><code>0</code><code>, products.size(),</code><code>0.20</code><code>);</code>

21.通過無參的類構造器建立一個forkjoinpool對象。

<code>forkjoinpool pool=</code><code>new</code> <code>forkjoinpool();</code>

22.調用execute()方法執行任務。

<code>pool.execute(task);</code>

23.實作代碼塊,顯示關于線程池演變的資訊,每5毫秒在控制台上輸出線程池的一些參數值,直到任務執行結束。

<code>do</code> <code>{</code>

<code>system.out.printf(</code><code>"main: thread count: %d\n"</code><code>,pool.</code>

<code>getactivethreadcount());</code>

<code>system.out.printf(</code><code>"main: thread steal: %d\n"</code><code>,pool.</code>

<code>getstealcount());</code>

<code>system.out.printf(</code><code>"main: parallelism: %d\n"</code><code>,pool.</code>

<code>getparallelism());</code>

<code>try</code> <code>{</code>

<code>timeunit.milliseconds.sleep(</code><code>5</code><code>);</code>

<code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>e.printstacktrace();</code>

<code>24</code>

<code>25</code>

<code>}</code><code>while</code> <code>(!task.isdone());</code>

24.調用shutdown()方法關閉線程池。

<code>pool.shutdown();</code>

25.調用iscompletednormally()方法,檢查任務是否已經完成并且沒有錯誤,在這個示例中,在控制台輸出資訊表示任務已經處理結束。

<code>if</code> <code>(task.iscompletednormally()){</code>

<code>system.out.printf("main: the process has completed</code>

<code>4</code>

<code>5</code>

<code>normally.\n");</code>

<code>6</code>

<code>7</code>

26.在增加之後,所有産品的期望價格是12元。在控制台輸出所有産品的名稱和價格,如果産品的價格不是12元,就将産品資訊列印出來,以便确認所有的産品價格都正确地增加了。

<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;products.size(); i++){</code>

<code>if</code> <code>(product.getprice()!=</code><code>12</code><code>) {</code>

<code>system.out.printf(</code><code>"product %s: %f\n"</code><code>,product.</code>

<code>getname(),product.getprice());</code>

27.在控制台輸出資訊表示程式執行結束。

<code>system.out.println(</code><code>"main: end of the program.\n"</code><code>);</code>

<code>&lt;b&gt; &lt;/b&gt;</code>

工作原理

在這個範例中,我們建立了forkjoinpool對象,和一個将線上程池中執行的forkjointask的子類。使用了無參的類構造器建立了forkjoinpool對象,是以它将執行預設的配置。建立一個線程數等于計算機cpu數目的線程池,建立好forkjoinpool對象之後,那些線程也建立就緒了,線上程池中等待任務的到達,然後開始執行。

由于task類繼承了recursiveaction類,是以不傳回結果。在本節,我們使用了推薦的結構來實作任務。如果任務需要更新大于10個産品,它将拆分這些元素為兩部分,建立兩個任務,并将拆分的部分相應地配置設定給新建立的任務。通過使用task類的first和last屬性,來獲知任務将要更新的産品清單所在的位置範圍。我們已經使用first和last屬性,來操作産品清單中僅有的一份副本,而沒有為每一個任務去建立不同的産品清單。

調用invokeall()方法來執行一個主任務所建立的多個子任務。這是一個同步調用,這個任務将等待子任務完成,然後繼續執行(也可能是結束)。當一個主任務等待它的子任務時,執行這個主任務的工作者線程接收另一個等待執行的任務并開始執行。正因為有了這個行為,是以說fork/join架構提供了一種比runnable和callable對象更加高效的任務管理機制。

forkjointask類的invokeall()方法是執行器架構(executorframework)和fork/join架構之間的主要差異之一。在執行器架構中,所有的任務必須發送給執行器,然而,在這個示例中,線程池中包含了待執行方法的任務,任務的控制也是線上程池中進行的。我們在task類中使用了invokeall()方法,task類繼承了recursiveaction類,而recursiveaction類則繼承了forkjointask類。

我們已經發送一個唯一的任務到線程池中,通過使用execute()方法來更新所有産品的清單。在這個示例中,它是一個同步調用,主線程一直等待調用的執行。

我們已經使用了forkjoinpool類的一些方法,來檢查正在運作的任務的狀态和演變情況。這個類包含更多的方法,可以用于任務狀态的檢測。參見8.5節介紹的這些方法的完整清單。

最後,像執行器架構一樣,必須調用shutdown()方法來結束forkjoinpool的執行。

下面的截圖展示了這個範例執行的部分結果。

可以看到,任務執行結束,并且産品的價格已經更新了。

《Java 7并發程式設計實戰手冊》第五章Fork/Join架構

更多資訊

forkjoinpool類還提供了以下方法用于執行任務。

execute (runnabletask):這是本範例中使用的execute()方法的另一種版本。這個方法發送一個runnable任務給forkjoinpool類。需要注意的是,使用runnable對象時forkjoinpool類就不采用工作竊取算法(work-stealingalgorithm),forkjoinpool類僅在使用forkjointask類時才采用工作竊取算法。

invoke(forkjointask&lt;t&gt;task):正如範例所示,forkjoinpool類的execute()方法是異步調用的,而forkjoinpool類的invoke()方法則是同步調用的。這個方法直到傳遞進來的任務執行結束後才會傳回。

也可以使用在executorservice類中聲明的invokeall()和invokeany()方法,這些方法接收callable對象作為參數。使用callable對象時forkjoinpool類就不采用工作竊取算法(work-stealingalgorithm),是以,最好使用執行器來執行callable對象。

forkjointask類也包含了在範例中所使用的invokeall()方法的其他版本,這些版本如下。

invokeall(forkjointask&lt;?&gt;… tasks):這個版本的方法接收一個可變的參數清單,可以傳遞盡可能多的forkjointask對象給這個方法作為參數。

invokeall(collection&lt;t&gt;tasks):這個版本的方法接受一個泛型類型t的對象集合(比如,arraylist對象、linkedlist對象或者treeset對象)。這個泛型類型t必須是forkjointask類或者它的子類。

雖然forkjoinpool類是設計用來執行forkjointask對象的,但也可以直接用來執行runnable和callable對象。當然,也可以使用forkjointask類的adapt()方法來接收一個callable對象或者一個runnable對象,然後将之轉化為一個forkjointask對象,然後再去執行。

參見

參見8.5節。

fork/join架構提供了執行任務并傳回結果的能力。這些類型的任務都是通過recursivetask類來實作的。recursivetask類繼承了forkjointask類,并且實作了由執行器架構(executor framework)提供的future接口。

在任務中,必須使用java api文檔推薦的如下結構:

<code>if</code> <code>(problem size &gt; size){</code>

<code>groupresults()</code>

<code>return</code> <code>result;</code>

<code>resolve problem;</code>

如果任務需要解決的問題大于預先定義的大小,那麼就要将這個問題拆分成多個子任務,并使用fork/join架構來執行這些子任務。執行完成後,原始任務擷取到由所有這些子任務産生的結果,合并這些結果,傳回最終的結果。當原始任務線上程池中執行結束後,将高效地擷取到整個問題的最終結果。

在本節,我們将學習如何使用fork/join架構來解決這種問題,開發一個應用程式,在文檔中查找一個詞。我們将實作以下兩種任務:

一個文檔任務,它将周遊文檔中的每一行來查找這個詞;

一個行任務,它将在文檔的一部分當中查找這個詞。

所有這些任務将傳回文檔或行中所出現這個詞的次數。

1.建立一個名為documentmock的類。它将生成一個字元串矩陣來模拟一個文檔。

<code>public</code> <code>class</code> <code>document {</code>

2.用一些詞來建立一個字元串數組。這個數組将被用來生成字元串矩陣。

<code>private</code> <code>string words[]={</code><code>"the"</code><code>,</code><code>"hello"</code><code>,</code><code>"goodbye"</code><code>,</code><code>"packt"</code><code>,</code><code>"java"</code><code>,"t</code>

<code>hread</code><code>","</code><code>pool</code><code>","</code><code>random</code><code>","</code><code>class</code><code>","</code><code>main"};</code>

3.實作generatedocument()方法。它接收3個參數,分别是行數numlines,每一行詞的個數numwords,和準備查找的詞word。然後傳回一個字元串矩陣。

<code>public</code> <code>string[][] generatedocument(</code><code>int</code> <code>numlines,</code><code>int</code> <code>numwords,</code>

<code>string word){</code>

4.建立用來生成文檔所需要的對象:string矩陣,和用來生成随機數的random對象。

<code>int</code> <code>counter=</code><code>0</code><code>;</code>

<code>string document[][]=</code><code>new</code> <code>string[numlines][numwords];</code>

<code>random random=</code><code>new</code> <code>random();</code>

5.為字元串矩陣填上字元串。通過随機數取得數組words中的某一字元串,然後存入到字元串矩陣document對應的位置上,同時計算生成的字元串矩陣中将要查找的詞出現的次數。這個值可以用來與後續程式運作查找任務時統計的次數相比較,檢查兩個值是否相同。

<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;numlines; i++){</code>

<code>for</code> <code>(</code><code>int</code> <code>j=</code><code>0</code><code>; j&lt;numwords; j++) {</code>

<code>int</code> <code>index=random.nextint(words.length);</code>

<code>document[i][j]=words[index];</code>

<code>if</code> <code>(document[i][j].equals(word)){</code>

<code>counter++;</code>

6.在控制台輸出這個詞出現的次數,并傳回生成的矩陣document。

<code>system.out.println(</code><code>"documentmock: the word appears "</code><code>+</code>

<code>counter+</code><code>" times in the document"</code><code>);</code>

<code>return</code> <code>document;</code>

7.建立名為documenttask的類,并繼承recursivetask類,recursivetask類的泛型參數為integer類型。這個documenttask類将實作一個任務,用來計算所要查找的詞在行中出現的次數。

<code>public</code> <code>class</code> <code>documenttask</code><code>extends</code> <code>recursivetask&lt;integer&gt; {</code>

8.聲明一個名為document的私有string矩陣,以及兩個名為start和end的私有int屬性,并聲明一個名為word的私有string屬性。

<code>private</code> <code>string document[][];</code>

<code>private</code> <code>int</code> <code>start, end;</code>

<code>private</code> <code>string word;</code>

9.實作類的構造器,用來初始化類的所有屬性。

<code>public</code> <code>documenttask (string document[][],</code><code>int</code> <code>start,</code><code>int</code> <code>end,</code>

<code>this</code><code>.document=document;</code>

<code>this</code><code>.start=start;</code>

<code>this</code><code>.end=end;</code>

<code>this</code><code>.word=word;</code>

10.實作compute()方法。如果end和start的差異小于10,則調用processlines()方法,來計算這兩個位置之間要查找的詞出現的次數。

<code>protected</code> <code>integer compute() {</code>

<code>int</code> <code>result;</code>

<code>if</code> <code>(end-start&lt;</code><code>10</code><code>){</code>

<code>8</code>

<code>9</code>

<code>result=processlines(document, start, end, word);</code>

11.否則,拆分這些行成為兩個對象,并建立兩個新的documenttask對象來處理這兩個對象,然後調用invokeall()方法線上程池裡執行它們。

<code>int</code> <code>mid=(start+end)/</code><code>2</code><code>;</code>

<code>documenttask task1=</code><code>new</code> <code>documenttask(document,start,mid,wo</code>

<code>rd);</code>

<code>documenttask task2=</code><code>new</code> <code>documenttask(document,mid,end,word);</code>

<code>invokeall(task1,task2);</code>

12.采用groupresults()方法将這兩個任務傳回的值相加。最後,傳回任務計算的結果。

<code>result=groupresults(task1.get(),task2.get());</code>

<code>}</code><code>catch</code> <code>(interruptedexception | executionexception e) {</code>

13.實作processlines()方法。接收4個參數,一個字元串document矩陣,start屬性,end屬性和任務将要查找的詞word的屬性。

<code>private</code> <code>integer processlines(string[][] document,</code><code>int</code> <code>start,</code><code>int</code>

<code>end,string word) {</code>

14.為任務所要處理的每一行,建立一個linetask對象,然後存儲在任務清單裡。

<code>list&lt;linetask&gt; tasks=</code><code>new</code> <code>arraylist&lt;linetask&gt;();</code>

<code>for</code> <code>(</code><code>int</code> <code>i=start; i&lt;end; i++){</code>

<code>linetask task=</code><code>new</code> <code>linetask(document[i],</code><code>0</code><code>, document[i].</code>

<code>length, word);</code>

<code>tasks.add(task);</code>

15.調用invokeall()方法執行清單中所有的任務。

<code>invokeall(tasks);</code>

16.合計這些任務傳回的值,并傳回結果。

<code>int</code> <code>result=</code><code>0</code><code>;</code>

<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;tasks.size(); i++) {</code>

<code>linetask task=tasks.get(i);</code>

<code>result=result+task.get();</code>

<code>return</code> <code>new</code> <code>integer(result);</code>

17.實作groupresults()方法。它将兩個數字相加并傳回結果。

<code>private</code> <code>integer groupresults(integer number1, integer number2) {</code>

<code>integer result;</code>

<code>result=number1+number2;</code>

18.建立名為linetask的類,并繼承recursivetask類,recursivetask類的泛型參數為integer類型。這個recursivetask類實作了一個任務,用來計算所要查找的詞在一行中出現的次數。

<code>public</code> <code>class</code> <code>linetask</code><code>extends</code> <code>recursivetask&lt;integer&gt;{</code>

19.聲明類的serialversionuid屬性。這個元素是必需的,因為recursivetask的父類forkjointask實作了serializable接口。聲明一個名為line的私有string數組屬性和兩個名為start和end的私有int屬性。最後,聲明一個名為word的私有string屬性。

<code>private</code> <code>string line[];</code>

20.實作類的構造器,用來初始化它的屬性。

<code>public</code> <code>linetask(string line[],</code><code>int</code> <code>start,</code><code>int</code> <code>end, string word)</code>

<code>{</code>

<code>this</code><code>.line=line;</code>

21.實作compute()方法。如果end和start屬性的差異小于100,那麼任務将采用count()方法,在由start與end屬性所決定的行的片斷中查找詞。

<code>integer result=</code><code>null</code><code>;</code>

<code>if</code> <code>(end-start&lt;</code><code>100</code><code>) {</code>

<code>result=count(line, start, end, word);</code>

22.如果end和start屬性的差異不小于100,将這一組詞拆分成兩組,然後建立兩個新的linetask對象來處理這兩個組,調用invokeall()方法線上程池中執行它們。

<code>linetask task1=</code><code>new</code> <code>linetask(line, start, mid, word);</code>

<code>linetask task2=</code><code>new</code> <code>linetask(line, mid, end, word);</code>

<code>invokeall(task1, task2);</code>

23.調用groupresults()方法将兩個任務傳回的值相加。最後傳回任務計算的結果。

24.實作count()方法。它接收4個參數,一個完整行字元串line數組,start屬性,end屬性和任務将要查找的詞word的屬性。

<code>private</code> <code>integer count(string[] line,</code><code>int</code> <code>start,</code><code>int</code> <code>end, string</code>

<code>word) {</code>

25.将存儲在start和end屬性值之間的詞與任務正在查找的word屬性相比較。如果相同,那麼将計數器counter變量加1。

<code>int</code> <code>counter;</code>

<code>counter=</code><code>0</code><code>;</code>

<code>if</code> <code>(line[i].equals(word)){</code>

26.為了延緩範例的執行,将任務休眠10毫秒。

<code>thread.sleep(</code><code>10</code><code>);</code>

27.傳回計數器counter變量的值。

<code>return</code> <code>counter;</code>

28.實作groupresults()方法。計算兩個數字之和并傳回結果。.

29.實作範例的主類,建立main主類,并實作main()方法。

<code>public</code> <code>class</code> <code>main{</code>

30.建立document對象,包含100行,每行1,000個詞。

<code>documentmock mock=</code><code>new</code> <code>documentmock();</code>

<code>string[][] document=mock.generatedocument(</code><code>100</code><code>,</code><code>1000</code><code>,</code><code>"the"</code><code>);</code>

31.建立一個documenttask對象,用來更新整個文檔。傳遞數字0給參數start,以及數字100給參數end。

<code>documenttask task=</code><code>new</code> <code>documenttask(document,</code><code>0</code><code>,</code><code>100</code><code>,</code><code>"the"</code><code>);</code>

32.采用無參的構造器建立一個forkjoinpool對象,然後調用execute()方法線上程池裡執行這個任務。

33.實作代碼塊,顯示線程池的進展資訊,每秒鐘在控制台輸出線程池的一些參數,直到任務執行結束。

<code>system.out.printf("*****************************************</code>

<code>*\n");</code>

<code>system.out.printf(</code><code>"main: active threads: %d\n"</code><code>,pool.</code>

<code>system.out.printf(</code><code>"main: task count: %d\n"</code><code>,pool.</code>

<code>getqueuedtaskcount());</code>

<code>system.out.printf(</code><code>"main: steal count: %d\n"</code><code>,pool.</code>

<code>26</code>

<code>27</code>

<code>28</code>

<code>29</code>

<code>timeunit.seconds.sleep(</code><code>1</code><code>);</code>

<code>30</code>

<code>31</code>

<code>32</code>

<code>33</code>

<code>34</code>

<code>35</code>

<code>36</code>

<code>37</code>

34.調用shutdown()方法關閉線程池。

35.調用awaittermination()等待任務執行結束。

<code>pool.awaittermination(</code><code>1</code><code>, timeunit.days);</code>

36.在控制台輸出文檔中出現要查找的詞的次數。檢驗這個數字與documentmock類輸出的數字是否一緻。

<code>system.out.printf("main: the word appears %d in the</code>

<code>document",task.get());</code>

在這個範例中,我們實作了兩個不同的任務。

documenttask類:這個類的任務需要處理由start和end屬性決定的文檔行。如果這些行數小于10,那麼,就每行建立一個linetask對象,然後在任務執行結束後,合計傳回的結果,并傳回總數。如果任務要處理的行數大于10,那麼,将任務拆分成兩組,并建立兩個documenttask對象來處理這兩組對象。當這些任務執行結束後,同樣合計傳回的結果,并傳回總數。

linetask類:這個類的任務需要處理文檔中一行的某一組詞。如果一組詞的個數小100,那麼任務将直接在這一組詞裡搜尋特定詞,然後傳回查找詞在這一組詞中出現的次數。否則,任務将拆分這些詞為兩組,并建立兩個linetask對象來處理這兩組詞。當這些任務執行完成後,合計傳回的結果,并傳回總數。

在main主類中,我們通過預設的構造器建立了forkjoinpool對象,然後執行documenttask類,來處理一個共有100行,每行1,000字的文檔。這個任務将問題拆分成documenttask對象和linetask對象,然後當所有的任務執行完成後,使用原始的任務來擷取整個文檔中所要查找的詞出現的次數。由于任務繼承了recursivetask類,是以能夠傳回結果。

調用get()方法來獲得task傳回的結果。這個方法聲明在future接口裡,并由recursivetask類實作。

執行程式時,在控制台上,我們可以比較第一行與最後一行的輸出資訊。第一行是文檔生成時被查找的詞出現的次數,最後一行則是通過fork/join任務計算而來的被查找的詞出現的次數,而且這兩個數字相同。

forkjointask類提供了另一個complete()方法來結束任務的執行并傳回結果。這個方法接收一個對象,對象的類型就是recursivetask類的泛型參數,然後在任務調用join()方法後傳回這個對象作為結果。這一過程采用了推薦的異步任務來傳回任務的結果。

由于recursivetask類實作了future接口,是以還有get()方法調用的其他版本:

get(long timeout, timeunit unit):這個版本中,如果任務的結果未準備好,将等待指定的時間。如果等待時間超出,而結果仍未準備好,那方法就會傳回null值。

timeunit是一個枚舉類,有如下的常量:days、hours、microseconds、milliseconds、minutes、nanoseconds和seconds。

參見5.2節。

在forkjoinpool中執行 forkjointask時,可以采用同步或異步方式。當采用同步方式執行時,發送任務給fork/join線程池的方法直到任務執行完成後才會傳回結果。而采用異步方式執行時,發送任務給執行器的方法将立即傳回結果,但是任務仍能夠繼續執行。

需要明白這兩種方式在執行任務時的一個很大的差別。當采用同步方式,調用這些方法(比如,invokeall()方法)時,任務被挂起,直到任務被發送到fork/join線程池中執行完成。這種方式允許forkjoinpool類采用工作竊取算法(work-stealingalgorithm)來配置設定一個新任務給在執行休眠任務的工作者線程(workerthread)。相反,當采用異步方法(比如,fork()方法)時,任務将繼續執行,是以forkjoinpool類無法使用工作竊取算法來提升應用程式的性能。在這個示例中,隻有調用join()或get()方法來等待任務的結束時,forkjoinpool類才可以使用工作竊取算法。

本節将學習如何使用forkjoinpool和forkjointask類所提供的異步方法來管理任務。我們将實作一個程式:在一個檔案夾及其子檔案夾中來搜尋帶有指定擴充名的檔案。forkjointask類将實作處理這個檔案夾的内容。而對于這個檔案夾中的每一個子檔案,任務将以異步的方式發送一個新的任務給forkjoinpool類。對于每個檔案夾中的檔案,任務将檢查任務檔案的擴充名,如果符合條件就将其增加到結果清單中。

1.建立名為folderprocessor的類,并繼承recursivetask類,recursivetask類的泛型參數為list&lt;string&gt;類型。

<code>public</code> <code>class</code> <code>folderprocessor</code><code>extends</code> <code>recursivetask&lt;list&lt;string&gt;&gt; {</code>

2.聲明類的serialversionuid屬性。這個元素是必需的,因為recursivetask類的父類forkjointask實作了serializable接口。

3.聲明一個名為path的私有string屬性,用來存儲任務将要處理的檔案夾的完整路徑。

<code>private</code> <code>string path;</code>

4.聲明一個名為extension的私有string屬性,用來存儲任務将要查找的檔案的擴充名。

<code>private</code> <code>string extension;</code>

5.實作類的構造器,用來初始化這些屬性。

<code>public</code> <code>folderprocessor (string path, string extension) {</code>

<code>this</code><code>.path=path;</code>

<code>this</code><code>.extension=extension;</code>

6.實作compute()方法。既然指定了recursivetask類泛型參數為list&lt;string&gt;類型,那麼,這個方法必須傳回一個同樣類型的對象。

<code>protected</code> <code>list&lt;string&gt; compute() {</code>

7.聲明一個名為list的string對象清單,用來存儲檔案夾中檔案的名稱。

<code>list&lt;string&gt; list=</code><code>new</code> <code>arraylist&lt;&gt;();</code>

8.聲明一個名為tasks的folderprocessor任務清單,用來存儲子任務,這些子任務将處理檔案夾中的子檔案夾。

<code>list&lt;folderprocessor&gt; tasks=</code><code>new</code> <code>arraylist&lt;&gt;();</code>

9.擷取檔案夾的内容。

<code>file file=</code><code>new</code> <code>file(path);</code>

<code>file content[] = file.listfiles();</code>

10.對于檔案夾中的每一個元素,如果它是子檔案夾,就建立一個新的folderprocessor對象,然後調用fork()方法采用異步方式來執行它。

<code>if</code> <code>(content !=</code><code>null</code><code>) {</code>

<code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt; content.length; i++) {</code>

<code>if</code> <code>(content[i].isdirectory()) {</code>

<code>folderprocessor task=</code><code>new</code> <code>folderprocessor(content[i].</code>

<code>getabsolutepath(), extension);</code>

<code>task.fork();</code>

11.否則,調用checkfile()方法來比較檔案的擴充名。如果檔案的擴充名與将要搜尋的擴充名相同,就将檔案的完整路徑存儲到第7步聲明的清單中。

<code>if</code> <code>(checkfile(content[i].getname())){</code>

<code>list.add(content[i].getabsolutepath());</code>

12.如果folderprocessor子任務清單超過50個元素,那麼就在控制台輸出一條資訊表示這種情景。

<code>if</code> <code>(tasks.size()&gt;</code><code>50</code><code>) {</code>

<code>system.out.printf(</code><code>"%s: %d tasks ran.\n"</code><code>,file.</code>

<code>getabsolutepath(),tasks.size());</code>

13.調用addresultsfromtask()輔助方法。它把通過這個任務而啟動的子任務傳回的結果增加到檔案清單中。傳遞兩個參數給這個方法,一個是字元串清單list,一個是folderprocessor子任務清單tasks。

<code>addresultsfromtasks(list,tasks);</code>

14.傳回字元串清單。

<code>return</code> <code>list;</code>

15.實作addresultsfromtasks()方法。周遊任務清單中存儲的每一個任務,調用join()方法等待任務執行結束,并且傳回任務的結果。然後,調用addall()方法将任務的結果增加到字元串清單中。

<code>private</code> <code>void</code> <code>addresultsfromtasks(list&lt;string&gt; list,</code>

<code>list&lt;folderprocessor&gt; tasks) {</code>

<code>for</code> <code>(folderprocessor item: tasks) {</code>

<code>list.addall(item.join());</code>

16.實作checkfile()方法。這個方法檢查作為參數而傳遞進來的檔案名,如果是以正在搜尋的檔案擴充名為結尾,那麼方法就傳回true,否則就傳回false。

<code>private</code> <code>boolean</code> <code>checkfile(string name) {</code>

<code>return</code> <code>name.endswith(extension);</code>

17.實作範例的主類,建立main主類,并實作main()方法。

18.通過預設的構造器建立forkjoinpool線程池。

19.建立3個folderprocessor任務,并使用不同的檔案夾路徑來初始化這些任務。

<code>folderprocessor system=</code><code>new</code> <code>folderprocessor(</code><code>"c:\\windows"</code><code>,</code>

<code>"log"</code><code>);</code>

<code>folderprocessor apps=</code><code>new</code>

<code>folderprocessor(</code><code>"c:\\program files"</code><code>,</code><code>"log"</code><code>);</code>

<code>folderprocessor documents=</code><code>new</code> <code>folderprocessor("c:\\documents</code>

<code>and settings</code><code>","</code><code>log");</code>

20.調用execute()方法執行線程池裡的3個任務。

<code>pool.execute(system);</code>

<code>pool.execute(apps);</code>

<code>pool.execute(documents);</code>

21.在控制台上每隔1秒鐘輸出線程池的狀态資訊,直到這3個任務執行結束。

<code>}</code><code>while</code> <code>((!system.isdone())||(!apps.isdone())||(!documents.</code>

<code>38</code>

<code>39</code>

<code>isdone()));</code>

22.調用shutdown()方法關閉forkjoinpool線程池。

23.在控制台輸出每一個任務産生的結果的大小。

<code>list&lt;string&gt; results;</code>

<code>results=system.join();</code>

<code>system.out.printf(</code><code>"system: %d files found.\n"</code><code>,results.size());</code>

<code>results=apps.join();</code>

<code>system.out.printf(</code><code>"apps: %d files found.\n"</code><code>,results.size());</code>

<code>results=documents.join();</code>

<code>system.out.printf(</code><code>"documents: %d files found.\n"</code><code>,results.</code>

<code>size());</code>

下面的截圖顯示了範例的部分運作結果。

《Java 7并發程式設計實戰手冊》第五章Fork/Join架構

這個範例的重點在于folderprocessor類。每一個任務處理一個檔案夾中的内容。檔案夾中的内容有以下兩種類型的元素:

檔案;

其他檔案夾。

如果主任務發現一個檔案夾,它将建立另一個task對象來處理這個檔案夾,調用fork()方法把這個新對象發送到線程池中。fork()方法發送任務到線程池時,如果線程池中有空閑的工作者線程(workerthread)或者将建立一個新的線程,那麼開始執行這個任務,fork()方法會立即傳回,是以,主任務可以繼續處理檔案夾裡的其他内容。對于每一個檔案,任務開始比較它的檔案擴充名,如果與要搜尋的擴充名相同,那麼将檔案的完整路徑增加到結果清單中。

一旦主任務處理完指定檔案夾裡的所有内容,它将調用join()方法等待發送到線程池中的所有子任務執行完成。join()方法在主任務中被調用,然後等待任務執行結束,并通過compute()方法傳回值。主任務将所有的子任務結果進行合并,這些子任務發送到線程池中時帶有自己的結果清單,然後通過調用compute()方法傳回這個清單并作為主任務的傳回值。

forkjoinpool類也允許以異步的方式執行任務。調用execute()方法發送3個初始任務到線程池中。在main主類中,調用shutdown()方法結束線程池,并在控制台輸出線程池中任務的狀态及其變化的過程。forkjoinpool類包含了多個方法可以實作這個目的。參考8.5節來查閱這些方法的詳細清單。

本範例使用join()方法來等待任務的結束,然後擷取它們的結果。也可以使用get()方法以下的兩個版本來完成這個目的。

get():如果forkjointask類執行結束,或者一直等到結束,那麼get()方法的這個版本則傳回由compute()方法傳回的結果。

get(long timeout, timeunit unit):如果任務的結果未準備好,那麼get()方法的  這個版本将等待指定的時間。如果超過指定的時間了,任務的結果仍未準備好,那麼這    個方法将傳回 null值。timeunit是一個枚舉類,有如下的常量:days、hours、microseconds、milliseconds、minutes、nanoseconds和seconds。

get()方法和join()方法還存在兩個主要的差別:

join()方法不能被中斷,如果中斷調用join()方法的線程,方法将抛出interruptedexception異常;

如果任務抛出任何運作時異常,那麼 get()方法将傳回executionexception異常,但是join()方法将傳回runtimeexception異常。

參考5.2節。

參考8.5節。

java有兩種類型的異常。

非運作時異常(checked exception):這些異常必須在方法上通過throws子句抛出,或者在方法體内通過try{…}catch{…}方式進行捕捉處理。比如ioexception或classnotfoundexception異常。

運作時異常(unchecked exception):這些異常不需要在方法上通過throws子句抛出,也不需要在方法體内通過try{…}catch{…}方式進行捕捉處理。比如numberformatexception異常。

不能在forkjointask類的compute()方法中抛出任務非運作時異常,因為這個方法的實作沒有包含任何throws聲明。是以,需要包含必需的代碼來處理相關的異常。另一方面,compute()方法可以抛出運作時異常(它可以是任何方法或者方法内的對象抛出的異常)。forkjointask類和forkjoinpool類的行為與我們期待的可能不同。在控制台上,程式沒有結束執行,不能看到任務異常資訊。如果異常不被抛出,那麼它隻是簡單地将異常吞噬掉。然而,我們能夠利用forkjointask類的一些方法來獲知任務是否有異常抛出,以及抛出哪一種類型的異常。在本節,我們将學習如何擷取這些異常資訊。

本節的範例是在eclipse ide裡完成的。無論你使用eclipse還是其他的ide(比如netbeans),都可以打開這個ide并且建立一個新的java工程。

1.建立名為task的類,并繼承recursivetask類,recursivetask類的泛型參數為integer 類型。

<code>public</code> <code>class</code> <code>task</code><code>extends</code> <code>recursivetask&lt;integer&gt; {</code>

2.聲明一個名為array的私有int數組。用來模拟在這個範例中即将處理的資料數組。

<code>private</code> <code>int</code> <code>array[];</code>

3.聲明兩個分别名為start和end的私有int屬性。這些屬性将決定任務要處理的數組元素。

4.實作類的構造器,用來初始化類的屬性。

<code>public</code> <code>task(</code><code>int</code> <code>array[],</code><code>int</code> <code>start,</code><code>int</code> <code>end){</code>

<code>this</code><code>.array=array;</code>

5.實作任務的compute()方法。由于指定了integer類型作為recursivetask的泛型類型,是以這個方法必須傳回一個integer對象。在控制台輸出start和end屬性。

<code>system.out.printf(</code><code>"task: start from %d to %d\n"</code><code>,start,end);</code>

6.如果由start和end屬性所決定的元素塊規模小于10,那麼直接檢查元素,當碰到元素塊的第4個元素(索引位為3)時,就抛出runtimeexception異常。然後将任務休眠1秒鐘。

<code>if</code> <code>(end-start&lt;</code><code>10</code><code>) {</code>

<code>if</code> <code>((</code><code>3</code><code>&gt;start)&amp;&amp;(</code><code>3</code><code>&lt;end)){</code>

<code>throw</code> <code>new</code> <code>runtimeexception(</code><code>"this task throws an"</code><code>+</code>

<code>"exception: task from "</code><code>+start+</code><code>" to "</code><code>+end);</code>

7.如果要處理的元素塊規模大于或等于10,就拆分這個元素塊為兩部分,并建立    兩個task對象來處理這兩部分,然後調用invokeall()方法線上程池中執行這兩個task對象。

<code>int</code> <code>mid=(end+start)/</code><code>2</code><code>;</code>

<code>task task1=</code><code>new</code> <code>task(array,start,mid);</code>

<code>task task2=</code><code>new</code> <code>task(array,mid,end);</code>

8.在控制台輸出資訊,表示任務結束,并輸出start和end屬性值。

<code>system.out.printf(</code><code>"task: end form %d to %d\n"</code><code>,start,end);</code>

9.傳回數字0作為任務的結果。

<code>return</code> <code>0</code><code>;</code>

10.實作範例的主類,建立main主類,并實作main()方法。

11.建立一個名為array并能容納100個整數的int數組。

<code>int</code> <code>array[]=</code><code>new</code> <code>int</code><code>[</code><code>100</code><code>];</code>

12.建立一個task對象來處理這個數組。

<code>task task=</code><code>new</code> <code>task(array,</code><code>0</code><code>,</code><code>100</code><code>);</code>

13.通過預設的構造器建立forkjoinpool對象。

14.調用execute()方法線上程池中執行任務。

15.調用shutdown()方法關閉線程池。

16. 調用awaittermination()方法等待任務執行結束。如果想一直等待到任務執行完成,那就傳遞值1和timeunit.days作為參數給這個方法。

17. 調用iscompletedabnormally()方法來檢查主任務或者它的子任務之一是否抛出了異常。在這個示例中,在控制台輸出資訊就表示有異常抛出。調用forkjointask類的getexception()方法來擷取異常資訊。

<code>if</code> <code>(task.iscompletedabnormally()) {</code>

<code>system.out.printf(</code><code>"main: an exception has ocurred\n"</code><code>);</code>

<code>system.out.printf(</code><code>"main: %s\n"</code><code>,task.getexception());</code>

<code>system.out.printf(</code><code>"main: result: %d"</code><code>,task.join());</code>

在本節,我們實作的task類用來處理一個數字數組。它檢查要處理的數字塊規模是否包含有10個或更多個元素。在這個情況下,task類拆分這個數字塊為兩部分,然後建立兩個新的task對象用來處理這兩部分。否則,它将尋找位于數組中第4個位置(索引位為3)的元素。如果這個元素位于任務處理塊中,它将抛出一個runtimeexception異常。

雖然運作這個程式時将抛出異常,但是程式不會停止。在main主類中,調用原始任務forkjointask類的iscompletedabnormally()方法,如果主任務或者它的子任務之一抛出了異常,這個方法将傳回true。也可以使用getexception()方法來獲得抛出的exception對象。

當任務抛出運作時異常時,會影響它的父任務(發送到forkjoinpool類的任務),以及父任務的父任務,以此類推。查閱程式的輸出結果,将會發現有一些任務沒有結束的資訊。那些任務的開始資訊如下:

<code>task: starting form</code><code>0</code> <code>to</code><code>100</code>

<code>task: starting form</code><code>0</code> <code>to</code><code>50</code>

<code>task: starting form</code><code>0</code> <code>to</code><code>25</code>

<code>task: starting form</code><code>0</code> <code>to</code><code>12</code>

<code>task: starting form</code><code>0</code> <code>to</code><code>6</code>

這些任務是那些抛出異常的任務和它的父任務。所有這些任務都是異常結束的。記住一點:在用forkjoinpool對象和forkjointask對象開發一個程式時,它們是會抛出異常的,如果不想要這種行為,就得采用其他方式。

下面的截屏展示了這個範例運作的部分結果。

《Java 7并發程式設計實戰手冊》第五章Fork/Join架構

在範例中,不采用抛出異常,而調用forkjointask類的completeexceptionally()方法也可以獲得同樣的結果。代碼如下所示:

<code>exception e=</code><code>new</code> <code>exception(</code><code>"this task throws an exception: "</code><code>+ "task</code>

<code>from</code><code>"+start+"</code> <code>to "+end);</code>

<code>completeexceptionally(e);</code>

在forkjoinpool類中執行forkjointask對象時,在任務開始執行前可以取消它。forkjointask類提供了cancel()方法來達到取消任務的目的。在取消一個任務時必須要注意以下兩點:

forkjoinpool類不提供任何方法來取消線程池中正在運作或者等待運作的所有任務;

取消任務時,不能取消已經被執行的任務。

在本節,我們将實作一個取消forkjointask對象的範例。該範例将尋找數組中某個數字所處的位置。第一個任務是尋找可以被取消的剩餘任務數。由于fork/join架構沒有提供取消功能,我們将建立一個輔助類來實作取消任務的操作。

1.建立一個名為arraygenerator的類。這個類将生成一個指定大小的随機整數數組。實作generatearray()方法,它将生成數字數組,接收一個int參數表示數組的大小。

<code>public</code> <code>class</code> <code>arraygenerator {</code>

<code>public</code> <code>int</code><code>[] generatearray(</code><code>int</code> <code>size) {</code>

<code>int</code> <code>array[]=</code><code>new</code> <code>int</code><code>[size];</code>

<code>array[i]=random.nextint(</code><code>10</code><code>);</code>

<code>return</code> <code>array;</code>

2.建立一個名為taskmanager的類。本示例将利用這個類來存儲在forkjoinpool中執行的任務。由于forkjoinpool和forkjointask類的局限性,将利用taskmanager類來取消forkjoinpool類中所有的任務。

<code>public</code> <code>class</code> <code>taskmanager {</code>

3.聲明一個名為tasks的對象清單,帶有forkjointask泛型參數,并且forkjointask帶有integer泛型參數。

<code>private</code> <code>list&lt;forkjointask&lt;integer&gt;&gt; tasks;</code>

4.實作類的構造器,用來初始化任務清單。

<code>public</code> <code>taskmanager(){</code>

<code>tasks=</code><code>new</code> <code>arraylist&lt;&gt;();</code>

5.實作addtask()方法。增加一個forkjointask對象到任務清單中。

<code>public</code> <code>void</code> <code>addtask(forkjointask&lt;integer&gt; task){</code>

6.實作canceltasks()方法。周遊存儲在清單中的所有forkjointask對象,然後調用cancel()方法取消之。canceltasks()方法接收一個要取消剩餘任務的forkjointask對象作為參數,然後取消所有的任務。

<code>public</code> <code>void</code> <code>canceltasks(forkjointask&lt;integer&gt; canceltask){</code>

<code>for</code> <code>(forkjointask&lt;integer&gt; task :tasks) {</code>

<code>if</code> <code>(task!=canceltask) {</code>

<code>task.cancel(</code><code>true</code><code>);</code>

<code>((searchnumbertask)task).writecancelmessage();</code>

7.實作searchnumbertask類,并繼承recursivetask類,recursivetask類的泛型參數為integer類型。這個類将尋找在整數數組元素塊中的一個數字。

<code>public</code> <code>class</code> <code>searchnumbertask</code><code>extends</code> <code>recursivetask&lt;integer&gt; {</code>

8.聲明一個名為array的私有int數組。

<code>private</code> <code>int</code> <code>numbers[];</code>

9.聲明兩個分别名為start和end的私有int屬性。這兩個屬性将決定任務所要處理的數組的元素。

10.聲明一個名為number的私有int屬性,用來存儲将要尋找的數字。

<code>private</code> <code>int</code> <code>number;</code>

11.聲明一個名為manager的私有taskmanager屬性。利用這個對象來取消所有的任務。

<code>private</code> <code>taskmanager manager;</code>

12.聲明一個int常量,并初始化其值為-1。當任務找不到數字時将傳回這個常量。

<code>private</code> <code>final</code> <code>static</code> <code>int</code> <code>not_found=-</code><code>1</code><code>;</code>

13.實作類的構造器,用來初始化它的屬性。

<code>public</code> <code>task (</code><code>int</code> <code>numbers[],</code><code>int</code> <code>start,</code><code>int</code> <code>end,</code><code>int</code> <code>number,</code>

<code>taskmanager manager){</code>

<code>this</code><code>.numbers=numbers;</code>

<code>this</code><code>.number=number;</code>

<code>this</code><code>.manager=manager;</code>

14.實作compute()方法。在控制台輸出資訊表示任務開始,并輸出start和end的屬性值。

<code>system.out.println(</code><code>"task: "</code><code>+start+</code><code>":"</code><code>+end);</code>

15.如果start和end屬性值的差異大于10(任務必須處理大于10個元素的數組),那麼,就調用launchtasks()方法将這個任務拆分為兩個子任務。

<code>int</code> <code>ret;</code>

<code>if</code> <code>(end-start&gt;</code><code>10</code><code>) {</code>

<code>ret=launchtasks();</code>

16.否則,尋找在數組塊中的數字,調用lookfornumber()方法處理這個任務。

<code>ret=lookfornumber();</code>

17.傳回任務的結果。

18.實作lookfornumber()方法。

<code>private</code> <code>int</code> <code>lookfornumber() {</code>

19.周遊任務所要處理的數組塊中的所有元素,将元素中存儲的數字和将要尋找的數字進行比較。如果它們相等,就在控制台輸出資訊表示找到了,并用taskmanager對象的canceltasks()方法取消所有的任務,然後傳回已找到的這個元素所在的位置。

<code>if</code> <code>(numbers[i]==number) {</code>

<code>system.out.printf("task: number %d found in position</code>

<code>%d\n",number,i);</code>

<code>manager.canceltasks(</code><code>this</code><code>);</code>

<code>return</code> <code>i;</code>

20.在循環體中,将任務休眠1秒鐘。

21.傳回-1表示沒有找到元素。

<code>return</code> <code>not_found;</code>

22.實作launchtasks()方法。将這個任務要處理的元素塊拆分成兩部分,然後建立兩個task對象來處理它們。

<code>private</code> <code>int</code> <code>launchtasks() {</code>

<code>task task1=</code><code>new</code> <code>task(array,start,mid,number,manager);</code>

<code>task task2=</code><code>new</code> <code>task(array,mid,end,number,manager);</code>

23.增加任務到taskmanager對象中。

<code>manager.addtask(task1);</code>

<code>manager.addtask(task2);</code>

24.調用fork()方法采用異步方式執行這兩個任務。

<code>task1.fork();</code>

<code>task2.fork();</code>

25.等待任務結束,如果第一個任務傳回的結果不為-1,則傳回第一個任務的結果;否則傳回第二個任務的結果。

<code>int</code> <code>returnvalue;</code>

<code>returnvalue=task1.join();</code>

<code>if</code> <code>(returnvalue!=-</code><code>1</code><code>) {</code>

<code>return</code> <code>returnvalue;</code>

<code>returnvalue=task2.join();</code>

26.實作writecancelmessage()方法,在控制台輸入資訊表示任務已經取消了。

<code>public</code> <code>void</code> <code>writecancelmessage(){</code>

<code>system.out.printf("task: cancelled task from %d to</code>

<code>%d",start,end);</code>

27.實作範例的主類,建立main主類,并實作main()方法。

28.用arraygenerator類建立一個容量為1,000的數字數組。

<code>arraygenerator generator=</code><code>new</code> <code>arraygenerator();</code>

<code>int</code> <code>array[]=generator.generatearray(</code><code>1000</code><code>);</code>

29.建立一個taskmanager對象。

<code>taskmanager manager=</code><code>new</code> <code>taskmanager();</code>

30.通過預設的構造器建立一個forkjoinpool對象。

31.建立一個task對象用來處理第28步生成的數組。

<code>task task=</code><code>new</code> <code>task (array,</code><code>0</code><code>,</code><code>1000</code><code>,</code><code>5</code><code>,manager);</code>

32.調用execute()方法采用異步方式執行線程池中的任務。

33.調用shutdown()方法關閉線程池。

34.調用forkjoinpool類的awaittermination()方法等待任務執行結束。

35.在控制台輸出資訊表示程式結束。

<code>system.out.printf(</code><code>"main: the program has finished\n"</code><code>);</code>

forkjointask類提供的cancel()方法允許取消一個仍沒有被執行的任務,這是非常重要的一點。如果任務已經開始執行,那麼調用cancel()方法也無法取消。這個方法接收一個名為mayinterruptifrunning的boolean值參數。顧名思義,如果傳遞true值給這個方法,即使任務正在運作也将被取消。javaapi文檔指出,forkjointask類的預設實作,這個屬性沒有起作用。如果任務還沒有開始執行,那麼這些任務将被取消。任務的取消對于已發送到線程池中的任務沒有影響,它們将繼續執行。

fork/join架構的局限性在于,forkjoinpool線程池中的任務不允許被取消。為了克服這種局限性,我們實作了taskmanager類,它存儲發送到線程池中的所有任務,可以用一個方法來取消存儲的所有任務。如果任務正在運作或者已經執行結束,那麼任務就不能被取消,cancel()方法傳回false值,是以可以嘗試去取消所有的任務而不用擔心可能帶來的間接影響。

這個範例實作在數字數組中尋找一個數字。根據fork/join架構的推薦,我們将問題拆分為更小的子問題。由于我們僅關心數字的一次出現,是以,當找到它時,就會取消其他的所有任務。

下面的截圖展示了範例執行的部分結果。

《Java 7并發程式設計實戰手冊》第五章Fork/Join架構