天天看點

ForkJoinPool的了解與使用

ForkJoinPool是一個強大的Java類,用于處理計算密集型任務。使用ForkJoinPool分解計算密集型任務并并行執行它們以獲得更好的Java應用程式性能。

它的工作原理是将任務分解為更小的子任務,然後并行執行它們。該線程池使用分而治之的政策運作,使其能夠并發執行任務,進而提高吞吐量并減少處理時間。

它的獨特功能之一ForkJoinPool是它用于優化性能的工作竊取算法。當工作線程完成配置設定給它的任務時,它會從其他線程竊取任務,確定所有線程高效工作,不浪費計算機資源。

ForkJoinPool在Java的并行流和CompletableFutures中廣泛使用,允許開發人員輕松并發執行任務。此外,Kotlin和Akka等其他JVM語言使用此架構來建構需要高并發性和彈性的消息驅動應用程式。

ForkJoinPool解析

ForkJoinPool類存儲workers,它們是機器上每個CPU核心上運作的程序。這些程序中的每一個都存儲在Deque的雙端隊列中。一旦工作線程用完任務,它就會開始從其它工作線程竊取任務。

首先,會有fork任務的過程,這意味着一個大任務将被分解成可以并行執行的小任務。所有子任務完成後,它們将重新加入。然後ForkJoinPool類提供一個結果。

ForkJoinPool的了解與使用

ForkJoinPool fork task

當一個任務被送出ForkJoinPool時,程序會被分成更小的程序并推送到一個共享隊列中。一旦調用fork()方法,就會并行調用任務,直到基本條件為真。一旦處理被分叉,join()方法確定線程互相等待,直到程序完成。

所有任務最初都會被送出到一個主隊列,這個主隊列将任務推送到工作線程。請注意:任務是使用與堆棧資料結構相同的LIFO(後進先出)政策插入的。

還有一點很重要的是ForkJoinPool使用Deques來存儲任務。這提供了會用LIFO(後進先出)或FIFO(先進先出)的能力,這是工作竊取算法所必須的。

ForkJoinPool的了解與使用

ForkJoinPool Deque

工作竊取算法

工作竊取是一種有效的算法,它通過在池中所有可用線程之間平衡工作負載來實作計算機資源的高效使用。

當一個線程變的空閑時,它不會保持不活動狀态,而是會嘗試從其它扔在忙于配置設定給它們的工作的線程中竊取任務。此過程最大限度地利用計算資源,并確定沒有線程負擔過重而其它線程保持空閑狀态。

工作竊取算法背後的關鍵概念是每個線程都有自己的雙端隊列任務,它以LIFO順序執行。

當一個線程完成自己的任務并變的空閑時,它會嘗試從另一個線程的雙端隊列任務的末尾“竊取”任務,遵循FIFO政策,與隊列資料結構相同。這使得空閑線程可以接手等待時間最長的任務,進而減少等待時間,提高吞吐量。

ForkJoinPool的了解與使用

ForkJoinPool 工作竊取算法

總體而言,ForkJoinPool的工作竊取算法是一個強大的功能,可以通過確定有效利用所有可用的計算資源來顯著提高并行程式的性能。

ForkJoinPool的主要類

  • ForkJoinPool,建立線程池使用ForkJoin架構,它的工作方式與其它線程池類似。此類中最重要的方法是commonPool()方法來建立ForkJoin線程池。
  • RecursiveAction,該類的主要功能是計算遞歸。在compute()方法是沒有傳回值的。
  • RecursiveTask,此類的工作方式和RecursiveAction類似,不同之處在于compute()方法是有傳回值的。

使用

RecursiveAction

使用RecursiveAction類,需要繼承它并覆寫compute()方法,然後,用我們想要實作的邏輯來建立子任務。

package com.joyce.forkjoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class RecursiveActionDemo {

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        int[] array = {2, 4, 6, 8, 10};
        DoubleNumber doubleNumberTask = new DoubleNumber(array, 0, array.length);
        forkJoinPool.invoke(doubleNumberTask);
        System.err.println(DoubleNumber.result);
    }
}

class DoubleNumber extends RecursiveAction {

    final int PROCESS_THRESHOLD = 2;
    int[] array;
    int beginIndex, endIndex;
    static int result;

    DoubleNumber(int[] array, int beginIndex, int endIndex) {
        this.array = array;
        this.beginIndex = beginIndex;
        this.endIndex = endIndex;
    }

