天天看點

【高并發】什麼是ForkJoin?看這一篇就夠了!

寫在前面

在JDK中,提供了這樣一種功能:它能夠将複雜的邏輯拆分成一個個簡單的邏輯來并行執行,待每個并行執行的邏輯執行完成後,再将各個結果進行彙總,得出最終的結果資料。有點像Hadoop中的MapReduce。

ForkJoin是由JDK1.7之後提供的多線程并發處理架構。ForkJoin架構的基本思想是分而治之。什麼是分而治之?分而治之就是将一個複雜的計算,按照設定的門檻值分解成多個計算,然後将各個計算結果進行彙總。相應的,ForkJoin将複雜的計算當做一個任務,而分解的多個計算則是當做一個個子任務來并行執行。

Java并發程式設計的發展

對于Java語言來說,生來就支援多線程并發程式設計,在并發程式設計領域也是在不斷發展的。Java在其發展過程中對并發程式設計的支援越來越完善也正好印證了這一點。

  • Java 1 支援thread,synchronized。
  • Java 5 引入了 thread pools, blocking queues, concurrent collections,locks, condition queues。
  • Java 7 加入了fork-join庫。
  • Java 8 加入了 parallel streams。

并發與并行

并發和并行在本質上還是有所差別的。

并發

并發指的是在同一時刻,隻有一個線程能夠擷取到CPU執行任務,而多個線程被快速的輪換執行,這就使得在宏觀上具有多個線程同時執行的效果,并發不是真正的同時執行,并發可以使用下圖表示。

【高并發】什麼是ForkJoin?看這一篇就夠了!
并行

并行指的是無論何時,多個線程都是在多個CPU核心上同時執行的,是真正的同時執行。

【高并發】什麼是ForkJoin?看這一篇就夠了!

分治法

基本思想

把一個規模大的問題劃分為規模較小的子問題,然後分而治之,最後合并子問題的解得到原問題的解。

步驟

①分割原問題;

②求解子問題;

③合并子問題的解為原問題的解。

我們可以使用如下僞代碼來表示這個步驟。

if(任務很小){
    直接計算得到結果
}else{
    分拆成N個子任務
    調用子任務的fork()進行計算
    調用子任務的join()合并計算結果
}      

在分治法中,子問題一般是互相獨立的,是以,經常通過遞歸調用算法來求解子問題。

典型應用

  • 二分搜尋
  • 大整數乘法
  • Strassen矩陣乘法
  • 棋盤覆寫
  • 合并排序
  • 快速排序
  • 線性時間選擇
  • 漢諾塔

ForkJoin并行處理架構

ForkJoin架構概述

Java 1.7 引入了一種新的并發架構—— Fork/Join Framework,主要用于實作“分而治之”的算法,特别是分治之後遞歸調用的函數。

ForkJoin架構的本質是一個用于并行執行任務的架構, 能夠把一個大任務分割成若幹個小任務,最終彙總每個小任務結果後得到大任務的計算結果。在Java中,ForkJoin架構與ThreadPool共存,并不是要替換ThreadPool

其實,在Java 8中引入的并行流計算,内部就是采用的ForkJoinPool來實作的。例如,下面使用并行流實作列印數組元組的程式。

public class SumArray {
    public static void main(String[] args){
        List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
        numberList.parallelStream().forEach(System.out::println);
    }
}      

這段代碼的背後就使用到了ForkJoinPool。

說到這裡,可能有讀者會問:可以使用線程池的ThreadPoolExecutor來實作啊?為什麼要使用ForkJoinPool啊?ForkJoinPool是個什麼鬼啊?! 接下來,我們就來回答這個問題。

ForkJoin架構原理

ForkJoin架構是從jdk1.7中引入的新特性,它同ThreadPoolExecutor一樣,也實作了Executor和ExecutorService接口。它使用了一個無限隊列來儲存需要執行的任務,而線程的數量則是通過構造函數傳入,如果沒有向構造函數中傳入指定的線程數量,那麼目前計算機可用的CPU數量會被設定為線程數量作為預設值。

ForkJoinPool主要使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用比如快速排序算法。這裡的要點在于,ForkJoinPool能夠使用相對較少的線程來處理大量的任務。比如要對1000萬個資料進行排序,那麼會将這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬資料的合并任務。以此類推,對于500萬的資料也會做出同樣的分割處理,到最後會設定一個門檻值來規定當資料規模到多少時,停止這樣的分割處理。比如,當元素的數量小于10時,會停止分割,轉而使用插入排序對它們進行排序。那麼到最後,所有的任務加起來會有大概200萬+個。問題的關鍵在于,對于一個任務而言,隻有當它所有的子任務完成之後,它才能夠被執行。

是以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的線程無法向任務隊列中再添加一個任務并在等待該任務完成之後再繼續執行。而使用ForkJoinPool就能夠解決這個問題,它就能夠讓其中的線程建立新的任務,并挂起目前的任務,此時線程就能夠從隊列中選擇子任務執行。

