天天看點

測試并發應用(四)監控Fork/Join池

executor 架構提供了線程的建立和管理,來實作任務的執行機制。java 7 包括了一個 executor 架構的延伸為了一種特别的問題而使用的,将比其他解決方案的表現有所提升(可以直接使用 thread 對象或者 executor 架構)。它是 fork/join 架構。

此架構是為了解決可以使用 divide 和 conquer 技術,使用 fork() 和 join() 操作把任務分成小塊的問題而設計的。主要的類實作這個行為的是 forkjoinpool 類。

在這個指南,你将學習從forkjoinpool類可以擷取的資訊和如何擷取這些資訊。

準備

指南中的例子是使用eclipse ide 來實作的。如果你使用eclipse 或者其他的ide,例如netbeans, 打開并建立一個新的java任務。

怎麼做呢…

按照這些步驟來實作下面的例子:

//1.   建立一個類,名為 task, 擴充 recursiveaction 類。

<code>01</code>

<code>public</code> <code>class</code> <code>task</code><code>extends</code> <code>recursiveaction {</code>

<code>02</code>

<code>03</code>

<code>    </code><code>// 2. 聲明一個私有 int array 屬性,名為 array,用來儲存你要增加的 array 的元素。</code>

<code>04</code>

<code>    </code><code>private</code> <code>int</code><code>[] array;</code>

<code>05</code>

<code>06</code>

<code>    </code><code>// 3. 聲明2個私有 int 屬性,名為 start 和 end,用來儲存 此任務已經處理的元素塊的頭和尾的位置。</code>

<code>07</code>

<code>    </code><code>private</code> <code>int</code> <code>start;</code>

<code>08</code>

<code>    </code><code>private</code> <code>int</code> <code>end;</code>

<code>09</code>

<code>10</code>

<code>    </code><code>// 4. 實作類的構造函數,初始化屬性值。</code>

<code>11</code>

<code>    </code><code>public</code> <code>task(</code><code>int</code> <code>array[],</code><code>int</code> <code>start,</code><code>int</code> <code>end) {</code>

<code>12</code>

<code>        </code><code>this</code><code>.array = array;</code>

<code>13</code>

<code>        </code><code>this</code><code>.start = start;</code>

<code>14</code>

<code>        </code><code>this</code><code>.end = end;</code>

<code>15</code>

<code>    </code><code>}</code>

<code>16</code>

<code>17</code>

<code>    </code><code>// 5. 用任務的中心邏輯來實作 compute()</code>

<code>18</code>

<code>    </code><code>// 方法。如果此任務已經處理了超過100任務,那麼把元素集分成2部分,再建立2個任務分别來執行這些部分,使用 fork() 方法開始執行,并使用</code>

<code>19</code>

<code>    </code><code>// join() 方法等待它的終結。</code>

<code>20</code>

<code>    </code><code>protected</code> <code>void</code> <code>compute() {</code>

<code>21</code>

<code>        </code><code>if</code> <code>(end - start &gt;</code><code>100</code><code>) {</code>

<code>22</code>

<code>            </code><code>int</code> <code>mid = (start + end) /</code><code>2</code><code>;</code>

<code>23</code>

<code>            </code><code>task task1 =</code><code>new</code> <code>task(array, start, mid);</code>

<code>24</code>

<code>            </code><code>task task2 =</code><code>new</code> <code>task(array, mid, end);</code>

<code>25</code>

<code>26</code>

<code>            </code><code>task1.fork();</code>

<code>27</code>

<code>            </code><code>task2.fork();</code>

<code>28</code>

<code>29</code>

<code>            </code><code>task1.join();</code>

<code>30</code>

<code>            </code><code>task2.join();</code>

<code>31</code>

<code>32</code>

<code>            </code><code>// 6. 如果任務已經處理了100個元素或者更少,那麼在每次操作之後讓線程進入休眠5毫秒來增加元素。</code>

<code>33</code>

<code>        </code><code>}</code><code>else</code> <code>{</code>

<code>34</code>

<code>            </code><code>for</code> <code>(</code><code>int</code> <code>i = start; i &lt; end; i++) {</code>

<code>35</code>

<code>                </code><code>array[i]++;</code>

<code>36</code>

<code>37</code>

<code>                </code><code>try</code> <code>{</code>

<code>38</code>

<code>                    </code><code>thread.sleep(</code><code>5</code><code>);</code>

<code>39</code>

<code>                </code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>40</code>

<code>                    </code><code>e.printstacktrace();</code>

<code>41</code>

<code>                </code><code>}</code>

<code>42</code>

<code>            </code><code>}</code>

<code>43</code>

<code>        </code><code>}</code>

<code>44</code>

<code>45</code>

<code>}</code>

//7. 建立例子的主類通過建立一個類,名為 main 并添加 main()方法。

<code>import</code> <code>java.util.concurrent.forkjoinpool;</code>

<code>import</code> <code>java.util.concurrent.timeunit;</code>

<code>public</code> <code>class</code> <code>main {</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args)</code><code>throws</code> <code>exception {</code>

<code>        </code><code>// 8. 建立 forkjoinpool 對象,名為 pool。</code>

<code>        </code><code>forkjoinpool pool =</code><code>new</code> <code>forkjoinpool();</code>

<code>        </code><code>// 9. 建立 10,000個元素的整數 array ,名為 array。</code>

<code>        </code><code>int</code> <code>array[] =</code><code>new</code> <code>int</code><code>[</code><code>10000</code><code>];</code>

<code>        </code><code>// 10. 建立新的 task 對象來處理整個array。</code>

<code>        </code><code>task task1 =</code><code>new</code> <code>task(array,</code><code>0</code><code>, array.length);</code>

<code>        </code><code>// 11. 使用 execute() 方法 把任務發送到pool裡執行。</code>

<code>        </code><code>pool.execute(task1);</code>

<code>        </code><code>// 12. 當任務還未結束執行,調用 showlog() 方法來把 forkjoinpool 類的狀态資訊寫入,然後讓線程休眠一秒。</code>

<code>        </code><code>while</code> <code>(!task1.isdone()) {</code>

<code>            </code><code>showlog(pool);</code>

<code>            </code><code>timeunit.seconds.sleep(</code><code>1</code><code>);</code>

<code>        </code><code>// 13. 使用 shutdown() 方法關閉pool。</code>

<code>        </code><code>pool.shutdown();</code>

<code>        </code><code>// 14. 使用 awaittermination() 方法 等待pool的終結。</code>

<code>        </code><code>pool.awaittermination(</code><code>1</code><code>, timeunit.days);</code>

<code>        </code><code>// 15. 調用 showlog() 方法寫關于 forkjoinpool 類狀态的資訊并寫一條資訊到操控台表明結束程式。</code>

<code>        </code><code>showlog(pool);</code>

<code>        </code><code>system.out.printf(</code><code>"main: end of the program.\n"</code><code>);</code>

<code>    </code><code>// 16. 實作 showlog() 方法。它接收 forkjoinpool 對象作為參數和寫關于線程和任務的執行的狀态的資訊。</code>

<code>    </code><code>private</code> <code>static</code> <code>void</code> <code>showlog(forkjoinpool pool) {</code>

<code>        </code><code>system.out.printf(</code><code>"**********************\n"</code><code>);</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool log\n"</code><code>);</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: parallelism:%d\n"</code><code>,</code>

<code>                </code><code>pool.getparallelism());</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: pool size:%d\n"</code><code>,</code>

<code>                </code><code>pool.getpoolsize());</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: active thread count:%d\n"</code><code>,</code>

<code>46</code>

<code>                </code><code>pool.getactivethreadcount());</code>

<code>47</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: running thread count:%d\n"</code><code>,</code>

<code>48</code>

<code>                </code><code>pool.getrunningthreadcount());</code>

