天天看點

定制并發類(七)實作ThreadFactory接口生成自定義的線程給Fork/Join架構

實作threadfactory接口生成自定義的線程給fork/join架構

fork/join架構是java7中最有趣的特征之一。它是executor和executorservice接口的一個實作,允許你執行callable和runnable任務而不用管理這些執行線程。

這個執行者面向執行能被拆分成更小部分的任務。主要元件如下:

一個特殊任務,實作forkjointask類

兩種操作,将任務劃分成子任務的fork操作和等待這些子任務結束的join操作

一個算法,優化池中線程的使用的work-stealing算法。當一個任務正在等待它的子任務(結束)時,它的執行線程将執行其他任務(等待執行的任務)。

forkjoinpool類是fork/join的主要類。在它的内部實作,有如下兩種元素:

一個存儲等待執行任務的列隊。

一個執行任務的線程池

在這個指南中,你将學習如何實作一個在forkjoinpool類中使用的自定義的工作者線程,及如何使用一個工廠來使用它。

準備工作…

這個指南的例子使用eclipse ide實作。如果你使用eclipse或其他ide,如netbeans,打開它并建立一個新的java項目。

如何做…

按以下步驟來實作的這個例子:

1.建立一個繼承forkjoinworkerthread類的myworkerthread類。

<code>1</code>