那麼使用ThreadPoolExecutor或者ForkJoinPool,性能上會有什麼差異呢?

首先,使用ForkJoinPool能夠使用數量有限的線程來完成非常多的具有父子關系的任務,比如使用4個線程來完成超過200萬個任務。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優先執行子任務,需要完成200萬個具有父子關系的任務時,也需要200萬個線程,很顯然這是不可行的,也是很不合理的!!

工作竊取算法

假如我們需要做一個比較大的任務,我們可以把這個任務分割為若幹互不依賴的子任務,為了減少線程間的競争,于是把這些子任務分别放到不同的隊列裡,并為每個隊列建立一個單獨的線程來執行隊列裡的任務,線程和隊列一一對應,比如A線程負責處理A隊列裡的任務。但是有的線程會先把自己隊列裡的任務幹完,而其他線程對應的隊列裡還有任務等待處理。幹完活的線程與其等着,不如去幫其他線程幹活,于是它就去其他線程的隊列裡竊取一個任務來執行。而在這時它們會通路同一個隊列,是以為了減少竊取任務線程和被竊取任務線程之間的競争,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

工作竊取算法的優點:

充分利用線程進行并行計算,并減少了線程間的競争。

工作竊取算法的缺點:

在某些情況下還是存在競争,比如雙端隊列裡隻有一個任務時。并且該算法會消耗更多的系統資源,比如建立多個線程和多個雙端隊列。

Fork/Join架構局限性:

對于Fork/Join架構而言,當一個任務正在等待它使用Join操作建立的子任務結束時,執行這個任務的工作線程查找其他未被執行的任務,并開始執行這些未被執行的任務,通過這種方式,線程充分利用它們的運作時間來提高應用程式的性能。為了實作這個目标,Fork/Join架構執行的任務有一些局限性。

(1)任務隻能使用Fork和Join操作來進行同步機制,如果使用了其他同步機制,則在同步操作時,工作線程就不能執行其他任務了。比如,在Fork/Join架構中,使任務進行了睡眠,那麼,在睡眠期間内,正在執行這個任務的工作線程将不會執行其他任務了。

(2)在Fork/Join架構中,所拆分的任務不應該去執行IO操作,比如:讀寫資料檔案。

(3)任務不能抛出檢查異常,必須通過必要的代碼來出來這些異常。

ForkJoin架構的實作

ForkJoin架構中一些重要的類如下所示。

【高并發】什麼是ForkJoin?看這一篇就夠了!

ForkJoinPool 架構中涉及的主要類如下所示。

1.ForkJoinPool類

實作了ForkJoin架構中的線程池,由類圖可以看出,ForkJoinPool類實作了線程池的Executor接口。

我們也可以從下圖中看出ForkJoinPool的類圖關系。

【高并發】什麼是ForkJoin?看這一篇就夠了!

其中,可以使用Executors.newWorkStealPool()方法建立ForkJoinPool。

ForkJoinPool中提供了如下送出任務的方法。

public void execute(ForkJoinTask<?> task)
public void execute(Runnable task)
public <T> T invoke(ForkJoinTask<T> task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Runnable task, T result)
public ForkJoinTask<?> submit(Runnable task)      

2.ForkJoinWorkerThread類

實作ForkJoin架構中的線程。

3.ForkJoinTask類

ForkJoinTask封裝了資料及其相應的計算,并且支援細粒度的資料并行。ForkJoinTask比線程要輕量,ForkJoinPool中少量工作線程能夠運作大量的ForkJoinTask。

ForkJoinTask類中主要包括兩個方法fork()和join(),分别實作任務的分拆與合并。

fork()方法類似于Thread.start(),但是它并不立即執行任務,而是将任務放入工作隊列中。跟Thread.join()方法不同,ForkJoinTask的join()方法并不簡單的阻塞線程,而是利用工作線程運作其他任務,當一個工作線程中調用join(),它将處理其他任務,直到注意到目标子任務已經完成。

我們可以使用下圖來表示這個過程。

【高并發】什麼是ForkJoin?看這一篇就夠了!

ForkJoinTask有3個子類:

【高并發】什麼是ForkJoin?看這一篇就夠了!
  • RecursiveAction:無傳回值的任務。
  • RecursiveTask:有傳回值的任務。
  • CountedCompleter:完成任務後将觸發其他任務。

4.RecursiveTask類

有傳回結果的ForkJoinTask實作Callable。

5.RecursiveAction類

無傳回結果的ForkJoinTask實作Runnable。

6.CountedCompleter類

在任務完成執行後會觸發執行一個自定義的鈎子函數。

ForkJoin示例程式

package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;
    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        //如果任務足夠小就計算任務
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任務大于門檻值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
            // 執行子任務
            leftTask.fork();
            rightTask.fork();
            // 等待任務執行結束合并其結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            // 合并子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }
    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();
        //生成一個計算任務,計算1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
        //執行一個任務
        Future<Integer> result = forkjoinPool.submit(task);
        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}