    @Override
    protected void compute() {
        if (endIndex - beginIndex <= PROCESS_THRESHOLD) {
            for (int i = beginIndex; i < endIndex; i++) {
                result += array[i] * 2;
            }
        } else {
            int mid = (beginIndex + endIndex) / 2;
            DoubleNumber leftArray = new DoubleNumber(array, beginIndex, mid);
            DoubleNumber rightArray = new DoubleNumber(array, mid, endIndex);
            // 遞歸調用計算方法
            leftArray.fork();
            rightArray.fork();
            // 加入遞歸結果
            leftArray.join();
            rightArray.join();
        }
    }
}           

如代碼所示,并行遞歸計算數組中每個數字的兩倍,并計算結果。

記住重要的一點,RecursiveAction沒有傳回值,通過使用分而治之的政策來分解流程提高性能。正如代碼所示,并非計算數組内每個元素的兩倍,而是通過将數組分成多個部分來并行執行操作。

同樣,需要注意的是,RecursiveAction用于可以有效分解為更小的子問題的任務時,非常有效。是以,RecursiveAction和ForkJoinPool用于計算密集型任務時,可以顯著提高性能。否則,由于線程的建立和管理,性能反而會變差。

RecursiveTask

RecursiveTask和RecursiveAction之間的差別在于,方法中是存在傳回值的。

package com.joyce.forkjoin;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class RecursiveTaskDemo extends RecursiveTask<Integer> {

    private final List<Integer> numbers;

    public RecursiveTaskDemo(List<Integer> numbers) {
        this.numbers = numbers;
    }

    @Override
    protected Integer compute() {
        if (numbers.size() <= 2) {
            return numbers.stream().mapToInt(e -> e).sum();
        } else {
            int mid = numbers.size() / 2;
            List<Integer> list1 = numbers.subList(0, mid);
            List<Integer> list2 = numbers.subList(mid, numbers.size());
            RecursiveTaskDemo task1 = new RecursiveTaskDemo(list1);
            RecursiveTaskDemo task2 = new RecursiveTaskDemo(list2);
            task1.fork();
            return task1.join() + task2.compute();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        List<Integer> numbers = Arrays.asList(1,3,5,7,9);
        int output = forkJoinPool.invoke(new RecursiveTaskDemo(numbers));
        System.err.println(output);
    }
}
           

此示例代碼,遞歸分解數組,直到達到基本條件。将list1和list2加入到RecursiveTask,然後分叉task1,利用compute()并行執行方法和數組的其它部分。當遞歸達到條件,join()方法加入結果。

何時使用ForkJoinPool

ForkJoinPool雖然很便利,但是不應在所有的情況下使用。正如前面所講,最好将它用于高密度計算的并發程序。

  • 遞歸任務:ForkJoinPool非常适合執行遞歸算法,例如快速排序、歸并排序或者二分查找。這些算法可以分解為更小的子問題并行執行,進而顯著提高性能。
  • 高并發場景:在高并發場景中,比如資料處理、Web伺服器等,可以使用跨線程并行執行ForkJoinPool任務,有助于提高性能和吞吐量。
  • 并行問題:如果你有一個可以很容易地分成獨立子任務的問題,比如說圖像處理或者數值模拟等,可以使用ForkJoinPool并行執行子任務。

總結

本文主要介紹了如何使用ForkJoinPool功能在CPU核心中執行繁重的操作。我們來總結一下:

  • ForkJoinPool是一個線程池,使用分而治之的政策來遞歸執行任務。
  • ForkJoinPool并行執行任務,進而有效利用計算機資源,提升性能。
  • 工作竊取算法通過允許空閑線程從繁忙線程竊取任務來優化資源使用率。
  • 任務存儲在雙端隊列中,存儲采用LIFO政策,竊取采用FIFO政策。
  • 架構中的主要類,ForkJoinPool、RecursiveAction、RecursiveTask:ForkJoinPool通常與并行流和CompletableFuture。RecursiveAction用于計算遞歸操作且沒有傳回值。RecursiveTask與RecursiveAction類似,但有傳回值。compute()方法在兩個類中都被重寫以實作自定義邏輯。fork()方法調用compute()方法并将任務分解為更小的子任務。join()方法等待子任務完成并合并結果。

繼續閱讀