天天看點

Java并發程式設計指南15:Fork/join并發架構與工作竊取算法剖析 與ThreadPool的差別 工作竊取算法

原創文章,轉載請注明: 轉載自 并發程式設計網 – ifeve.com

1. 什麼是Fork/Join架構

Fork/Join架構是Java7提供了的一個用于并行執行任務的架構, 是一個把大任務分割成若幹個小任務,最終彙總每個小任務結果後得到大任務結果的架構。

我們再通過Fork和Join這兩個單詞來了解下Fork/Join架構,Fork就是把一個大任務切分為若幹子任務并行的執行,Join就是合并這些子任務的執行結果,最後得到這個大任務的結果。比如計算1+2+。。+10000,可以分割成10個子任務,每個子任務分别對1000個數進行求和,最終彙總這10個子任務的結果。Fork/Join的運作流程圖如下:

2. 工作竊取算法

工作竊取(work-stealing)算法是指某個線程從其他隊列裡竊取任務來執行。工作竊取的運作流程圖如下:

Java并發程式設計指南15:Fork/join并發架構與工作竊取算法剖析 與ThreadPool的差別 工作竊取算法

那麼為什麼需要使用工作竊取算法呢?假如我們需要做一個比較大的任務,我們可以把這個任務分割為若幹互不依賴的子任務,為了減少線程間的競争,于是把這些子任務分别放到不同的隊列裡,并為每個隊列建立一個單獨的線程來執行隊列裡的任務,線程和隊列一一對應,比如A線程負責處理A隊列裡的任務。但是有的線程會先把自己隊列裡的任務幹完,而其他線程對應的隊列裡還有任務等待處理。幹完活的線程與其等着,不如去幫其他線程幹活,于是它就去其他線程的隊列裡竊取一個任務來執行。而在這時它們會通路同一個隊列,是以為了減少竊取任務線程和被竊取任務線程之間的競争,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

工作竊取算法的優點是充分利用線程進行并行計算,并減少了線程間的競争,其缺點是在某些情況下還是存在競争,比如雙端隊列裡隻有一個任務時。并且消耗了更多的系統資源,比如建立多個線程和多個雙端隊列。

3. Fork/Join架構的介紹

我們已經很清楚Fork/Join架構的需求了,那麼我們可以思考一下,如果讓我們來設計一個Fork/Join架構,該如何設計?這個思考有助于你了解Fork/Join架構的設計。

第一步分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,是以還需要不停的分割,直到分割出的子任務足夠小。

第二步執行任務并合并結果。分割的子任務分别放在雙端隊列裡,然後幾個啟動線程分别從雙端隊列裡擷取任務執行。子任務執行完的結果都統一放在一個隊列裡,啟動一個線程從隊列裡拿資料,然後合并這些資料。

Fork/Join使用兩個類來完成以上兩件事情:

  • ForkJoinTask:我們要使用ForkJoin架構,必須首先建立一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制,通常情況下我們不需要直接繼承ForkJoinTask類,而隻需要繼承它的子類,Fork/Join架構提供了以下兩個子類:
    • RecursiveAction:用于沒有傳回結果的任務。
    • RecursiveTask :用于有傳回結果的任務。
  • ForkJoinPool :ForkJoinTask需要通過ForkJoinPool來執行,任務分割出的子任務會添加到目前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列裡暫時沒有任務時,它會随機從其他工作線程的隊列的尾部擷取一個任務。

4. 使用Fork/Join架構

讓我們通過一個簡單的需求來使用下Fork/Join架構,需求是:計算1+2+3+4的結果。

使用Fork/Join架構首先要考慮到的是如何分割任務,如果我們希望每個子任務最多執行兩個數的相加,那麼我們設定分割的門檻值是2,由于是4個數字相加,是以Fork/Join架構會把這個任務fork成兩個子任務,子任務一負責計算1+2,子任務二負責計算3+4,然後再join兩個子任務的結果。

因為是有結果的任務,是以必須繼承RecursiveTask,實作代碼如下:

001

packagefj;

002

003

importjava.util.concurrent.ExecutionException;

