天天看點

Fork/Join架構(二)建立一個Fork/Join池

建立一個fork/join池

在這個指南中,你将學習如何使用fork/join架構的基本元素。它包括:

建立一個forkjoinpool對象來執行任務。

建立一個forkjoinpool執行的forkjointask類。

你将在這個示例中使用fork/join架構的主要特點,如下:

你将使用預設構造器建立forkjoinpool。

在這個任務中,你将使用java api文檔推薦的結構:

<code>1</code>

<code>if (problem size &lt;</code><code>default</code> <code>size){</code>

<code>2</code>

<code>tasks=divide(task);</code>

<code>3</code>

<code>execute(tasks);</code>

<code>4</code>

<code>}</code><code>else</code> <code>{</code>

<code>5</code>

<code>resolve problem using another algorithm;</code>

<code>6</code>

<code>}</code>

你将以一種同步方式執行任務。當一個任務執行2個或2個以上的子任務時,它将等待它們的結束。通過這種方式 ,正在執行這些任務的線程(工作線程)将會查找其他任務(尚未執行的任務)來執行,充分利用它們的執行時間。

你将要實作的任務将不會傳回任何結果,是以你将使用recursiveaction作為它們實作的基類。

準備工作

這個指南中的例子使用eclipse ide實作。如果你使用eclipse或其他ide,如netbeans,打開它并建立一個新的java項目。

如何做…

在這個指南中,你将繼續實作一個任務來修改産品清單的價格。任務最初是負責更新一個隊列中的所有元素。你将會使用10作為參考大小,如果一個任務必須更新超過10個元素,這些元素将被劃分成兩個部分,并建立兩個任務來更新每個部分中的産品的價格。

按以下步驟來實作這個示例:

1.建立類product,将用來存儲産品的名稱和價格。

