天天看點

#yyds幹貨盤點#Fork Join架構

Fork Join架構

Fork/Join架構是Java 7提供的一個用于并行執行任務的架構,是一個把大任務分割成若幹個小任務,最終彙總每個小任務結果後得到大任務結果的架構。

package com.example.xppdemo.chapter6;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2; // 門檻值
    private int start;
    private int end;
    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        // 如果任務足夠小就計算任務
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任務大于門檻值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            // 執行子任務
            leftTask.fork();
            rightTask.fork();
            // 等待子任務執行完,并得到其結果
            int leftResult=leftTask.join();
            int rightResult=rightTask.join();
            // 合并子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 生成一個計算任務,負責計算1+2+3+4
        CountTask task = new CountTask(1, 4);
        // 執行一個任務
        Future<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
        } catch (ExecutionException e) {
        }
    }
}
           

ForkJoinTask與一般任務的主要差別在于它需要實作compute方法,在這個方法裡,首先需要判斷任務是否足夠小,如果足夠小就直接執

行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在調用fork方法時,又會進入compute方法,看看目前子任務是否需要繼續分割成子任務,如果不需要繼續分割,則執行目前子任務并傳回結果。使用join方法會等待子任務執行完并得到其結果。

Fork/Join使用兩個類來完成以上兩件事情。

①ForkJoinTask:我們要使用ForkJoin架構,必須首先建立一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制。通常情況下,我們不需要直接繼承ForkJoinTask類,隻需要繼承它的子類,Fork/Join架構提供了以下兩個子類。

  • RecursiveAction:用于沒有傳回結果的任務。
  • RecursiveTask:用于有傳回結果的任務。

②ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行。

任務分割出的子任務會添加到目前工作線程所維護的雙端隊列中,進入隊列的頭部。當 一個工作線程的隊列裡暫時沒有任務時,它會随機從其他工作線程的隊列的尾部擷取一個任務。

Fork/Join架構的實作原理

ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責将存放程式送出給ForkJoinPool的任務,而ForkJoinWorkerThread數組負責執行這些任務。

(1)ForkJoinTask的fork方法實作原理

當我們調用ForkJoinTask的fork方法時,程式會調用ForkJoinWorkerThread的pushTask方法異步地執行這個任務,然後立即傳回結果。代碼如下。

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}
           

workQueue.push:

final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);
        }
        else if (n >= m)
            growArray();
    }
}
           

pushTask方法把目前任務存放在ForkJoinTask數組隊列裡。然後再調用ForkJoinPool的signalWork()方法喚醒或建立一個工作線程來執行任務。

(2)ForkJoinTask的join方法實作原理

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}
           
  • 如果任務狀态是已完成,則直接傳回任務結果。
  • 如果任務狀态是被取消,則直接抛出CancellationException。
  • 如果任務狀态是抛出異常,則直接抛出對應的異常。
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}