<code>49</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: queued submission:%d\n"</code><code>,</code>

<code>50</code>

<code>                </code><code>pool.getqueuedsubmissioncount());</code>

<code>51</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: queued tasks:%d\n"</code><code>,</code>

<code>52</code>

<code>                </code><code>pool.getqueuedtaskcount());</code>

<code>53</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: queued submissions:%s\n"</code><code>,</code>

<code>54</code>

<code>                </code><code>pool.hasqueuedsubmissions());</code>

<code>55</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: steal count:%d\n"</code><code>,</code>

<code>56</code>

<code>                </code><code>pool.getstealcount());</code>

<code>57</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: terminated :%s\n"</code><code>,</code>

<code>58</code>

<code>                </code><code>pool.isterminated());</code>

<code>59</code>

<code>60</code>

<code>61</code>

它是如何工作的…

在這個指南,你實作了任務 使用 forkjoinpool 類和一個擴充 recursiveaction 類的 task 類來增加array的元素;它是 forkjoinpool 類執行的任務種類之一。當任務還在處理array時,你就把關于 forkjoinpool 類的狀态資訊列印到操控台。

你使用了forkjoinpool類中的以下方法:

getpoolsize(): 此方法傳回 int 值,它是forkjoinpool内部線程池的worker線程們的數量。

getparallelism(): 此方法傳回池的并行的級别。

getactivethreadcount(): 此方法傳回目前執行任務的線程的數量。

getrunningthreadcount():此方法傳回沒有被任何同步機制阻塞的正在工作的線程。

getqueuedsubmissioncount(): 此方法傳回已經送出給池還沒有開始他們的執行的任務數。

getqueuedtaskcount(): 此方法傳回已經送出給池已經開始他們的執行的任務數。

hasqueuedsubmissions(): 此方法傳回 boolean 值,表明這個池是否有queued任務還沒有開始他們的執行。

getstealcount(): 此方法傳回 long 值,worker 線程已經從另一個線程偷取到的時間數。

isterminated(): 此方法傳回 boolean 值,表明 fork/join 池是否已經完成執行。

繼續閱讀