<code>public</code> <code>class</code> <code>product {</code>

2.聲明一個私有的string類型的屬性name和一個私有的double類型的屬性price。

<code>private</code> <code>string name;</code>

<code>private</code> <code>double</code> <code>price;</code>

3.實作這些方法,用來設定和擷取這兩個屬性的值。

<code>01</code>

<code>public</code> <code>string getname() {</code>

<code>02</code>

<code>return</code> <code>name;</code>

<code>03</code>

<code>04</code>

<code>public</code> <code>void</code> <code>setname(string name) {</code>

<code>05</code>

<code>this</code><code>.name = name;</code>

<code>06</code>

<code>07</code>

<code>public</code> <code>double</code> <code>getprice() {</code>

<code>08</code>

<code>return</code> <code>price;</code>

<code>09</code>

<code>10</code>

<code>public</code> <code>void</code> <code>setprice(</code><code>double</code> <code>price) {</code>

<code>11</code>

<code>12</code>

<code>this</code><code>.price = price;</code>

<code>13</code>

4.建立productlistgenerator類,用來産生随機産品的數列。

<code>public</code> <code>class</code> <code>productlistgenerator {</code>

5.實作generate()方法。它接收一個數列大小 的int類型參數,傳回一個産生産品數列的list&lt;product&gt;對象。

<code>public</code> <code>list&lt;product&gt; generate (</code><code>int</code> <code>size) {</code>

6.建立傳回産品數列的對象。

<code>list&lt;product&gt; ret=</code><code>new</code> <code>arraylist&lt;product&gt;();</code>

7.建立産品隊列。給所有産品賦予相同值。比如,10用來檢查程式是否工作得很好。

<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;size; i++){</code>

<code>product product=</code><code>new</code> <code>product();</code>

<code>product.setname(</code><code>"product"</code><code>+i);</code>

<code>product.setprice(</code><code>10</code><code>);</code>

<code>ret.add(product);</code>

<code>7</code>

<code>return</code> <code>ret;</code>

<code>8</code>

8.建立task類,指定它繼承recursiveaction類。

<code>public</code> <code>class</code> <code>task</code><code>extends</code> <code>recursiveaction {</code>

9.聲明類的序列版本uid。這個元素是必需的,因為recursiveaction類的父類forkjointask實作了serializable接口。

<code>private</code> <code>static</code> <code>final</code> <code>long</code> <code>serialversionuid = 1l;</code>

10.聲明一個私有的、list&lt;product&gt;類型的屬性products。

<code>private</code> <code>list&amp;lt;product&amp;gt; products;</code>

11.聲明兩個私有的、int類型的屬性first和last。這些屬性将決定這個任務産品的阻塞過程。

<code>private</code> <code>int</code> <code>first;</code>

<code>private</code> <code>int</code> <code>last;</code>

12.聲明一個私有的、double類型的屬性increment,用來存儲産品價格的增長。

<code>private</code> <code>double</code> <code>increment;</code>

13.實作這個類的構造器,初始化所有屬性。

<code>public</code> <code>task (list&amp;lt;product&amp;gt; products,</code><code>int</code> <code>first,</code><code>int</code> <code>last,</code><code>double</code> <code>increment) {</code>

<code>this</code><code>.products=products;</code>

<code>this</code><code>.first=first;</code>

<code>this</code><code>.last=last;</code>

<code>this</code><code>.increment=increment;</code>

14.實作compute()方法 ,該方法将實作任務的邏輯。

<code>@override</code>

<code>protected</code> <code>void</code> <code>compute() {</code>

15.如果last和first的差小于10(任務隻能更新價格小于10的産品),使用updateprices()方法遞增的設定産品的價格。

<code>if</code> <code>(last-first&lt;</code><code>10</code><code>) {</code>

<code>updateprices();</code>

16.如果last和first的差大于或等于10,則建立兩個新的task對象,一個處理産品的前半部分,另一個處理産品的後半部分,然後在forkjoinpool中,使用invokeall()方法執行它們。

<code>int</code> <code>middle=(last+first)/</code><code>2</code><code>;</code>

<code>system.out.printf("task: pending tasks:</code>

<code>%s\n",getqueuedtaskcount());</code>

<code>task t1=</code><code>new</code> <code>task(products, first,middle+</code><code>1</code><code>, increment);</code>

<code>task t2=</code><code>new</code> <code>task(products, middle+</code><code>1</code><code>,last, increment);</code>

<code>invokeall(t1, t2);</code>

17.實作updateprices()方法。這個方法更新産品隊列中位于first值和last值之間的産品。

<code>private</code> <code>void</code> <code>updateprices() {</code>

<code>for</code> <code>(</code><code>int</code> <code>i=first; i&lt;last; i++){</code>

<code>product product=products.get(i);</code>

<code>product.setprice(product.getprice()*(</code><code>1</code><code>+increment));</code>

18.實作這個示例的主類,通過建立main類,并實作main()方法。

<code>public</code> <code>class</code> <code>main {</code>

<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

19.使用productlistgenerator類建立一個包括10000個産品的數列。

<code>productlistgenerator generator=</code><code>new</code> <code>productlistgenerator();</code>

<code>list&lt;product&gt; products=generator.generate(</code><code>10000</code><code>);</code>

20.建立一個新的task對象,用來更新産品隊列中的産品。first參數使用值0,last參數使用值10000(産品數列的大小)。

<code>task task=</code><code>new</code> <code>task(products,</code><code>0</code><code>, products.size(),</code><code>0.20</code><code>);</code>

21.使用無參構造器建立forkjoinpool對象。

<code>forkjoinpool pool=</code><code>new</code> <code>forkjoinpool();</code>

22.在池中使用execute()方法執行這個任務 。

<code>pool.execute(task);</code>

23.實作一個顯示關于每隔5毫秒池中的變化資訊的代碼塊。将池中的一些參數值寫入到控制台,直到任務完成它的執行。

<code>do</code> <code>{</code>

<code>system.out.printf(</code><code>"main: thread count: %d\n"</code><code>,pool.getactivethreadcount());</code>

<code>system.out.printf(</code><code>"main: thread steal: %d\n"</code><code>,pool.getstealcount());</code>

<code>system.out.printf(</code><code>"main: parallelism: %d\n"</code><code>,pool.getparallelism());</code>

<code>try</code> <code>{</code>

<code>timeunit.milliseconds.sleep(</code><code>5</code><code>);</code>

<code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>e.printstacktrace();</code>

<code>}</code><code>while</code> <code>(!task.isdone());</code>

24.使用shutdown()方法關閉這個池。

<code>pool.shutdown();</code>

25.使用iscompletednormally()方法檢查假設任務完成時沒有出錯,在這種情況下,寫入一條資訊到控制台。

<code>if</code> <code>(task.iscompletednormally()){</code>

<code>system.out.printf("main: the process has completed</code>

<code>normally.\n");</code>

26.在增長之後,所有産品的價格應該是12。将價格不是12的所有産品的名稱和價格寫入到控制台,用來檢查它們錯誤地增長它們的價格。

<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;products.size(); i++){</code>

<code>if</code> <code>(product.getprice()!=</code><code>12</code><code>) {</code>

<code>system.out.printf(</code><code>"product %s: %f\n"</code><code>,product.getname(),product.getprice());</code>

27.寫入一條資訊到控制台表明程式的結束。

<code>system.out.println(</code><code>"main: end of the program.\n"</code><code>);</code>

它是如何工作的…

在這個示例中,你已經建立一個forkjoinpool對象和一個在池中執行的forkjointask類的子類。為了建立forkjoinpool對象,你已經使用了無參構造器,是以它會以預設的配置來執行。它建立一個線程數等于計算機處理器數的池。當forkjoinpool對象被建立時,這些線程被建立并且在池中等待,直到有任務到達讓它們執行。

由于task類沒有傳回結果,是以它繼承recursiveaction類。在這個指南中,你已經使用了推薦的結構來實作任務。如果這個任務更新超過10産品,它将被分解成兩部分,并建立兩個任務,一個任務執行一部分。你已經在task類中使用first和last屬性,用來了解這個任務要更新的産品隊列的位置範圍。你已經使用first和last屬性,隻複制産品數列一次,而不是為每個任務建立不同的數列。

它調用invokeall()方法,執行每個任務所建立的子任務。這是一個同步調用,這個任務在繼續(可能完成)它的執行之前,必須等待子任務的結束。當任務正在等待它的子任務(結束)時,正在執行它的工作線程執行其他正在等待的任務。在這種行為下,fork/join架構比runnable和callable對象本身提供一種更高效的任務管理。

forkjointask類的invokeall()方法是執行者(executor)和fork/join架構的一個主要差別。在執行者架構中,所有任務被送出給執行者,而在這種情況下,這些任務包括執行和控制這些任務的方法都在池内。你已經在task類中使用invokeall()方法,它是繼承了繼承forkjointask類的recursiveaction類。

你使用execute()方法送出唯一的任務給這個池,用來所有産品數列。在這種情況下,它是一個異步調用,而主線程繼續它的執行。

你已經使用forkjoinpool類的一些方法,用來檢查正在運作任務的狀态和變化。基于這個目的,這個類包括更多的方法。參見有這些方法完整清單的監控一個fork/join池指南。

你可以看出任務正在完成它們的工作和産品價格的更新。

不止這些…

forkjoinpool類提供其他的方法,用來執行一個任務。這些方法如下:

execute (runnable task):這是在這個示例中,使用的execute()方法的另一個版本。在這種情況下,你可以送出一個runnable對象給forkjoinpool類。注意:forkjoinpool類不會對runnable對象使用work-stealing算法。它(work-stealing算法)隻用于forkjointask對象。

invoke(forkjointask&lt;t&gt; task):當execute()方法使用一個異步調用forkjoinpool類,正如你在本示例中所學的,invoke()方法使用同步調用forkjoinpool類。這個調用不會(立即)傳回,直到傳遞的參數任務完成它的執行。

你也可以使用在executorservice接口的invokeall()和invokeany()方法。這些方法接收一個callable對象作為參數。forkjoinpool類不會對callable對象使用work-stealing算法,是以你最好使用執行者去執行它們。

forkjointask類同樣提供在示例中使用的invokeall()的其他版本。這些版本如下:

invokeall(forkjointask&lt;?&gt;… tasks):這個版本的方法使用一個可變參數清單。你可以傳入許多你想要執行的forkjointask對象作為參數。

invokeall(collection&lt;t&gt; tasks):這個版本的方法接收一個泛型類型t對象的集合(如:一個arraylist對象,一個linkedlist對象或者一個treeset對象)。這個泛型類型t必須是forkjointask類或它的子類。

即使forkjoinpool類被設計成用來執行一個forkjointask,你也可以直接執行runnable和callable對象。你也可以使用forkjointask類的adapt()方法來執行任務,它接收一個callable對象或runnable對象(作為參數)并傳回一個forkjointask對象。 參見

在第8章,測試并發應用程式中的監控一個fork/join池的指南