004

005

importjava.util.concurrent.ForkJoinPool;

006

007

importjava.util.concurrent.Future;

008

009

importjava.util.concurrent.RecursiveTask;

010

011

publicclassCountTaskextendsRecursiveTask {

012

013

privatestaticfinalintTHRESHOLD= 

2

;

//門檻值

014

015

privateintstart;

016

017

privateintend;

018

019

publicCountTask(intstart,intend) {

020

021

this

.start= start;

022

023

this

.end= end;

024

025

}

026

027

@Override

028

029

protectedInteger compute() {

030

031

intsum = 

;

032

033

//如果任務足夠小就計算任務

034

035

booleancanCompute = (end-start) <=THRESHOLD;

036

037

if

(canCompute) {

038

039

for

(inti =start; i <=end; i++) {

040

041

sum += i;

042

043

}

044

045

}

else

{

046

047

//如果任務大于閥值,就分裂成兩個子任務計算

048

049

intmiddle = (start+end) / 

2

;

050

051

CountTask leftTask =newCountTask(start, middle);

052

053

CountTask rightTask =newCountTask(middle + 

1

,end);

054

055

//執行子任務

056

057

leftTask.fork();

058

059

rightTask.fork();

060

061

//等待子任務執行完,并得到其結果

062

063

intleftResult=leftTask.join();

064

065

intrightResult=rightTask.join();

066

067

//合并子任務

068

069

sum = leftResult  + rightResult;

070

071

}

072

073

returnsum;

074

075

}

076

077

publicstaticvoidmain(String[] args) {

078

079

ForkJoinPool forkJoinPool =newForkJoinPool();

080

081

//生成一個計算任務,負責計算1+2+3+4

082

083

CountTask task =newCountTask(

1

4

);

084

085

//執行一個任務

086

087

Future result = forkJoinPool.submit(task);

088

089

try

{

090

091

System.out.println(result.get());

092

093

}

catch

(InterruptedException e) {

094

095

}

catch

(ExecutionException e) {

096

097

}

098

099

}

100

101

}

通過這個例子讓我們再來進一步了解ForkJoinTask,ForkJoinTask與一般的任務的主要差別在于它需要實作compute方法,在這個方法裡,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在調用fork方法時,又會進入compute方法,看看目前子任務是否需要繼續分割成孫任務,如果不需要繼續分割,則執行目前子任務并傳回結果。使用join方法會等待子任務執行完并得到其結果。

5. Fork/Join架構的異常處理

ForkJoinTask在執行的時候可能會抛出異常,但是我們沒辦法在主線程裡直接捕獲異常,是以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經抛出異常或已經被取消了,并且可以通過ForkJoinTask的getException方法擷取異常。使用如下代碼:

if(task.isCompletedAbnormally())
{
    System.out.println(task.getException());
}      

getException方法傳回Throwable對象,如果任務被取消了則傳回CancellationException。如果任務沒有完成或者沒有抛出異常則傳回null。

6. Fork/Join架構的實作原理

ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責存放程式送出給ForkJoinPool的任務,而ForkJoinWorkerThread數組負責執行這些任務。

ForkJoinTask的fork方法實作原理。當我們調用ForkJoinTask的fork方法時,程式會調用ForkJoinWorkerThread的pushTask方法異步的執行這個任務,然後立即傳回結果。代碼如下:

1

public

final

ForkJoinTask fork() {

2

((ForkJoinWorkerThread) Thread.currentThread())

3

.pushTask(

this

);

4

return

this

;

5

}

pushTask方法把目前任務存放在ForkJoinTask 數組queue裡。然後再調用ForkJoinPool的signalWork()方法喚醒或建立一個工作線程來執行任務。代碼如下:

01

final

void

pushTask(ForkJoinTask t) {

02

ForkJoinTask[] q; 

int

s, m;

03

if

((q = queue) != 

null

) {    

// ignore if queue removed

04

long

u = (((s = queueTop) & (m = q.length - 

1

)) << ASHIFT) + ABASE;

05

UNSAFE.putOrderedObject(q, u, t);

06

queueTop = s + 

1

;         

// or use putOrderedInt

07

if

((s -= queueBase) <= 

2

)

08

pool.signalWork();

09

else

if

(s == m)

10

growQueue();

11

}

12

}

