實作threadfactory接口生成自定義的線程給fork/join架構
fork/join架構是java7中最有趣的特征之一。它是executor和executorservice接口的一個實作,允許你執行callable和runnable任務而不用管理這些執行線程。
這個執行者面向執行能被拆分成更小部分的任務。主要元件如下:
一個特殊任務,實作forkjointask類
兩種操作,将任務劃分成子任務的fork操作和等待這些子任務結束的join操作
一個算法,優化池中線程的使用的work-stealing算法。當一個任務正在等待它的子任務(結束)時,它的執行線程将執行其他任務(等待執行的任務)。
forkjoinpool類是fork/join的主要類。在它的内部實作,有如下兩種元素:
一個存儲等待執行任務的列隊。
一個執行任務的線程池
在這個指南中,你将學習如何實作一個在forkjoinpool類中使用的自定義的工作者線程,及如何使用一個工廠來使用它。
準備工作…
這個指南的例子使用eclipse ide實作。如果你使用eclipse或其他ide,如netbeans,打開它并建立一個新的java項目。
如何做…
按以下步驟來實作的這個例子:
1.建立一個繼承forkjoinworkerthread類的myworkerthread類。
<code>1</code>
<code>public</code> <code>class</code> <code>myworkerthread</code><code>extends</code> <code>forkjoinworkerthread {</code>
2.聲明和建立一個參數化為integer類的threadlocal屬性,名為taskcounter。
<code>private</code> <code>static</code> <code>threadlocal<integer> taskcounter=</code><code>new</code> <code>threadlocal<integer>();</code>
3.實作這個類的構造器。
<code>protected</code> <code>myworkerthread(forkjoinpool pool) {</code>
<code>2</code>
<code>super</code><code>(pool);</code>
<code>3</code>
<code>}</code>
4.重寫onstart()方法。調用父類的這個方法,寫入一條資訊到控制台。設定目前線程的taskcounter屬性值為0。
<code>@override</code>
<code>protected</code> <code>void</code> <code>onstart() {</code>
<code>super</code><code>.onstart();</code>
<code>4</code>
<code>system.out.printf("myworkerthread %d: initializing task</code>
<code>5</code>
<code>counter.\n",getid());</code>
<code>6</code>
<code>taskcounter.set(</code><code>0</code><code>);</code>
<code>7</code>
5.重寫ontermination()方法。寫入目前線程的taskcounter屬性值到控制台。
<code>protected</code> <code>void</code> <code>ontermination(throwable exception) {</code>
<code>system.out.printf("myworkerthread %d:</code>
<code>%d\n",getid(),taskcounter.get());</code>
<code>super</code><code>.ontermination(exception);</code>
6.實作addtask()方法。遞增taskcounter屬性值。
<code>public</code> <code>void</code> <code>addtask(){</code>
<code>int</code> <code>counter=taskcounter.get().intvalue();</code>
<code>counter++;</code>
<code>taskcounter.set(counter);</code>
7.建立一個實作forkjoinworkerthreadfactory接口的myworkerthreadfactory類。實作newthread()方法,建立和傳回一個myworkerthread對象。
<code>public</code> <code>forkjoinworkerthread newthread(forkjoinpool pool) {</code>
<code>return</code> <code>new</code> <code>myworkerthread(pool);</code>
8.建立myrecursivetask類,它繼承一個參數化為integer類的recursivetask類。
<code>public</code> <code>class</code> <code>myrecursivetask</code><code>extends</code> <code>recursivetask<integer> {</code>
9.聲明一個私有的、int類型的屬性array。
<code>private</code> <code>int</code> <code>array[];</code>
10.聲明兩個私有的、int類型的屬性start和end。
<code>private</code> <code>int</code> <code>start, end;</code>
11.實作這個類的構造器,初始化它的屬性。
<code>public</code> <code>myrecursivetask(</code><code>int</code> <code>array[],</code><code>int</code> <code>start,</code><code>int</code> <code>end) {</code>
<code>this</code><code>.array=array;</code>
<code>this</code><code>.start=start;</code>
<code>this</code><code>.end=end;</code>
12.實作compute()方法,用來合計數組中在start和end位置之間的所有元素。首先,将執行這個任務的線程轉換成一個myworkerthread對象,然後使用addtask()方法來增長這個線程的任務計數器。
<code>protected</code> <code>integer compute() {</code>
<code>integer ret;</code>
<code>myworkerthread thread=(myworkerthread)thread.currentthread();</code>
<code>thread.addtask();</code>
13.實作addresults()方法。計算和傳回兩個任務(接收參數)的結果的總和。
<code>01</code>
<code>private</code> <code>integer addresults(task task1, task task2) {</code>
<code>02</code>
<code>int</code> <code>value;</code>
<code>03</code>
<code>try</code> <code>{</code>
<code>04</code>
<code>value = task1.get().intvalue()+task2.get().intvalue();</code>
<code>05</code>
<code>}</code><code>catch</code> <code>(interruptedexception e) {</code>
<code>06</code>
<code>e.printstacktrace();</code>
<code>07</code>
<code>value=</code><code>0</code><code>;</code>
<code>08</code>
<code>}</code><code>catch</code> <code>(executionexception e) {</code>
<code>09</code>
<code>10</code>
<code>11</code>
14.令這個線程睡眠10毫秒,然後傳回任務的結果。
<code>timeunit.milliseconds.sleep(</code><code>10</code><code>);</code>
<code>return</code> <code>value;</code>
15.實作這個例子的主類,通過建立main類,并實作main()方法。
<code>public</code> <code>class</code> <code>main {</code>
<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args)</code><code>throws</code> <code>exception {</code>
16.建立一個名為factory的myworkerthreadfactory對象。
<code>myworkerthreadfactory factory=</code><code>new</code> <code>myworkerthreadfactory();</code>
17.建立一個名為pool的forkjoinpool對象,将前面建立的factory對象作為參數傳給它的構造器。
<code>forkjoinpool pool=</code><code>new</code> <code>forkjoinpool(</code><code>4</code><code>, factory,</code><code>null</code><code>,</code><code>false</code><code>);</code>
18.建立一個大小為100000的整數數組,将所有元素初始化為值1。
<code>int</code> <code>array[]=</code><code>new</code> <code>int</code><code>[</code><code>100000</code><code>];</code>
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i<array.length; i++){</code>
<code>array[i]=</code><code>1</code><code>;</code>
19.建立一個新的task對象,用來合計數組中的所有元素。
<code>myrecursivetask task=</code><code>new</code> <code>myrecursivetask(array,</code><code>0</code><code>,array.length);</code>
20.使用execute()方法,将這個任務送出給池。
<code>pool.execute(task);</code>
21.使用join()方法,等待這個任務的結束。
<code>task.join();</code>
22.使用shutdown()方法,關閉這個池。
<code>pool.shutdown();</code>
23.使用awaittermination()方法,等待這個執行者的結束。
<code>pool.awaittermination(</code><code>1</code><code>, timeunit.days);</code>
24.使用get()方法,将任務的結束寫入到控制台。
<code>system.out.printf(</code><code>"main: result: %d\n"</code><code>,task.get());</code>
25.寫入一條資訊到控制台,表明程式的結束。
<code>system.out.printf(</code><code>"main: end of the program\n"</code><code>);</code>
它是如何工作的…
fork/join架構使用的線程叫工作者線程。java包含繼承thread類的forkjoinworkerthread類和使用fork/join架構實作工作者線程。
在這個指南中,你已實作了繼承forkjoinworkerthread類的myworkerthread類,并重寫這個類的兩個方法。你的目标是實作每個工作者線程的任務計數器,以至于你可以知道每個工作者線程執行多少個任務。你已經通過一個threadlocal屬性實作計數器。這樣,每個線程都擁有它自己的計數器,對于來你說是透明的。
你已重寫forkjoinworkerthread類的onstart()方法來實作任務的計數器。當工作者線程開始它的執行時,這個方法将被調用。你也重寫了ontermination()方法,将任務計數器的值寫入到控制台。當工作者線程結束它的執行時,這個方法将被調用。你也在myworkerthread類中實作addtask()方法,用來增加每個線程的任務計數器。
對于forkjoinpool類,與java并發api中的所有執行者一樣,使用工廠來建立它。是以,如果你想在forkjoinpool類中使用myworkerthread線程,你必須實作自己的線程工廠。對于fork/join架構,這個工廠必須實作forkjoinpool.forkjoinworkerthreadfactory類。為此,你已實作myworkerthreadfactory類。這個類隻有一個用來建立一個新的myworkerthread對象的方法。
最後,你隻要使用已建立的工廠來初始化forkjoinpool類。你已在main類中通過使用forkjoinpool的構造器實作了。
以下截圖顯示了這個程式的部分輸出:

你可以看出forkjoinpool對象如何執行4個工作者線程及每個工作者線程執行多少個任務。
不止這些…
考慮一下,當一個線程正常結束或抛出一個exception異常時,調用的forkjoinworkerthread提供的ontermination()方法。這個方法接收一個throwable對象作為參數。如果這個參數值為null時,表明這個工作者線程正常結束。但是,如果這個參數的值不為null,表明這個線程抛出一個異常。你必須包含必要的代碼來處理這種情況。