<code>public</code> <code>class</code> <code>myworkerthread</code><code>extends</code> <code>forkjoinworkerthread {</code>

2.聲明和建立一個參數化為integer類的threadlocal屬性,名為taskcounter。

<code>private</code> <code>static</code> <code>threadlocal&lt;integer&gt; taskcounter=</code><code>new</code> <code>threadlocal&lt;integer&gt;();</code>

3.實作這個類的構造器。

<code>protected</code> <code>myworkerthread(forkjoinpool pool) {</code>

<code>2</code>

<code>super</code><code>(pool);</code>

<code>3</code>

<code>}</code>

4.重寫onstart()方法。調用父類的這個方法,寫入一條資訊到控制台。設定目前線程的taskcounter屬性值為0。

<code>@override</code>

<code>protected</code> <code>void</code> <code>onstart() {</code>

<code>super</code><code>.onstart();</code>

<code>4</code>

<code>system.out.printf("myworkerthread %d: initializing task</code>

<code>5</code>

<code>counter.\n",getid());</code>

<code>6</code>

<code>taskcounter.set(</code><code>0</code><code>);</code>

<code>7</code>

5.重寫ontermination()方法。寫入目前線程的taskcounter屬性值到控制台。

<code>protected</code> <code>void</code> <code>ontermination(throwable exception) {</code>

<code>system.out.printf("myworkerthread %d:</code>

<code>%d\n",getid(),taskcounter.get());</code>

<code>super</code><code>.ontermination(exception);</code>

6.實作addtask()方法。遞增taskcounter屬性值。

<code>public</code> <code>void</code> <code>addtask(){</code>

<code>int</code> <code>counter=taskcounter.get().intvalue();</code>

<code>counter++;</code>

<code>taskcounter.set(counter);</code>

7.建立一個實作forkjoinworkerthreadfactory接口的myworkerthreadfactory類。實作newthread()方法,建立和傳回一個myworkerthread對象。

<code>public</code> <code>forkjoinworkerthread newthread(forkjoinpool pool) {</code>

<code>return</code> <code>new</code> <code>myworkerthread(pool);</code>

8.建立myrecursivetask類,它繼承一個參數化為integer類的recursivetask類。

<code>public</code> <code>class</code> <code>myrecursivetask</code><code>extends</code> <code>recursivetask&lt;integer&gt; {</code>

9.聲明一個私有的、int類型的屬性array。

<code>private</code> <code>int</code> <code>array[];</code>

10.聲明兩個私有的、int類型的屬性start和end。

<code>private</code> <code>int</code> <code>start, end;</code>

11.實作這個類的構造器,初始化它的屬性。

<code>public</code> <code>myrecursivetask(</code><code>int</code> <code>array[],</code><code>int</code> <code>start,</code><code>int</code> <code>end) {</code>

<code>this</code><code>.array=array;</code>

<code>this</code><code>.start=start;</code>

<code>this</code><code>.end=end;</code>

12.實作compute()方法,用來合計數組中在start和end位置之間的所有元素。首先,将執行這個任務的線程轉換成一個myworkerthread對象,然後使用addtask()方法來增長這個線程的任務計數器。

<code>protected</code> <code>integer compute() {</code>

<code>integer ret;</code>

<code>myworkerthread thread=(myworkerthread)thread.currentthread();</code>

<code>thread.addtask();</code>

13.實作addresults()方法。計算和傳回兩個任務(接收參數)的結果的總和。

<code>01</code>

<code>private</code> <code>integer addresults(task task1, task task2) {</code>

<code>02</code>

<code>int</code> <code>value;</code>

<code>03</code>

<code>try</code> <code>{</code>

<code>04</code>

<code>value = task1.get().intvalue()+task2.get().intvalue();</code>

<code>05</code>

<code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>06</code>

<code>e.printstacktrace();</code>

<code>07</code>

<code>value=</code><code>0</code><code>;</code>

<code>08</code>

<code>}</code><code>catch</code> <code>(executionexception e) {</code>

<code>09</code>

<code>10</code>

<code>11</code>

14.令這個線程睡眠10毫秒,然後傳回任務的結果。

<code>timeunit.milliseconds.sleep(</code><code>10</code><code>);</code>

<code>return</code> <code>value;</code>

15.實作這個例子的主類,通過建立main類,并實作main()方法。

<code>public</code> <code>class</code> <code>main {</code>

<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args)</code><code>throws</code> <code>exception {</code>

16.建立一個名為factory的myworkerthreadfactory對象。

<code>myworkerthreadfactory factory=</code><code>new</code> <code>myworkerthreadfactory();</code>

17.建立一個名為pool的forkjoinpool對象,将前面建立的factory對象作為參數傳給它的構造器。

<code>forkjoinpool pool=</code><code>new</code> <code>forkjoinpool(</code><code>4</code><code>, factory,</code><code>null</code><code>,</code><code>false</code><code>);</code>

18.建立一個大小為100000的整數數組,将所有元素初始化為值1。

<code>int</code> <code>array[]=</code><code>new</code> <code>int</code><code>[</code><code>100000</code><code>];</code>

<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;array.length; i++){</code>

<code>array[i]=</code><code>1</code><code>;</code>

19.建立一個新的task對象,用來合計數組中的所有元素。

<code>myrecursivetask task=</code><code>new</code> <code>myrecursivetask(array,</code><code>0</code><code>,array.length);</code>

20.使用execute()方法,将這個任務送出給池。

<code>pool.execute(task);</code>

21.使用join()方法,等待這個任務的結束。

<code>task.join();</code>

22.使用shutdown()方法,關閉這個池。

<code>pool.shutdown();</code>

23.使用awaittermination()方法,等待這個執行者的結束。

<code>pool.awaittermination(</code><code>1</code><code>, timeunit.days);</code>

24.使用get()方法,将任務的結束寫入到控制台。

<code>system.out.printf(</code><code>"main: result: %d\n"</code><code>,task.get());</code>

25.寫入一條資訊到控制台,表明程式的結束。

<code>system.out.printf(</code><code>"main: end of the program\n"</code><code>);</code>

它是如何工作的…

fork/join架構使用的線程叫工作者線程。java包含繼承thread類的forkjoinworkerthread類和使用fork/join架構實作工作者線程。

在這個指南中,你已實作了繼承forkjoinworkerthread類的myworkerthread類,并重寫這個類的兩個方法。你的目标是實作每個工作者線程的任務計數器,以至于你可以知道每個工作者線程執行多少個任務。你已經通過一個threadlocal屬性實作計數器。這樣,每個線程都擁有它自己的計數器,對于來你說是透明的。

你已重寫forkjoinworkerthread類的onstart()方法來實作任務的計數器。當工作者線程開始它的執行時,這個方法将被調用。你也重寫了ontermination()方法,将任務計數器的值寫入到控制台。當工作者線程結束它的執行時,這個方法将被調用。你也在myworkerthread類中實作addtask()方法,用來增加每個線程的任務計數器。

對于forkjoinpool類,與java并發api中的所有執行者一樣,使用工廠來建立它。是以,如果你想在forkjoinpool類中使用myworkerthread線程,你必須實作自己的線程工廠。對于fork/join架構,這個工廠必須實作forkjoinpool.forkjoinworkerthreadfactory類。為此,你已實作myworkerthreadfactory類。這個類隻有一個用來建立一個新的myworkerthread對象的方法。

最後,你隻要使用已建立的工廠來初始化forkjoinpool類。你已在main類中通過使用forkjoinpool的構造器實作了。

以下截圖顯示了這個程式的部分輸出:

定制并發類(七)實作ThreadFactory接口生成自定義的線程給Fork/Join架構

你可以看出forkjoinpool對象如何執行4個工作者線程及每個工作者線程執行多少個任務。

不止這些…

考慮一下,當一個線程正常結束或抛出一個exception異常時,調用的forkjoinworkerthread提供的ontermination()方法。這個方法接收一個throwable對象作為參數。如果這個參數值為null時,表明這個工作者線程正常結束。但是,如果這個參數的值不為null,表明這個線程抛出一個異常。你必須包含必要的代碼來處理這種情況。