天天看點

JUC系列(十) | Fork&Join架構 并行處理任務

多線程一直Java開發中的難點,也是面試中的常客,趁着還有時間,打算鞏固一下JUC方面知識,我想機會随處可見,但始終都是留給有準備的人的,希望我們都能加油!!!

沉下去,再浮上來

,我想我們會變的不一樣的。

🚤Fork&Join架構

1)介紹

Fork/Join架構是從Java1.7開始提供的一個并行處理任務的架構,它的基本思路是将一個大任務分解成若幹個小任務,并行處理多個小任務,最後再彙總合并這些小任務的結果便可得到原來的大任務結果。

JUC系列(十) | Fork&Join架構 并行處理任務

1、Fork :遞歸式的将大任務分割成合适大小的小任務。

2、Join:執行任務并合并結果。

2)相關類

我們要使用 Fork/Join 架構,首先需要建立一個 ForkJoin 任務。 ForkJoin 類提供了在任務中執行 fork 和 join 的機制。

通常情況下我們都是直接繼承ForkJoinTask 的子類,Fork/Join架構提供了兩個子類:

  • RecursiveAction:一個遞歸無結果的ForkJoinTask(沒有傳回值)任務
  • RecursiveTask:一個遞歸有結果的ForkJoinTask(有傳回值)任務
  • ForkJoinTask 主要方法:
    fork()   // 在目前線程運作的線程池中安排一個異步執行。簡單的了解就是再建立一個子任務。
    join()    //當任務完成的時候傳回計算結果。
    invoke()    //開始執行任務,如果必要,等待計算完成。
               

ForkJoinPool:另外ForkJoinTask需要通過 ForkJoinPool 來執行

JUC系列(十) | Fork&Join架構 并行處理任務

RecursiveTask:一個遞歸有結果的ForkJoinTask(有傳回值)任務

JUC系列(十) | Fork&Join架構 并行處理任務

🚁Fork 方法

調用fork方法時,程式會将新建立的子任務放入目前線程的workQueue隊列中,Fork/Join架構将根據目前正在并發執行的ForkJoinTask任務的ForkJoinWorkerThread線程狀态,來決定是讓這個任務在隊列中等待,還是建立一個新的ForkJoinWorkerThread線程運作它,又或者是喚起其它正在等待任務的ForkJoinWorkerThread線程運作它。

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        //将給定的任務添加到送出者目前隊列的送出隊列中,如果為 null 或存在競争,則建立一個。
        ForkJoinPool.common.externalPush(this);
    return this;
}
           

push方法把目前任務存放在 ForkJoinTask 數組隊列裡。然後再調用 ForkJoinPool 的 signalWork()方法喚醒或建立一個工作線程來執行任務。代碼如下:

//推送任務。 僅由非共享隊列中的所有者調用。
final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a;
    int s = top, d, cap, m;
    ForkJoinPool p = pool;
    if ((a = array) != null && (cap = a.length) > 0) {
        QA.setRelease(a, (m = cap - 1) & s, task);
        top = s + 1;
        if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
            p != null) {                 // size 0 or 1
            VarHandle.fullFence();
            // signalWork方法的意義在于,如果運作的工作程式太少,則嘗試建立或釋放工作程式。
            p.signalWork(); 
        }

        // 如果array的剩餘空間不夠了,則進行增加
        else if (d == m)
            growArray(false); 
    }
}
           

🪂Join 方法

Join 方法的主要作用是阻塞目前線程并等待擷取結果。代碼如下:

通過 doJoin()方法得到目前任務的狀态來判斷傳回什麼結果,任務狀态有 4 種:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出現異常(EXCEPTIONAL)

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        //假如任務狀态是抛出異常狀态,就直接抛出對應的異常
        //若是任務狀态是被取消狀态,則直接抛出CancellationException異常。
        reportException(s);
    //如若任務狀态是已經完成,則直接立馬傳回任務結果。
    //即使此任務異常完成,如果不知道此任務已完成,則傳回null 
    return getRawResult();
}
           

