概要
現代的計算機已經向多CPU方向發展,即使是普通的PC,甚至現在的智能手機、多核處理器已被廣泛應用。在未來,處理器的核心數将會發展的越來越多。
雖然硬體上的多核CPU已經十分成熟,但是很多應用程式并未這種多核CPU做好準備,是以并不能很好地利用多核CPU的性能優勢。
為了充分利用多CPU、多核CPU的性能優勢,級軟基軟體系統應該可以充分“挖掘”每個CPU的計算能力,決不能讓某個CPU處于“空閑”狀态。為此,可以考慮把一個任務拆分成多個“小任務”,把多個"小任務"放到多個處理器核心上并行執行。當多個“小任務”執行完成之後,再将這些執行結果合并起來即可。
Java在JDK7之後加入了并行計算的架構Fork/Join,可以解決我們系統中大資料計算的性能問題。Fork/Join采用的是分治法,Fork是将一個大任務拆分成若幹個子任務,子任務分别去計算,而Join是擷取到子任務的計算結果,然後合并,這個是遞歸的過程。子任務被配置設定到不同的核上執行時,效率最高。僞代碼如下:
Result solve(Problem problem) {
if (problem is small)
directly solve problem
else {
split problem into independent parts
fork new subtasks to solve each part
join all subtasks
compose result from subresults
}
}
Fork/Join架構的核心類是ForkJoinPool,它能夠接收一個ForkJoinTask,并得到計算結果。ForkJoinTask有兩個子類,RecursiveTask(有傳回值)和RecursiveAction(無傳回結果),我們自己定義任務時,隻需選擇這兩個類繼承即可
示例代碼
package forkJoin;
import java.util.concurrent.RecursiveTask;
public class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 20;
private int[] array;
private int low;
private int high;
public SumTask(int[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
@Override
protected Integer compute() {
int sum = 0;
if (high - low + 1 <= THRESHOLD) {
System.out.println(low + " - " + high + " 計算");
// 測試并行的個數,統計輸出過程中的文字,看看有多少線程停止在這裡就知道有多少并行計算
// 參考 ForkJoinPool 初始化設定的并行數
// try {
// Thread.sleep(11111111);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// 小于門檻值則直接計算
for (int i = low; i <= high; i++) {
sum += array[i];
}
} else {
System.out.println(low + " - " + high + " 切分");
// 1. 一個大任務分割成兩個子任務
int mid = (low + high) / 2;
SumTask left = new SumTask(array, low, mid);
SumTask right = new SumTask(array, mid + 1, high);
// 2. 分别并行計算
invokeAll(left, right);
// 3. 合并結果
sum = left.join() + right.join();
// 另一種方式
try {
sum = left.get() + right.get();
} catch (Throwable e) {
System.out.println(e.getMessage());
}
}
return sum;
}
}
package forkJoin;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class Main {
/*static class MyTaskTest extends RecursiveTask<Integer> {
final int n;
MyTaskTest(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) return n;
MyTaskTest f1 = new MyTaskTest(n - 1);
f1.fork();
MyTaskTest f2 = new MyTaskTest(n - 2);
return f2.compute() + f1.join();
}
}*/
/*class SortTask extends RecursiveAction {
static final int THRESHOLD = 2;
final long[] array;
final int lo;
final int hi;
SortTask(long[] array, int lo, int hi) {
this.array = array;
this.lo = lo;
this.hi = hi;
}
protected void compute() {
if (hi - lo < THRESHOLD)
sequentiallySort(array, lo, hi);
else {
int mid = (lo + hi) >>> 1;
invokeAll(new SortTask(array, lo, mid),
new SortTask(array, mid, hi));
merge(array, lo, hi);
}
}
}*/
private static int[] genArray() {
int[] array = new int[100];
for (int i = 0; i < array.length; i++) {
array[i] = new Random().nextInt(500);
}
return array;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 下面以一個有傳回值的大任務為例,介紹一下RecursiveTask的用法。
大任務是:計算随機的100個數字的和。
小任務是:每次隻能20個數值的和。
*/
int[] array = genArray();
// System.out.println(Arrays.toString(array));
int total = 0;
for (int i = 0; i < array.length; i++) {
total += array[i];
}
System.out.println("目标和:" + total);
// 1. 建立任務
SumTask sumTask = new SumTask(array, 0, array.length - 1);
// 2. 建立線程池
// 設定并行計算的個數
int processors = Runtime.getRuntime().availableProcessors();
ForkJoinPool forkJoinPool = new ForkJoinPool(processors * 2);
// 3. 送出任務到線程池
forkJoinPool.submit(sumTask);
// forkJoinPool.shutdown();
long begin = System.currentTimeMillis();
// 4. 擷取結果
Integer result = sumTask.get();// wait for
long end = System.currentTimeMillis();
System.out.println(String.format("結果 %s ,耗時 %sms", result, end - begin));
if (result == total) {
System.out.println("測試成功");
} else {
System.out.println("fork join 使用失敗!!!!");
}
}
}
上面的代碼是一個100個整數累加的任務,切分到小于20個數的時候直接進行累加,不再切分。 我們通過調整門檻值(THRESHOLD),可以發現耗時是不一樣的。實際應用中,如果需要分割的任務大小是固定的,可以經過測試,得到最佳門檻值;如果大小不是固定的,就需要設計一個可伸縮的算法,來動态計算出門檻值。如果子任務很多,效率并不一定會高。 PS:類似的這種“分而治之”的需求場景,往往帶有遞歸性,實際中,我們可以考慮任務是否具有“遞歸性”來決定是否使用“Fork-Join”架構。