1. 什麼是Fork/Join架構 Fork/Join架構是Java7提供了的一個用于并行執行任務的架構, 是一個把大任務分割成若幹個小任務,最終彙總每個小任務結果後得到大任務結果的架構。
我們再通過Fork和Join這兩個單詞來了解下Fork/Join架構,Fork就是把一個大任務切分為若幹子任務并行的執行,Join就是合并這些子任務的執行結果,最後得到這個大任務的結果。比如計算1+2+。。+10000,可以分割成10個子任務,每個子任務分别對1000個數進行求和,最終彙總這10個子任務的結果。Fork/Join的運作流程圖如下:
2. 工作竊取算法 工作竊取(work-stealing)算法是指某個線程從其他隊列裡竊取任務來執行。工作竊取的運作流程圖如下:
Java并發程式設計指南15:Fork/join并發架構與工作竊取算法剖析
與ThreadPool的差別
工作竊取算法 那麼為什麼需要使用工作竊取算法呢?假如我們需要做一個比較大的任務,我們可以把這個任務分割為若幹互不依賴的子任務,為了減少線程間的競争,于是把這些子任務分别放到不同的隊列裡,并為每個隊列建立一個單獨的線程來執行隊列裡的任務,線程和隊列一一對應,比如A線程負責處理A隊列裡的任務。但是有的線程會先把自己隊列裡的任務幹完,而其他線程對應的隊列裡還有任務等待處理。幹完活的線程與其等着,不如去幫其他線程幹活,于是它就去其他線程的隊列裡竊取一個任務來執行。而在這時它們會通路同一個隊列,是以為了減少竊取任務線程和被竊取任務線程之間的競争,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
工作竊取算法的優點是充分利用線程進行并行計算,并減少了線程間的競争,其缺點是在某些情況下還是存在競争,比如雙端隊列裡隻有一個任務時。并且消耗了更多的系統資源,比如建立多個線程和多個雙端隊列。
3. Fork/Join架構的介紹 我們已經很清楚Fork/Join架構的需求了,那麼我們可以思考一下,如果讓我們來設計一個Fork/Join架構,該如何設計?這個思考有助于你了解Fork/Join架構的設計。
第一步分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,是以還需要不停的分割,直到分割出的子任務足夠小。
第二步執行任務并合并結果。分割的子任務分别放在雙端隊列裡,然後幾個啟動線程分别從雙端隊列裡擷取任務執行。子任務執行完的結果都統一放在一個隊列裡,啟動一個線程從隊列裡拿資料,然後合并這些資料。
Fork/Join使用兩個類來完成以上兩件事情:
ForkJoinTask:我們要使用ForkJoin架構,必須首先建立一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制,通常情況下我們不需要直接繼承ForkJoinTask類,而隻需要繼承它的子類,Fork/Join架構提供了以下兩個子類: RecursiveAction:用于沒有傳回結果的任務。 RecursiveTask :用于有傳回結果的任務。 ForkJoinPool :ForkJoinTask需要通過ForkJoinPool來執行,任務分割出的子任務會添加到目前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列裡暫時沒有任務時,它會随機從其他工作線程的隊列的尾部擷取一個任務。 4. 使用Fork/Join架構 讓我們通過一個簡單的需求來使用下Fork/Join架構,需求是:計算1+2+3+4的結果。
使用Fork/Join架構首先要考慮到的是如何分割任務,如果我們希望每個子任務最多執行兩個數的相加,那麼我們設定分割的門檻值是2,由于是4個數字相加,是以Fork/Join架構會把這個任務fork成兩個子任務,子任務一負責計算1+2,子任務二負責計算3+4,然後再join兩個子任務的結果。
因為是有結果的任務,是以必須繼承RecursiveTask,實作代碼如下:
003
importjava.util.concurrent.ExecutionException;
005
importjava.util.concurrent.ForkJoinPool;
007
importjava.util.concurrent.Future;
009
importjava.util.concurrent.RecursiveTask;
011
publicclassCountTaskextendsRecursiveTask {
013
privatestaticfinalintTHRESHOLD=
2
;
//門檻值
019
publicCountTask(intstart,intend) {
029
protectedInteger compute() {
035
booleancanCompute = (end-start) <=THRESHOLD;
039
for
(inti =start; i <=end; i++) {
047
//如果任務大于閥值,就分裂成兩個子任務計算
049
intmiddle = (start+end) /
2
;
051
CountTask leftTask =newCountTask(start, middle);
053
CountTask rightTask =newCountTask(middle +
1
,end);
063
intleftResult=leftTask.join();
065
intrightResult=rightTask.join();
069
sum = leftResult + rightResult;
077
publicstaticvoidmain(String[] args) {
079
ForkJoinPool forkJoinPool =newForkJoinPool();
081
//生成一個計算任務,負責計算1+2+3+4
083
CountTask task =newCountTask(
1
,
4
);
087
Future result = forkJoinPool.submit(task);
091
System.out.println(result.get());
093
}
catch
(InterruptedException e) {
095
}
catch
(ExecutionException e) {
通過這個例子讓我們再來進一步了解ForkJoinTask,ForkJoinTask與一般的任務的主要差別在于它需要實作compute方法,在這個方法裡,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在調用fork方法時,又會進入compute方法,看看目前子任務是否需要繼續分割成孫任務,如果不需要繼續分割,則執行目前子任務并傳回結果。使用join方法會等待子任務執行完并得到其結果。
5. Fork/Join架構的異常處理 ForkJoinTask在執行的時候可能會抛出異常,但是我們沒辦法在主線程裡直接捕獲異常,是以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經抛出異常或已經被取消了,并且可以通過ForkJoinTask的getException方法擷取異常。使用如下代碼:
if(task.isCompletedAbnormally())
{
System.out.println(task.getException());
}
getException方法傳回Throwable對象,如果任務被取消了則傳回CancellationException。如果任務沒有完成或者沒有抛出異常則傳回null。
6. Fork/Join架構的實作原理 ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責存放程式送出給ForkJoinPool的任務,而ForkJoinWorkerThread數組負責執行這些任務。
ForkJoinTask的fork方法實作原理。當我們調用ForkJoinTask的fork方法時,程式會調用ForkJoinWorkerThread的pushTask方法異步的執行這個任務,然後立即傳回結果。代碼如下:
1
public
final
ForkJoinTask fork() {
2
((ForkJoinWorkerThread) Thread.currentThread())
pushTask方法把目前任務存放在ForkJoinTask 數組queue裡。然後再調用ForkJoinPool的signalWork()方法喚醒或建立一個工作線程來執行任務。代碼如下:
01
final
void
pushTask(ForkJoinTask t) {
02
ForkJoinTask[] q;
int
s, m;
03
if
((q = queue) !=
null
) {
// ignore if queue removed
04
long
u = (((s = queueTop) & (m = q.length -
1
)) << ASHIFT) + ABASE;
05
UNSAFE.putOrderedObject(q, u, t);
06
queueTop = s +
1
;
// or use putOrderedInt
07
if
((s -= queueBase) <=
2
)
ForkJoinTask的join方法實作原理。Join方法的主要作用是阻塞目前線程并等待擷取結果。讓我們一起看看ForkJoinTask的join方法的實作,代碼如下:
01
public
final
V join() {
02
if
(doJoin() != NORMAL)
03
return
reportResult();
05
return
getRawResult();
07
private
V reportResult() {
09
if
((s = status) == CANCELLED)
10
throw
new
CancellationException();
11
if
(s == EXCEPTIONAL && (ex = getThrowableException()) !=
null
)
12
UNSAFE.throwException(ex);
13
return
getRawResult();
首先,它調用了doJoin()方法,通過doJoin()方法得到目前任務的狀态來判斷傳回什麼結果,任務狀态有四種:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出現異常(EXCEPTIONAL)。
如果任務狀态是已完成,則直接傳回任務結果。 如果任務狀态是被取消,則直接抛出CancellationException。 如果任務狀态是抛出異常,則直接抛出對應的異常。 讓我們再來分析下doJoin()方法的實作代碼:
01
private
int
doJoin() {
02
Thread t; ForkJoinWorkerThread w;
int
s;
boolean
completed;
03
if
((t = Thread.currentThread())
instanceof
ForkJoinWorkerThread) {
06
if
((w = (ForkJoinWorkerThread)t).unpushTask(
this
)) {
09
}
catch
(Throwable rex) {
10
return
setExceptionalCompletion(rex);
13
return
setCompletion(NORMAL);
15
return
w.joinTask(
this
);
18
return
externalAwaitDone();
在doJoin()方法裡,首先通過檢視任務的狀态,看任務是否已經執行完了,如果執行完了,則直接傳回任務狀态,如果沒有執行完,則從任務數組裡取出任務并執行。如果任務順利執行完成了,則設定任務狀态為NORMAL,如果出現異常,則紀錄異常,并将任務狀态設定為EXCEPTIONAL。
Fork/Join源碼剖析與算法解析 我們在大學算法課本上,學過的一種基本算法就是:分治。其基本思路就是:把一個大的任務分成若幹個子任務,這些子任務分别計算,最後再Merge出最終結果。這個過程通常都會用到遞歸。
而Fork/Join其實就是一種利用多線程來實作“分治算法”的并行架構。
另外一方面,可以把Fori/Join看作一個單機版的Map/Reduce,隻不過這裡的并行不是多台機器并行計算,而是多個線程并行計算。
下面看2個簡單例子:
例子1: 快排
我們都知道,快排有2個步驟:
第1步,拿數組的第1個元素,把元素劃分成2半,左邊的比該元素小,右邊的比該元素大;
第2步,對左右的2個子數組,分别排序。
可以看出,這裡左右2個子數組,可以互相獨立的,并行計算。是以可以利用ForkJoin架構, 代碼如下:
//定義一個Task,基礎自RecursiveAction,實作其compute方法
class SortTask extends RecursiveAction {
final long[] array;
final int lo;
final int hi;
private int THRESHOLD = 0; //For demo only
public SortTask(long[] array) {
this.array = array;
this.lo = 0;
this.hi = array.length - 1;
}
public SortTask(long[] array, int lo, int hi) {
this.array = array;
this.lo = lo;
this.hi = hi;
}
protected void compute() {
if (hi - lo < THRESHOLD)
sequentiallySort(array, lo, hi);
else {
int pivot = partition(array, lo, hi); //劃分
coInvoke(new SortTask(array, lo, pivot - 1), new SortTask(array,
pivot + 1, hi)); //遞歸調,左右2個子數組
}
}
private int partition(long[] array, int lo, int hi) {
long x = array[hi];
int i = lo - 1;
for (int j = lo; j < hi; j++) {
if (array[j] <= x) {
i++;
swap(array, i, j);
}
}
swap(array, i + 1, hi);
return i + 1;
}
private void swap(long[] array, int i, int j) {
if (i != j) {
long temp = array[i];
array[i] = array[j];
array[j] = temp;
}
}
private void sequentiallySort(long[] array, int lo, int hi) {
Arrays.sort(array, lo, hi + 1);
}
}
//測試函數
public void testSort() throws Exception {
ForkJoinTask sort = new SortTask(array); //1個任務
ForkJoinPool fjpool = new ForkJoinPool(); //1個ForkJoinPool
fjpool.submit(sort); //送出任務
fjpool.shutdown(); //結束。ForkJoinPool内部會開多個線程,并行上面的子任務
fjpool.awaitTermination(30, TimeUnit.SECONDS);
}
例子2: 求1到n個數的和
//定義一個Task,基礎自RecursiveTask,實作其commpute方法
public class SumTask extends RecursiveTask<Long>{
private static final int THRESHOLD = 10;
private long start;
private long end;
public SumTask(long n) {
this(1,n);
}
private SumTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override //有傳回值
protected Long compute() {
long sum = 0;
if((end - start) <= THRESHOLD){
for(long l = start; l <= end; l++){
sum += l;
}
}else{
long mid = (start + end) >>> 1;
SumTask left = new SumTask(start, mid); //分治,遞歸
SumTask right = new SumTask(mid + 1, end);
left.fork();
right.fork();
sum = left.join() + right.join();
}
return sum;
}
private static final long serialVersionUID = 1L;
}
//測試函數
public void testSum() throws Exception {
SumTask sum = new SumTask(100); //1個任務
ForkJoinPool fjpool = new ForkJoinPool(); //1個ForkJoinPool
Future<Long> future = fjpool.submit(sum); //送出任務
Long r = future.get(); //擷取傳回值
fjpool.shutdown();
}
與ThreadPool的差別 通過上面例子,我們可以看出,它在使用上,和ThreadPool有共同的地方,也有差別點:
(1) ThreadPool隻有“外部任務”,也就是調用者放到隊列裡的任務。 ForkJoinPool有“外部任務”,還有“内部任務”,也就是任務自身在執行過程中,分裂出”子任務“,遞歸,再次放入隊列。
(2)ForkJoinPool裡面的任務通常有2類,RecusiveAction/RecusiveTask,這2個都是繼承自FutureTask。在使用的時候,重寫其compute算法。
工作竊取算法 上面提到,ForkJoinPool裡有”外部任務“,也有“内部任務”。其中外部任務,是放在ForkJoinPool的全局隊列裡面,而每個Worker線程,也有一個自己的隊列,用于存放内部任務。
竊取的基本思路就是:當worker自己的任務隊列裡面沒有任務時,就去scan别的線程的隊列,把别人的任務拿過來執行。
//ForkJoinPool的成員變量
ForkJoinWorkerThread[] workers; //worker thread集合
private ForkJoinTask<?>[] submissionQueue; //外部任務隊列
private final ReentrantLock submissionLock;
//ForkJoinWorkerThread的成員變量
ForkJoinTask<?>[] queue; //每個worker線程自己的内部任務隊列
//送出任務
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
forkOrSubmit(task);
return task;
}
private <T> void forkOrSubmit(ForkJoinTask<T> task) {
ForkJoinWorkerThread w;
Thread t = Thread.currentThread();
if (shutdown)
throw new RejectedExecutionException();
if ((t instanceof ForkJoinWorkerThread) && //如果目前是worker線程送出的任務,也就是worker執行過程中,分裂出來的子任務,放入worker自己的内部任務隊列
(w = (ForkJoinWorkerThread)t).pool == this)
w.pushTask(task);
else
addSubmission(task); //外部任務,放入pool的全局隊列
}
//worker的run方法
public void run() {
Throwable exception = null;
try {
onStart();
pool.work(this);
} catch (Throwable ex) {
exception = ex;
} finally {
onTermination(exception);
}
}
final void work(ForkJoinWorkerThread w) {
boolean swept = false; // true on empty scans
long c;
while (!w.terminate && (int)(c = ctl) >= 0) {
int a; // active count
if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
swept = scan(w, a); //核心代碼都在這個scan函數裡面
else if (tryAwaitWork(w, c))
swept = false;
}
}
//scan的基本思路:從别人的任務隊列裡面搶,沒有,再到pool的全局的任務隊列裡面去取。
private boolean scan(ForkJoinWorkerThread w, int a) {
int g = scanGuard;
int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
ForkJoinWorkerThread[] ws = workers;
if (ws == null || ws.length <= m) // 過期檢測
return false;
for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
//随機選出一個犧牲者(工作線程)。
ForkJoinWorkerThread v = ws[k & m];
//一系列檢查...
if (v != null && (b = v.queueBase) != v.queueTop &&
(q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
//如果這個犧牲者的任務隊列中還有任務,嘗試竊取這個任務。
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && v.queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
//竊取成功後,調整queueBase
int d = (v.queueBase = b + 1) - v.queueTop;
//将犧牲者的stealHint設定為目前工作線程在pool中的下标。
v.stealHint = w.poolIndex;
if (d != 0)
signalWork(); // 如果犧牲者的任務隊列還有任務,繼續喚醒(或建立)線程。
w.execTask(t); //執行竊取的任務。
}
//計算出下一個随機種子。
r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
return false; // 傳回false,表示不是一個空掃描。
}
//前2*m次,随機掃描。
else if (j < 0) { // xorshift
r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
}
//後2*m次,順序掃描。
else
++k;
}
if (scanGuard != g) // staleness check
return false;
else {
//如果掃描完畢後沒找到可竊取的任務,那麼從Pool的送出任務隊列中取一個任務來執行。
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
if ((b = queueBase) != queueTop &&
(q = submissionQueue) != null &&
(i = (q.length - 1) & b) >= 0) {
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueBase = b + 1;
w.execTask(t);
}
return false;
}
return true; // 如果所有的隊列(工作線程的任務隊列和pool的任務隊列)都是空的,傳回true。
}
}
關于ForkJoinPool/FutureTask,本文隻是分析了其基本使用原理。還有很多實作細節,留待讀者自己去分析。
微信公衆号【Java技術江湖】一位阿裡 Java 工程師的技術小站。(關注公衆号後回複”Java“即可領取 Java基礎、進階、項目和架構師等免費學習資料,更有資料庫、分布式、微服務等熱門技術學習視訊,内容豐富,兼顧原理和實踐,另外也将贈送作者原創的Java學習指南、Java程式員面試指南等幹貨資源) Java并發程式設計指南15:Fork/join并發架構與工作竊取算法剖析
與ThreadPool的差別
工作竊取算法