讓我們分析一下 doJoin 方法的實作

private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    //1. 首先通過檢視任務的狀态,看任務是否已經執行完成,如果執行完成,則直接傳回任務狀态;
	//2. 如果沒有執行完任務,則從任務數組裡取出任務并執行。
    //3. 如果任務順利執行完成,則設定任務狀态為 NORMAL,假如出現異常,則記錄異常,并将任務狀态設定為 EXCEPTIONAL。
    return (s = status) < 0 ?
        s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
    //幫助和/或阻塞,直到給定的任務完成或逾時
    wt.pool.awaitJoin(w, this, 0L) :
    externalAwaitDone();
}
           

🚤Fork/Join 架構的異常處理

ForkJoinTask 在執行的時候可能會抛出異常,但因為它并不是在主線程中運作,故此沒有辦法在主線程中去捕獲異常,這種問題當然 ForkJoinTask 也是提供了API來處理的啦,如下:

  1. //如果此任務引發異常或被取消,則傳回true 。 通常用來判斷任務情況
    public final boolean isCompletedAbnormally() {
        return (status & ABNORMAL) != 0;
    }
               
  2. //getException 方法傳回 Throwable 對象,如果任務被取消了則傳回CancellationException,如果任務沒有完成或者沒有抛出異常則傳回 null。
    public final Throwable getException() {
        int s = status;
        return ((s & ABNORMAL) == 0 ? null :
                (s & THROWN)   == 0 ? new CancellationException() :
                getThrowableException());
    }
               

🌍入門案例

場景: 生成一個計算任務,計算 1+2+3…+1000,每 100 個數切分一個 子任務

import java.util.concurrent.RecursiveTask;
/**
* 遞歸累加
*/
public class TaskExample extends RecursiveTask<Long> {
    private int start;
    private int end;
    private long sum;
    /**
	* 構造函數
	* @param start
	* @param end
	*/
    public TaskExample(int start, int end){
        this.start = start;
        this.end = end;
    }

   
    @Override
    protected Long compute() {
        System.out.println("任務" + start + "=========" + end + "累加開始");
        //大于 100 個數相加切分,小于直接加
        if(end - start <= 100){
            for (int i = start; i <= end; i++) {
                //累加
                sum += i;
            }
        }else {
            //切分為 2 塊
            int middle = start + 100;
            //遞歸調用,切分為 2 個小任務
            TaskExample taskExample1 = new TaskExample(start, middle);
            TaskExample taskExample2 = new TaskExample(middle + 1, end);
            //執行:異步
            taskExample1.fork();
            taskExample2.fork();
            //同步阻塞擷取執行結果
            sum = taskExample1.join() + taskExample2.join();
        }
        //加完傳回
        return sum;
    }
}
           
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
/**
* 分支合并案例
*/
public class ForkJoinPoolDemo {
    /**
 * 生成一個計算任務,計算 1+2+3.........+1000
 * @param args
 */
    public static void main(String[] args) {
        //定義任務
        TaskExample taskExample = new TaskExample(1, 1000);
        //定義執行對象
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //加入任務執行
        ForkJoinTask<Long> result = forkJoinPool.submit(taskExample);
        //輸出結果
        try {
            System.out.println(result.get());
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            forkJoinPool.shutdown();
        }
    }
}
           

🌈自言自語

最近又開始了JUC的學習,感覺Java内容真的很多,但是為了能夠走的更遠,還是覺得應該需要打牢一下基礎。

最近在持續更新中,如果你覺得對你有所幫助,也感興趣的話,關注我吧,讓我們

一起學習,一起讨論吧。

你好,我是部落客

甯在春

,Java學習路上的一顆小小的種子,也希望有一天能紮根長成蒼天大樹。

希望

與君共勉

😁

我們:待别時相見時,都已有所成。