ForkJoinTask的join方法實作原理。Join方法的主要作用是阻塞目前線程并等待擷取結果。讓我們一起看看ForkJoinTask的join方法的實作,代碼如下:

01

public

final

V join() {

02

if

(doJoin() != NORMAL)

03

return

reportResult();

04

else

05

return

getRawResult();

06

}

07

private

V reportResult() {

08

int

s; Throwable ex;

09

if

((s = status) == CANCELLED)

10

throw

new

CancellationException();

11

if

(s == EXCEPTIONAL && (ex = getThrowableException()) != 

null

)

12

UNSAFE.throwException(ex);

13

return

getRawResult();

14

}

首先,它調用了doJoin()方法,通過doJoin()方法得到目前任務的狀态來判斷傳回什麼結果,任務狀态有四種:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出現異常(EXCEPTIONAL)。

  • 如果任務狀态是已完成,則直接傳回任務結果。
  • 如果任務狀态是被取消,則直接抛出CancellationException。
  • 如果任務狀态是抛出異常,則直接抛出對應的異常。

讓我們再來分析下doJoin()方法的實作代碼:

01

private

int

doJoin() {

02

Thread t; ForkJoinWorkerThread w; 

int

s; 

boolean

completed;

03

if

((t = Thread.currentThread()) 

instanceof

ForkJoinWorkerThread) {

04

if

((s = status) < 

)

05

return

s;

06

if

((w = (ForkJoinWorkerThread)t).unpushTask(

this

)) {

07

try

{

08

completed = exec();

09

catch

(Throwable rex) {

10

return

setExceptionalCompletion(rex);

11

}

12

if

(completed)

13

return

setCompletion(NORMAL);

14

}

15

return

w.joinTask(

this

);

16

}

17

else

18

return

externalAwaitDone();

19

}

在doJoin()方法裡,首先通過檢視任務的狀态,看任務是否已經執行完了,如果執行完了,則直接傳回任務狀态,如果沒有執行完,則從任務數組裡取出任務并執行。如果任務順利執行完成了,則設定任務狀态為NORMAL,如果出現異常,則紀錄異常,并将任務狀态設定為EXCEPTIONAL。

Fork/Join源碼剖析與算法解析

我們在大學算法課本上,學過的一種基本算法就是:分治。其基本思路就是:把一個大的任務分成若幹個子任務,這些子任務分别計算,最後再Merge出最終結果。這個過程通常都會用到遞歸。

而Fork/Join其實就是一種利用多線程來實作“分治算法”的并行架構。

另外一方面,可以把Fori/Join看作一個單機版的Map/Reduce,隻不過這裡的并行不是多台機器并行計算,而是多個線程并行計算。

下面看2個簡單例子:

例子1: 快排 

我們都知道,快排有2個步驟: 

第1步,拿數組的第1個元素,把元素劃分成2半,左邊的比該元素小,右邊的比該元素大; 

第2步,對左右的2個子數組,分别排序。

可以看出,這裡左右2個子數組,可以互相獨立的,并行計算。是以可以利用ForkJoin架構, 代碼如下:

//定義一個Task,基礎自RecursiveAction,實作其compute方法
class SortTask extends RecursiveAction {
    final long[] array;
    final int lo;
    final int hi;
    private int THRESHOLD = 0; //For demo only

    public SortTask(long[] array) {
        this.array = array;
        this.lo = 0;
        this.hi = array.length - 1;
    }

    public SortTask(long[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    protected void compute() {
        if (hi - lo < THRESHOLD)
            sequentiallySort(array, lo, hi);
        else {
            int pivot = partition(array, lo, hi);  //劃分
            coInvoke(new SortTask(array, lo, pivot - 1), new SortTask(array,
                    pivot + 1, hi));  //遞歸調,左右2個子數組
        }
    }

    private int partition(long[] array, int lo, int hi) {
        long x = array[hi];
        int i = lo - 1;
        for (int j = lo; j < hi; j++) {
            if (array[j] <= x) {
                i++;
                swap(array, i, j);
            }
        }
        swap(array, i + 1, hi);
        return i + 1;
    }

    private void swap(long[] array, int i, int j) {
        if (i != j) {
            long temp = array[i];
            array[i] = array[j];
            array[j] = temp;
        }
    }

    private void sequentiallySort(long[] array, int lo, int hi) {
        Arrays.sort(array, lo, hi + 1);
    }
}


//測試函數
    public void testSort() throws Exception {
        ForkJoinTask sort = new SortTask(array);   //1個任務
        ForkJoinPool fjpool = new ForkJoinPool();  //1個ForkJoinPool
        fjpool.submit(sort); //送出任務
        fjpool.shutdown(); //結束。ForkJoinPool内部會開多個線程,并行上面的子任務

        fjpool.awaitTermination(30, TimeUnit.SECONDS);
    }           

例子2: 求1到n個數的和

//定義一個Task,基礎自RecursiveTask,實作其commpute方法
public class SumTask extends RecursiveTask<Long>{  
    private static final int THRESHOLD = 10;  

    private long start;  
    private long end;  

    public SumTask(long n) {  
        this(1,n);  
    }  

    private SumTask(long start, long end) {  
        this.start = start;  
        this.end = end;  
    }  

    @Override  //有傳回值
    protected Long compute() {  
        long sum = 0;  
        if((end - start) <= THRESHOLD){  
            for(long l = start; l <= end; l++){  
                sum += l;  
            }  
        }else{  
            long mid = (start + end) >>> 1;  
            SumTask left = new SumTask(start, mid);   //分治,遞歸
            SumTask right = new SumTask(mid + 1, end);  
            left.fork();  
            right.fork();  
            sum = left.join() + right.join();  
        }  
        return sum;  
    }  
    private static final long serialVersionUID = 1L;  
}  

//測試函數
    public void testSum() throws Exception {
        SumTask sum = new SumTask(100);   //1個任務
        ForkJoinPool fjpool = new ForkJoinPool();  //1個ForkJoinPool
        Future<Long> future = fjpool.submit(sum); //送出任務
        Long r = future.get(); //擷取傳回值
        fjpool.shutdown(); 

    }           

與ThreadPool的差別

通過上面例子,我們可以看出,它在使用上,和ThreadPool有共同的地方,也有差別點: 

(1) ThreadPool隻有“外部任務”,也就是調用者放到隊列裡的任務。 ForkJoinPool有“外部任務”,還有“内部任務”,也就是任務自身在執行過程中,分裂出”子任務“,遞歸,再次放入隊列。 

(2)ForkJoinPool裡面的任務通常有2類,RecusiveAction/RecusiveTask,這2個都是繼承自FutureTask。在使用的時候,重寫其compute算法。

工作竊取算法

上面提到,ForkJoinPool裡有”外部任務“,也有“内部任務”。其中外部任務,是放在ForkJoinPool的全局隊列裡面,而每個Worker線程,也有一個自己的隊列,用于存放内部任務。

竊取的基本思路就是:當worker自己的任務隊列裡面沒有任務時,就去scan别的線程的隊列,把别人的任務拿過來執行。

//ForkJoinPool的成員變量
ForkJoinWorkerThread[] workers;  //worker thread集合
private ForkJoinTask<?>[] submissionQueue; //外部任務隊列
private final ReentrantLock submissionLock; 

//ForkJoinWorkerThread的成員變量
ForkJoinTask<?>[] queue;   //每個worker線程自己的内部任務隊列

//送出任務
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {  
    if (task == null)  
        throw new NullPointerException();  
    forkOrSubmit(task);  
    return task;  
} 

private <T> void forkOrSubmit(ForkJoinTask<T> task) {  
    ForkJoinWorkerThread w;  
    Thread t = Thread.currentThread();  
    if (shutdown)  
        throw new RejectedExecutionException();  
    if ((t instanceof ForkJoinWorkerThread) &&   //如果目前是worker線程送出的任務,也就是worker執行過程中,分裂出來的子任務,放入worker自己的内部任務隊列
        (w = (ForkJoinWorkerThread)t).pool == this)  
        w.pushTask(task);  
    else  
        addSubmission(task);  //外部任務,放入pool的全局隊列
}   

//worker的run方法
public void run() {  
    Throwable exception = null;  
    try {  
        onStart();  
        pool.work(this);  
    } catch (Throwable ex) {  
        exception = ex;  
    } finally {  
        onTermination(exception);  
    }  
}  

final void work(ForkJoinWorkerThread w) {  
    boolean swept = false;                // true on empty scans  
    long c;  
    while (!w.terminate && (int)(c = ctl) >= 0) {  
        int a;                            // active count  
        if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)  
            swept = scan(w, a);   //核心代碼都在這個scan函數裡面
        else if (tryAwaitWork(w, c))  
            swept = false;  
    }  
}  

//scan的基本思路:從别人的任務隊列裡面搶,沒有,再到pool的全局的任務隊列裡面去取。
private boolean scan(ForkJoinWorkerThread w, int a) {  
    int g = scanGuard;   

    int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;  
    ForkJoinWorkerThread[] ws = workers;  
    if (ws == null || ws.length <= m)         // 過期檢測  
        return false;  

    for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {  
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;  
        //随機選出一個犧牲者(工作線程)。  
        ForkJoinWorkerThread v = ws[k & m];  
        //一系列檢查...  
        if (v != null && (b = v.queueBase) != v.queueTop &&  
            (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {  
            //如果這個犧牲者的任務隊列中還有任務,嘗試竊取這個任務。  
            long u = (i << ASHIFT) + ABASE;  
            if ((t = q[i]) != null && v.queueBase == b &&  
                UNSAFE.compareAndSwapObject(q, u, t, null)) {  
                //竊取成功後,調整queueBase  
                int d = (v.queueBase = b + 1) - v.queueTop;  
                //将犧牲者的stealHint設定為目前工作線程在pool中的下标。  
                v.stealHint = w.poolIndex;  
                if (d != 0)  
                    signalWork();             // 如果犧牲者的任務隊列還有任務,繼續喚醒(或建立)線程。  
                w.execTask(t); //執行竊取的任務。  
            }  
            //計算出下一個随機種子。  
            r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);  
            return false;                     // 傳回false,表示不是一個空掃描。  
        }  
        //前2*m次,随機掃描。  
        else if (j < 0) {                     // xorshift  
            r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;  
        }  
        //後2*m次,順序掃描。  
        else  
            ++k;  
    }  
    if (scanGuard != g)                       // staleness check  
        return false;  
    else {                                     
        //如果掃描完畢後沒找到可竊取的任務,那麼從Pool的送出任務隊列中取一個任務來執行。  
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;  
        if ((b = queueBase) != queueTop &&  
            (q = submissionQueue) != null &&  
            (i = (q.length - 1) & b) >= 0) {  
            long u = (i << ASHIFT) + ABASE;  
            if ((t = q[i]) != null && queueBase == b &&  
                UNSAFE.compareAndSwapObject(q, u, t, null)) {  
                queueBase = b + 1;  
                w.execTask(t);  
            }  
            return false;  
        }  
        return true;                         // 如果所有的隊列(工作線程的任務隊列和pool的任務隊列)都是空的,傳回true。  
    }  
}             

關于ForkJoinPool/FutureTask,本文隻是分析了其基本使用原理。還有很多實作細節,留待讀者自己去分析。

微信公衆号【Java技術江湖】一位阿裡 Java 工程師的技術小站。(關注公衆号後回複”Java“即可領取 Java基礎、進階、項目和架構師等免費學習資料,更有資料庫、分布式、微服務等熱門技術學習視訊,内容豐富,兼顧原理和實踐,另外也将贈送作者原創的Java學習指南、Java程式員面試指南等幹貨資源)
Java并發程式設計指南15:Fork/join并發架構與工作竊取算法剖析 與ThreadPool的差別 工作竊取算法