package com.test.thread.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
/**
* Created by jl on 2018/8/31 0031
* fork join架構采用分治法将任務分解進而提高整體任務執行效率
* forkJoinTask.fork()方法用于将任務繼續拆分或執行,如果是拆分,目前線程就充當了監工無法被配置設定到任務
* forkJoinTask.invokeAll(task1,task2,...)方法是針對fork方法的優化,被invoke的n個任務中會将第一個任務留給目前線程
* 去執行,這樣遞歸循環下去進而保證所有的線程都充當勞工角色(有些線程既是勞工也是監工,沒有隻是監工的線程)
* forkJoinTask.join()方法用于傳回任務執行的結果
*/
public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName());
ForkJoinPool forkJoinPool = new ForkJoinPool(5);
CountTask task = new CountTask(1, 6);
Future<Integer> result = forkJoinPool.submit(task);
System.out.println(result.get());
}
}
package com.test.thread.forkjoin;
import java.util.concurrent.RecursiveTask;
/**
* Created by jl on 2018/8/31 0031
* 用于計算連續自然數相加的任務
*/
public class CountTask extends RecursiveTask<Integer> {
//門檻值,任務大小低于該值則結束拆分
private static final int THRESHOLD = 2;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
System.out.println(Thread.currentThread().getName() + "(勞工)");
// 模拟工作耗時
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//進行運算
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
System.out.println(Thread.currentThread().getName() + "(監工)");
//如果大于門檻值,就進行任務拆分
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//執行子任務
// leftTask.fork();
// rightTask.fork();
// invokeAll的N個任務中,其中N-1個任務會使用fork()交給其它線程執行,
// 但是,它還會留一個任務自己執行,這樣,就充分利用了線程池,保證沒有空閑的線程。
invokeAll(leftTask, rightTask);
//等待子任務執行完成,得到執行結果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
//合并子任務結果
sum = leftResult + rightResult;
}
return sum;
}
}