1 fork-join架構的特點
它對問題的解決思路是分而治之,先将一個問題fork(分為)幾個子問題,然後子問題又分為孫子問題,直至細分為一個容易計算的問題,然後再将結果依次join(結合)為最終的答案.是不是感覺和雲計算中的Map-reduce計算模型很像?思路是一樣的,隻不過fork-join運作在一個JVM中的多個線程内,而map-reduce運作在分布式計算節點上
在運作線程時,它使用“work-steal”(任務偷取)算法.一般來說,fork-join會啟動多個線程(由參數指定,若不指定則預設為CPU核心數量),每個線程負責一個任務隊列,并依次從隊列頭部獲得任務并執行.當某個線程空閑時,它會從其他線程的任務隊列尾部偷取一個任務來執行,這樣就保證了線程的運作效率達到最高。
它面向的問題域是可以大量并行執行的計算任務,例如計算某個大型數組中每個元素的平方(當然這個有些無趣),其計算對象最好是一些獨立的元素,不會被其他線程通路,也沒有同步、互斥要求,更不要涉及IO或者無限循環.當然此架構也可以執行普通的并發程式設計任務,但是這時就失去了性能優勢
細分的計算任務有一個粗略的優化标準,即可以在100~10000條指令中執行完畢
了解以上思路後,來看看fork-join架構提供的幾個工具類:
ForkJoinPool:支援fork-join架構的線程池,所有ForkJoinTask任務都必須在其中運作,線程池主要使用invoke()、invokeAll()等方法來執行任務,當然也可以使用原有的execute()和submit()方法;
ForkJoinTask:支援fork-join架構的任務抽象類,它是Future接口,它代表一個支援fork()和join()方法的任務;
RecursiveAction:ForkJoinTask的兩個具體子類之一,代表沒有傳回值的ForkJoinTask任務;
RecursiveTask:ForkJoinTask的兩個具體子類之一,代表有傳回值的ForkJoinTask任務。
2 RecursiveAction
先來看一個使用RecursiveAction的例子,這段代碼的目的是計算一個大型數組中每個元素x的一個公式的值,這個公式是sin(x)+cos(x)+tan(x):
public class RecursiveActionExam {
private final static int NUMBER = 10000000;
public static void main(String[] args) {
double[] array = new double[NUMBER];
for (int i = 0; i < NUMBER; i++) {
array[i] = i;
}
long startTime = System.currentTimeMillis();
System.out.println(Runtime.getRuntime().availableProcessors());
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.invoke(new ComputeTask(array, 0, array.length));
long endTime = System.currentTimeMillis();
System.out.println("Time span = " + (endTime - startTime));
}
}
class ComputeTask extends RecursiveAction {
final double[] array;
final int lo, hi;
ComputeTask(double[] array, int lo, int hi) {
this.array = array;
this.lo = lo;
this.hi = hi;
}
protected void compute() {
if (hi - lo < 2) {
for (int i = lo; i < hi; ++i)
array[i] = Math.sin(array[i]) + Math.cos(array[i]) + Math.tan(array[i]);
} else {
int mid = (lo + hi) >>> 1;
invokeAll(new ComputeTask(array, lo, mid),
new ComputeTask(array, mid, hi));
}
}
}
再看看單線程的情況:
public class RecursiveSequenceExam {
private final static int NUMBER = 10000000;
public static void main(String[] args) {
double[] array = new double[NUMBER];
for (int i = 0; i < NUMBER; i++) {
array[i] = i;
}
long startTime = System.currentTimeMillis();
for (int i = 0; i < NUMBER; i++) {
array[i] = Math.sin(array[i]) + Math.cos(array[i]) + Math.tan(array[i]);
}
long endTime = System.currentTimeMillis();
System.out.println("Time span = " + (endTime - startTime));
}
}
運作結果是Time span = 12030。
由于我的CPU是4核的,再看看4線程的情況:
public class Recusive4ThreadExam {
private final static int NUMBER = 10000000;
public static void main(String[] args) throws InterruptedException {
double[] array = new double[NUMBER];
for (int i = 0; i < NUMBER; i++) {
array[i] = i;
}
long startTime = System.currentTimeMillis();
ExecutorService service = Executors.newFixedThreadPool(4);
service.execute(new ArrayTask(array, 0, NUMBER / 4));
service.execute(new ArrayTask(array, NUMBER / 4, NUMBER / 2));
service.execute(new ArrayTask(array, NUMBER / 2, NUMBER*3 / 4));
service.execute(new ArrayTask(array, NUMBER*3 / 4, NUMBER ));
service.shutdown();
service.awaitTermination(1,TimeUnit.DAYS);
long endTime = System.currentTimeMillis();
System.out.println("Time span = " + (endTime - startTime));
}
}
class ArrayTask implements Runnable {
final double[] array;
final int lo, hi;
ArrayTask(double[] array, int lo, int hi) {
this.array = array;
this.lo = lo;
this.hi = hi;
}
@Override
public void run() {
for (int i = lo; i < hi; ++i)
array[i] = Math.sin(array[i]) + Math.cos(array[i]) + Math.tan(array[i]);
}
}
運作結果是Time span = 4064。可以看出由于fork-join架構采用了任務偷取算法,比普通4線程快了一點點。
3 RecursiveTask
下面來看一個更有意義的場景,尋找一個大型數組的最小值,這裡我使用RecursiveTask(其實使用RecursiveAction也行,在它内部用一個成員變量儲存結果即可)。代碼如下:
public class RecursiveFindMax {
private static Random rand = new Random(47);
private static final int NUMBER = 1000000;
public static void main(String[] args) {
double[] array = new double[NUMBER];
for (int i = 0; i < NUMBER; i++) {
array[i] = rand.nextDouble();
}
ForkJoinPool pool = new ForkJoinPool();
TaskFindMax task = new TaskFindMax(0, array.length - 1, array);
pool.invoke(task);
System.out.println("MaxValue = " + task.join());
}
}
class TaskFindMax extends RecursiveTask<Double> {
private final int lo;
private final int hi;
private final double[] array;
//you can change THRESHOLD to get better efficiency
private final static int THRESHOLD = 10;
TaskFindMax(int lo, int hi, double[] array) {
this.lo = lo;
this.hi = hi;
this.array = array;
}
@Override
protected Double compute() {
if ((hi - lo) < THRESHOLD) {
double max = array[lo];
for (int i = lo; i < hi; i++) {
max = Math.max(max, array[i + 1]);
}
return max;
} else {
int mid = (lo + hi) >>> 1;
TaskFindMax lhs = new TaskFindMax(lo, mid, array);
TaskFindMax rhs = new TaskFindMax(mid, hi, array);
invokeAll(lhs, rhs);
return Math.max(lhs.join(), rhs.join());
}
}
}
pool.invoke(task)将一個最初的任務扔進了線程池執行,這個任務将會執行它的compute()方法。在此方法中,若滿足某個條件(例如數組上界和下界隻差小于門檻值THRESHOLD)則直接在這一段數組中查找最大值;若不滿足條件,則找出中值mid,然後new出兩個子任務lhs(left hand side)和rhs(right hand side),并調用invokeAll(lhs, rhs)将這兩個子任務都扔進線程池執行。任務的join()方法會得到傳回值,若任務尚未執行完畢則會在此阻塞。
通過這種程式設計模式,很好的将遞歸思想用到了多線程領域。值得注意的是,通過調整THRESHOLD可以增加或減少任務的個數,進而極大的影響線程的執行。在很多情況下,使用fork-join架構并不會比普通的多線程效率更高,甚至比單線程運作效率更低。是以,必須找到适合的場景,然後進行多次調優,才能獲得性能的改進。
小結
執行者與線程池的引入是因為Concurrency包的設計者想将線程的建立、執行和排程分離,進而使得使用者能夠更加專注于業務邏輯;Callable接口和Future接口使得異步執行結果的擷取更加簡單;ScheduledExecutorService取代Timer成為了線程重複和延遲執行的新标準;TimeUnit類的引入簡化了時間段的表達工作;包中提供的五種線程池可以極大的滿足程式員的各種需求,極端情況下還可以利用ThreadPoolExecutor類自己定制線程池。最後,從JDK1.7後引入的Fork-Join架構将“分而治之”的遞歸思想實作到線程池中,并應用“work-steal”算法實作了任務執行效率的提升