分支/合并架構詳解
分支架構的目的是以遞歸的方式将可以并行的任務拆分成更小的任務,然後将每個子任務的結果合并起來生成整體結果.
它是 ExecutorService 接口的一個實作,他把子任務配置設定給線程池(ForkJoinPool)中的線程.
使用 RecursiveTask
要把任務送出到池,必須建立 RecursiveTask 的一個子類,其中V是并行化任務産生的結果類型,
RecursiveTask類源碼:
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
/**
* The result of the computation.
*/
V result;
/**
* The main computation performed by this task.
* @return
protected abstract V compute();
public final V getRawResult() {
return result;
}
protected final void setRawResult(V value) {
result = value;
}
/**
* Implements execution conventions for RecursiveTask.
*/
protected final boolean exec() {
result = compute();
return true;
}
}
要定義 RecursiveTask, 隻需實作它唯一的抽象方法compute :
@Override
protected Long compute() {
return null;
}
這個方法定義了将任務拆分成子任務的邏輯,以及無法再拆分或不便再拆分,生成單個子任務結果的邏輯.
即(僞代碼如下):
if (任務足夠小或不可分) {
順序計算該任務
} else
遞歸的任務拆分過程如圖:
如果你了解著名的分治算法,會發現這不過是分支算法的并行版本而已.
接下來我們舉一個用分支/合并架構的實際例子,還以前面的例子為基礎,讓我們試着用這個架構為一個數字範圍(這裡用一個long[] 數組表示)求和
/**
* 分支合并架構測試
*
* @author itguang
* @create
public class ForkJoinTest extends RecursiveTask<Long> {
//要處理的任務數組
private final long[] numbers;
//子任務處理數組的起始和結束位置
private final int start;
private final int end;
//閥值,當數組小于10000就并行執行
public static final long THRESHOLD = 10000;
//公共構造函數,用于建立子任務
//私有構造函數,用于 以遞歸方式為主任務建立子任務
public ForkJoinTest(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
public ForkJoinTest(long[] numbers) {
this(numbers, 0, numbers.length);
}
@Override
protected Long compute() {
int length = end - start;
//如果大小小于等于閥值,則順序計算結果
if (length <= THRESHOLD) {
return computeSequentially();
}
//否則,建立一個子任務為數組的前一半求和
ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);
//利用另一個 ForkJoinPool 裡的線程異步執行新建立的子任務.
leftTask.fork();
//建立一個任務為數組的後一半求和
ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);
//**遞歸操作**
long rightResult = rightTask.compute();
//遇到遞歸終止條件,讀取本次遞歸第一個子任務的結果,如果尚未完成就等待
long leftResult = leftTask.join();
//遞歸累加
return leftResult+rightResult;
}
//計算和
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return
測試:建立一個 ForkJoinPool,并把任務傳遞給它的invoke()方法.在ForkPool中執行時,傳回結果就是ForkJoinTest的并行遞歸求和結果
@Test
public void test9(){
long[] numbers = LongStream.rangeClosed(1, 1000*10000).toArray();
ForkJoinTest forkJoinTest = new ForkJoinTest(numbers);
Long sum = new ForkJoinPool().invoke(forkJoinTest);
System.out.println(sum);//50000005000000
請注意在實際應用時,使用多個 ForkJoinPool 是沒有什麼意義的。正是出于這個原因,一
般來說把它執行個體化一次,然後把執行個體儲存在靜态字段中,使之成為單例,這樣就可以在軟體中任
何部分友善地重用了。這裡建立時用了其預設的無參數構造函數,這意味着想讓線程池使用JVM
能夠使用的所有處理器。更确切地說,該構造函數将使用 Runtime.availableProcessors 的
傳回值來決定線程池使用的線程數。請注意 availableProcessors 方法雖然看起來是處理器,
但它實際上傳回的是可用核心的數量,包括超線程生成的虛拟核心。
當把一個ForkJoinTask 任務交給ForkJoinPool時,這個任務就由池中的一個線程執行,這個線程會調用任務的 compute 方法.
該方法會檢查任務是否小到足以順序執行,如果不夠小則會把要求和的數組分成兩半,分給兩個新的 ForkJoinTest ,而它們也由ForkJoinPool 安排執行.
是以這一過程可以遞歸重複,把原任務拆分成更小的任務執行,知道滿足不可炒粉的條件,在上例中是拆分數組的大小小于閥值.
這時候會從遞歸終止開始順序計算每個任務的結果.然後由分支建立的二叉樹周遊回它的根.接下來會合并每個子任務的部分結果,進而得到總任務的結果.
如圖:
ForkJoinTask工作竊取算法
在 ForkJoinSumCalculator 的例子中,我們決定在要求和的數組中最多包含10 000個項目
時就不再建立子任務了。這個選擇是很随意的,但大多數情況下也很難找到一個好的啟發式方法
來确定它,隻能試幾個不同的值來嘗試優化它。在我們的測試案例中,我們先用了一個有1000
萬項目的數組,意味着 ForkJoinSumCalculator 至少會分出1000個子任務來。這似乎有點浪費
資源,因為我們用來運作它的機器上隻有四個核心。在這個特定例子中可能确實是這樣,因為所
有的任務都受CPU限制,預計所花的時間也差不多。
但分出大量的小任務一般來說都是一個好的選擇。這是因為,理想情況下,劃分并行任務時,
應該讓每個任務都用完全相同的時間完成,讓所有的CPU核心都同樣繁忙。不幸的是,實際中,每
個子任務所花的時間可能天差地别,要麼是因為劃分政策效率低,要麼是有不可預知的原因,比如
磁盤通路慢,或是需要和外部服務協調執行。
分支/合并架構工程用一種稱為工作竊取(work stealing)的技術來解決這個問題。在實際應
用中,這意味着這些任務差不多被平均配置設定到 ForkJoinPool 中的所有線程上。每個線程都為分
配給它的任務儲存一個雙向鍊式隊列,每完成一個任務,就會從隊列頭上取出下一個任務開始執
行。基于前面所述的原因,某個線程可能早早完成了配置設定給它的所有任務,也就是它的隊列已經
空了,而其他的線程還很忙。這時,這個線程并沒有閑下來,而是随機選了一個别的線程,從隊
列的尾巴上“偷走”一個任務。這個過程一直繼續下去,直到所有的任務都執行完畢,所有的隊
列都清空。這就是為什麼要劃成許多小任務而不是少數幾個大任務,這有助于更好地在工作線程
之間平衡負載。