天天看點

【高并發】如何使用Java7中提供的Fork/Join架構實作高并發程式?

Fork/Join架構介紹

位于J.U.C(java.util.concurrent)中,是Java7中提供的用于執行并行任務的架構,其可以将大任務分割成若幹個小任務,最終彙總每個小任務的結果後得到最終結果。基本思想和Hadoop的MapReduce思想類似。

主要采用的是工作竊取算法(某個線程從其他隊列裡竊取任務來執行),并行分治計算中的一種Work-stealing政策

為什麼需要使用工作竊取算法呢?

假如我們需要做一個比較大的任務,我們可以把這個任務分割為若幹互不依賴的子任務,為了減少線程間的競争,于是把這些子任務分别放到不同的隊列裡,并為每個隊列建立一個單獨的線程來執行隊列裡的任務,線程和隊列一一對應,比如A線程負責處理A隊列裡的任務。但是有的線程會先把自己隊列裡的任務幹完,而其他線程對應的隊列裡還有任務等待處理。幹完活的線程與其等着,不如去幫其他線程幹活,于是它就去其他線程的隊列裡竊取一個任務來執行。而在這時它們會通路同一個隊列,是以為了減少竊取任務線程和被竊取任務線程之間的競争,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

工作竊取算法的優點:

充分利用線程進行并行計算,并減少了線程間的競争

工作竊取算法的缺點:

在某些情況下還是存在競争,比如雙端隊列裡隻有一個任務時。并且該算法會消耗更多的系統資源,比如建立多個線程和多個雙端隊列。

Fork/Join架構局限性

對于Fork/Join架構而言,當一個任務正在等待它使用Join操作建立的子任務結束時,執行這個任務的工作線程查找其他未被執行的任務,并開始執行這些未被執行的任務,通過這種方式,線程充分利用它們的運作時間來提高應用程式的性能。為了實作這個目标,Fork/Join架構執行的任務有一些局限性,如下所示。

  • 任務隻能使用Fork和Join操作來進行同步機制,如果使用了其他同步機制,則在同步操作時,工作線程就不能執行其他任務了。比如,在Fork/Join架構中,使任務進行了睡眠,那麼,在睡眠期間内,正在執行這個任務的工作線程将不會執行其他任務了。
  • 在Fork/Join架構中,所拆分的任務不應該去執行IO操作,比如:讀寫資料檔案。
  • 任務不能抛出檢查異常,必須通過必要的代碼來出來這些異常。

Fork/Join架構的核心類

Fork/Join架構的核心是兩個類:ForkJoinPool和ForkJoinTask。ForkJoinPool負責實作工作竊取算法、管理工作線程、提供關于任務的狀态以及執行資訊。ForkJoinTask主要提供在任務中執行Fork和Join操作的機制。

代碼示例

示例代碼如下:

package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;
    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        //如果任務足夠小就計算任務
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任務大于門檻值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
            // 執行子任務
            leftTask.fork();
            rightTask.fork();
            // 等待任務執行結束合并其結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            // 合并子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }
    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();
        //生成一個計算任務,計算1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
        //執行一個任務
        Future<Integer> result = forkjoinPool.submit(task);
        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}