天天看点

测试并发应用(四)监控Fork/Join池

executor 框架提供了线程的创建和管理,来实现任务的执行机制。java 7 包括了一个 executor 框架的延伸为了一种特别的问题而使用的,将比其他解决方案的表现有所提升(可以直接使用 thread 对象或者 executor 框架)。它是 fork/join 框架。

此框架是为了解决可以使用 divide 和 conquer 技术,使用 fork() 和 join() 操作把任务分成小块的问题而设计的。主要的类实现这个行为的是 forkjoinpool 类。

在这个指南,你将学习从forkjoinpool类可以获取的信息和如何获取这些信息。

准备

指南中的例子是使用eclipse ide 来实现的。如果你使用eclipse 或者其他的ide,例如netbeans, 打开并创建一个新的java任务。

怎么做呢…

按照这些步骤来实现下面的例子:

//1.   创建一个类,名为 task, 扩展 recursiveaction 类。

<code>01</code>

<code>public</code> <code>class</code> <code>task</code><code>extends</code> <code>recursiveaction {</code>

<code>02</code>

<code>03</code>

<code>    </code><code>// 2. 声明一个私有 int array 属性,名为 array,用来储存你要增加的 array 的元素。</code>

<code>04</code>

<code>    </code><code>private</code> <code>int</code><code>[] array;</code>

<code>05</code>

<code>06</code>

<code>    </code><code>// 3. 声明2个私有 int 属性,名为 start 和 end,用来储存 此任务已经处理的元素块的头和尾的位置。</code>

<code>07</code>

<code>    </code><code>private</code> <code>int</code> <code>start;</code>

<code>08</code>

<code>    </code><code>private</code> <code>int</code> <code>end;</code>

<code>09</code>

<code>10</code>

<code>    </code><code>// 4. 实现类的构造函数,初始化属性值。</code>

<code>11</code>

<code>    </code><code>public</code> <code>task(</code><code>int</code> <code>array[],</code><code>int</code> <code>start,</code><code>int</code> <code>end) {</code>

<code>12</code>

<code>        </code><code>this</code><code>.array = array;</code>

<code>13</code>

<code>        </code><code>this</code><code>.start = start;</code>

<code>14</code>

<code>        </code><code>this</code><code>.end = end;</code>

<code>15</code>

<code>    </code><code>}</code>

<code>16</code>

<code>17</code>

<code>    </code><code>// 5. 用任务的中心逻辑来实现 compute()</code>

<code>18</code>

<code>    </code><code>// 方法。如果此任务已经处理了超过100任务,那么把元素集分成2部分,再创建2个任务分别来执行这些部分,使用 fork() 方法开始执行,并使用</code>

<code>19</code>

<code>    </code><code>// join() 方法等待它的终结。</code>

<code>20</code>

<code>    </code><code>protected</code> <code>void</code> <code>compute() {</code>

<code>21</code>

<code>        </code><code>if</code> <code>(end - start &gt;</code><code>100</code><code>) {</code>

<code>22</code>

<code>            </code><code>int</code> <code>mid = (start + end) /</code><code>2</code><code>;</code>

<code>23</code>

<code>            </code><code>task task1 =</code><code>new</code> <code>task(array, start, mid);</code>

<code>24</code>

<code>            </code><code>task task2 =</code><code>new</code> <code>task(array, mid, end);</code>

<code>25</code>

<code>26</code>

<code>            </code><code>task1.fork();</code>

<code>27</code>

<code>            </code><code>task2.fork();</code>

<code>28</code>

<code>29</code>

<code>            </code><code>task1.join();</code>

<code>30</code>

<code>            </code><code>task2.join();</code>

<code>31</code>

<code>32</code>

<code>            </code><code>// 6. 如果任务已经处理了100个元素或者更少,那么在每次操作之后让线程进入休眠5毫秒来增加元素。</code>

<code>33</code>

<code>        </code><code>}</code><code>else</code> <code>{</code>

<code>34</code>

<code>            </code><code>for</code> <code>(</code><code>int</code> <code>i = start; i &lt; end; i++) {</code>

<code>35</code>

<code>                </code><code>array[i]++;</code>

<code>36</code>

<code>37</code>

<code>                </code><code>try</code> <code>{</code>

<code>38</code>

<code>                    </code><code>thread.sleep(</code><code>5</code><code>);</code>

<code>39</code>

<code>                </code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>40</code>

<code>                    </code><code>e.printstacktrace();</code>

<code>41</code>

<code>                </code><code>}</code>

<code>42</code>

<code>            </code><code>}</code>

<code>43</code>

<code>        </code><code>}</code>

<code>44</code>

<code>45</code>

<code>}</code>

//7. 创建例子的主类通过创建一个类,名为 main 并添加 main()方法。

<code>import</code> <code>java.util.concurrent.forkjoinpool;</code>

<code>import</code> <code>java.util.concurrent.timeunit;</code>

<code>public</code> <code>class</code> <code>main {</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args)</code><code>throws</code> <code>exception {</code>

<code>        </code><code>// 8. 创建 forkjoinpool 对象,名为 pool。</code>

<code>        </code><code>forkjoinpool pool =</code><code>new</code> <code>forkjoinpool();</code>

<code>        </code><code>// 9. 创建 10,000个元素的整数 array ,名为 array。</code>

<code>        </code><code>int</code> <code>array[] =</code><code>new</code> <code>int</code><code>[</code><code>10000</code><code>];</code>

<code>        </code><code>// 10. 创建新的 task 对象来处理整个array。</code>

<code>        </code><code>task task1 =</code><code>new</code> <code>task(array,</code><code>0</code><code>, array.length);</code>

<code>        </code><code>// 11. 使用 execute() 方法 把任务发送到pool里执行。</code>

<code>        </code><code>pool.execute(task1);</code>

<code>        </code><code>// 12. 当任务还未结束执行,调用 showlog() 方法来把 forkjoinpool 类的状态信息写入,然后让线程休眠一秒。</code>

<code>        </code><code>while</code> <code>(!task1.isdone()) {</code>

<code>            </code><code>showlog(pool);</code>

<code>            </code><code>timeunit.seconds.sleep(</code><code>1</code><code>);</code>

<code>        </code><code>// 13. 使用 shutdown() 方法关闭pool。</code>

<code>        </code><code>pool.shutdown();</code>

<code>        </code><code>// 14. 使用 awaittermination() 方法 等待pool的终结。</code>

<code>        </code><code>pool.awaittermination(</code><code>1</code><code>, timeunit.days);</code>

<code>        </code><code>// 15. 调用 showlog() 方法写关于 forkjoinpool 类状态的信息并写一条信息到操控台表明结束程序。</code>

<code>        </code><code>showlog(pool);</code>

<code>        </code><code>system.out.printf(</code><code>"main: end of the program.\n"</code><code>);</code>

<code>    </code><code>// 16. 实现 showlog() 方法。它接收 forkjoinpool 对象作为参数和写关于线程和任务的执行的状态的信息。</code>

<code>    </code><code>private</code> <code>static</code> <code>void</code> <code>showlog(forkjoinpool pool) {</code>

<code>        </code><code>system.out.printf(</code><code>"**********************\n"</code><code>);</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool log\n"</code><code>);</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: parallelism:%d\n"</code><code>,</code>

<code>                </code><code>pool.getparallelism());</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: pool size:%d\n"</code><code>,</code>

<code>                </code><code>pool.getpoolsize());</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: active thread count:%d\n"</code><code>,</code>

<code>46</code>

<code>                </code><code>pool.getactivethreadcount());</code>

<code>47</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: running thread count:%d\n"</code><code>,</code>

<code>48</code>

<code>                </code><code>pool.getrunningthreadcount());</code>

