Fork Join架構
Fork/Join架構是Java 7提供的一個用于并行執行任務的架構,是一個把大任務分割成若幹個小任務,最終彙總每個小任務結果後得到大任務結果的架構。
package com.example.xppdemo.chapter6;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2; // 門檻值
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任務足夠小就計算任務
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任務大于門檻值,就分裂成兩個子任務計算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
// 執行子任務
leftTask.fork();
rightTask.fork();
// 等待子任務執行完,并得到其結果
int leftResult=leftTask.join();
int rightResult=rightTask.join();
// 合并子任務
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一個計算任務,負責計算1+2+3+4
CountTask task = new CountTask(1, 4);
// 執行一個任務
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
}
}
ForkJoinTask與一般任務的主要差別在于它需要實作compute方法,在這個方法裡,首先需要判斷任務是否足夠小,如果足夠小就直接執
行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在調用fork方法時,又會進入compute方法,看看目前子任務是否需要繼續分割成子任務,如果不需要繼續分割,則執行目前子任務并傳回結果。使用join方法會等待子任務執行完并得到其結果。
Fork/Join使用兩個類來完成以上兩件事情。
①ForkJoinTask:我們要使用ForkJoin架構,必須首先建立一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制。通常情況下,我們不需要直接繼承ForkJoinTask類,隻需要繼承它的子類,Fork/Join架構提供了以下兩個子類。
- RecursiveAction:用于沒有傳回結果的任務。
- RecursiveTask:用于有傳回結果的任務。
②ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行。
任務分割出的子任務會添加到目前工作線程所維護的雙端隊列中,進入隊列的頭部。當 一個工作線程的隊列裡暫時沒有任務時,它會随機從其他工作線程的隊列的尾部擷取一個任務。
Fork/Join架構的實作原理
ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責将存放程式送出給ForkJoinPool的任務,而ForkJoinWorkerThread數組負責執行這些任務。
(1)ForkJoinTask的fork方法實作原理
當我們調用ForkJoinTask的fork方法時,程式會調用ForkJoinWorkerThread的pushTask方法異步地執行這個任務,然後立即傳回結果。代碼如下。
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
workQueue.push:
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
pushTask方法把目前任務存放在ForkJoinTask數組隊列裡。然後再調用ForkJoinPool的signalWork()方法喚醒或建立一個工作線程來執行任務。
(2)ForkJoinTask的join方法實作原理
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
- 如果任務狀态是已完成,則直接傳回任務結果。
- 如果任務狀态是被取消,則直接抛出CancellationException。
- 如果任務狀态是抛出異常,則直接抛出對應的異常。
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}