大家好,我是冰河~~
今天跟大家聊聊如何使用Java7提供的Fork/Join架構實作高并發程式。好了,開始今天的主題吧!
Fork/Join架構
位于J.U.C(java.util.concurrent)中,是Java7中提供的用于執行并行任務的架構,其可以将大任務分割成若幹個小任務,最終彙總每個小任務的結果後得到最終結果。基本思想和Hadoop的MapReduce思想類似。
主要采用的是工作竊取算法(某個線程從其他隊列裡竊取任務來執行),并行分治計算中的一種Work-stealing政策
為什麼需要使用工作竊取算法呢?
假如我們需要做一個比較大的任務,我們可以把這個任務分割為若幹互不依賴的子任務,為了減少線程間的競争,于是把這些子任務分别放到不同的隊列裡,并為每個隊列建立一個單獨的線程來執行隊列裡的任務,線程和隊列一一對應,比如A線程負責處理A隊列裡的任務。但是有的線程會先把自己隊列裡的任務幹完,而其他線程對應的隊列裡還有任務等待處理。幹完活的線程與其等着,不如去幫其他線程幹活,于是它就去其他線程的隊列裡竊取一個任務來執行。而在這時它們會通路同一個隊列,是以為了減少竊取任務線程和被竊取任務線程之間的競争,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
工作竊取算法的優點
充分利用線程進行并行計算,并減少了線程間的競争
工作竊取算法的缺點
在某些情況下還是存在競争,比如雙端隊列裡隻有一個任務時。并且該算法會消耗更多的系統資源,比如建立多個線程和多個雙端隊列。
Fork/Join架構局限性
對于Fork/Join架構而言,當一個任務正在等待它使用Join操作建立的子任務結束時,執行這個任務的工作線程查找其他未被執行的任務,并開始執行這些未被執行的任務,通過這種方式,線程充分利用它們的運作時間來提高應用程式的性能。為了實作這個目标,Fork/Join架構執行的任務有一些局限性,如下所示。
- 任務隻能使用Fork和Join操作來進行同步機制,如果使用了其他同步機制,則在同步操作時,工作線程就不能執行其他任務了。比如,在Fork/Join架構中,使任務進行了睡眠,那麼,在睡眠期間内,正在執行這個任務的工作線程将不會執行其他任務了。
- 在Fork/Join架構中,所拆分的任務不應該去執行IO操作,比如:讀寫資料檔案
- 任務不能抛出檢查異常,必須通過必要的代碼來出來這些異常
Fork/Join架構的核心類
Fork/Join架構的核心是兩個類:ForkJoinPool和ForkJoinTask。ForkJoinPool負責實作工作竊取算法、管理工作線程、提供關于任務的狀态以及執行資訊。ForkJoinTask主要提供在任務中執行Fork和Join操作的機制。
示例代碼
示例代碼如下:
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任務足夠小就計算任務
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任務大于門檻值,就分裂成兩個子任務計算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 執行子任務
leftTask.fork();
rightTask.fork();
// 等待任務執行結束合并其結果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任務
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一個計算任務,計算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
//執行一個任務
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}
推薦閱讀
《
實踐出真知:全網最強秒殺系統架構解密,不是所有的秒殺都是秒殺!!》
注意:線程的執行順序與你想的并不一樣!! 深入解析Callable接口 兩種異步模型與深度解析Future接口! 解密SimpleDateFormat類的線程安全問題和六種解決方案! 不得不說的線程池與ThreadPoolExecutor類淺析 深度解析線程池中那些重要的頂層接口和抽象類 從源碼角度分析建立線程池究竟有哪些方式 通過源碼深度解析ThreadPoolExecutor類是如何保證線程池正确運作的 通過ThreadPoolExecutor類的源碼深度解析線程池執行任務的核心流程 通過源碼深度分析線程池中Worker線程的執行流程 從源碼角度深度解析線程池是如何實作優雅退出的 ScheduledThreadPoolExecutor與Timer的差別和簡單示例 深度解析ScheduledThreadPoolExecutor類的源代碼 由InterruptedException異常引發的思考 淺談AQS中的CountDownLatch、Semaphore與CyclicBarrier 淺談AQS中的ReentrantLock、ReentrantReadWriteLock、StampedLock與Condition 朋友去面試竟然栽在了Thread類的源碼上好了,今天就到這兒吧,我是冰河,我們下期見~~