<code>49</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: queued submission:%d\n"</code><code>,</code>

<code>50</code>

<code>                </code><code>pool.getqueuedsubmissioncount());</code>

<code>51</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: queued tasks:%d\n"</code><code>,</code>

<code>52</code>

<code>                </code><code>pool.getqueuedtaskcount());</code>

<code>53</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: queued submissions:%s\n"</code><code>,</code>

<code>54</code>

<code>                </code><code>pool.hasqueuedsubmissions());</code>

<code>55</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: steal count:%d\n"</code><code>,</code>

<code>56</code>

<code>                </code><code>pool.getstealcount());</code>

<code>57</code>

<code>        </code><code>system.out.printf(</code><code>"main: fork/join pool: terminated :%s\n"</code><code>,</code>

<code>58</code>

<code>                </code><code>pool.isterminated());</code>

<code>59</code>

<code>60</code>

<code>61</code>

它是如何工作的…

在这个指南,你实现了任务 使用 forkjoinpool 类和一个扩展 recursiveaction 类的 task 类来增加array的元素;它是 forkjoinpool 类执行的任务种类之一。当任务还在处理array时,你就把关于 forkjoinpool 类的状态信息打印到操控台。

你使用了forkjoinpool类中的以下方法:

getpoolsize(): 此方法返回 int 值,它是forkjoinpool内部线程池的worker线程们的数量。

getparallelism(): 此方法返回池的并行的级别。

getactivethreadcount(): 此方法返回当前执行任务的线程的数量。

getrunningthreadcount():此方法返回没有被任何同步机制阻塞的正在工作的线程。

getqueuedsubmissioncount(): 此方法返回已经提交给池还没有开始他们的执行的任务数。

getqueuedtaskcount(): 此方法返回已经提交给池已经开始他们的执行的任务数。

hasqueuedsubmissions(): 此方法返回 boolean 值,表明这个池是否有queued任务还没有开始他们的执行。

getstealcount(): 此方法返回 long 值,worker 线程已经从另一个线程偷取到的时间数。

isterminated(): 此方法返回 boolean 值,表明 fork/join 池是否已经完成执行。

继续阅读