多線程一直Java開發中的難點,也是面試中的常客,趁着還有時間,打算鞏固一下JUC方面知識,我想機會随處可見,但始終都是留給有準備的人的,希望我們都能加油!!! 沉下去,再浮上來
,我想我們會變的不一樣的。
🚤Fork&Join架構
1)介紹
Fork/Join架構是從Java1.7開始提供的一個并行處理任務的架構,它的基本思路是将一個大任務分解成若幹個小任務,并行處理多個小任務,最後再彙總合并這些小任務的結果便可得到原來的大任務結果。
1、Fork :遞歸式的将大任務分割成合适大小的小任務。
2、Join:執行任務并合并結果。
2)相關類
我們要使用 Fork/Join 架構,首先需要建立一個 ForkJoin 任務。 ForkJoin 類提供了在任務中執行 fork 和 join 的機制。
通常情況下我們都是直接繼承ForkJoinTask 的子類,Fork/Join架構提供了兩個子類:
- RecursiveAction:一個遞歸無結果的ForkJoinTask(沒有傳回值)任務
- RecursiveTask:一個遞歸有結果的ForkJoinTask(有傳回值)任務
- ForkJoinTask 主要方法:
fork() // 在目前線程運作的線程池中安排一個異步執行。簡單的了解就是再建立一個子任務。 join() //當任務完成的時候傳回計算結果。 invoke() //開始執行任務,如果必要,等待計算完成。
ForkJoinPool:另外ForkJoinTask需要通過 ForkJoinPool 來執行
RecursiveTask:一個遞歸有結果的ForkJoinTask(有傳回值)任務
🚁Fork 方法
調用fork方法時,程式會将新建立的子任務放入目前線程的workQueue隊列中,Fork/Join架構将根據目前正在并發執行的ForkJoinTask任務的ForkJoinWorkerThread線程狀态,來決定是讓這個任務在隊列中等待,還是建立一個新的ForkJoinWorkerThread線程運作它,又或者是喚起其它正在等待任務的ForkJoinWorkerThread線程運作它。
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
//将給定的任務添加到送出者目前隊列的送出隊列中,如果為 null 或存在競争,則建立一個。
ForkJoinPool.common.externalPush(this);
return this;
}
push方法把目前任務存放在 ForkJoinTask 數組隊列裡。然後再調用 ForkJoinPool 的 signalWork()方法喚醒或建立一個工作線程來執行任務。代碼如下:
//推送任務。 僅由非共享隊列中的所有者調用。
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a;
int s = top, d, cap, m;
ForkJoinPool p = pool;
if ((a = array) != null && (cap = a.length) > 0) {
QA.setRelease(a, (m = cap - 1) & s, task);
top = s + 1;
if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
p != null) { // size 0 or 1
VarHandle.fullFence();
// signalWork方法的意義在于,如果運作的工作程式太少,則嘗試建立或釋放工作程式。
p.signalWork();
}
// 如果array的剩餘空間不夠了,則進行增加
else if (d == m)
growArray(false);
}
}
🪂Join 方法
Join 方法的主要作用是阻塞目前線程并等待擷取結果。代碼如下:
通過 doJoin()方法得到目前任務的狀态來判斷傳回什麼結果,任務狀态有 4 種:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出現異常(EXCEPTIONAL)
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
//假如任務狀态是抛出異常狀态,就直接抛出對應的異常
//若是任務狀态是被取消狀态,則直接抛出CancellationException異常。
reportException(s);
//如若任務狀态是已經完成,則直接立馬傳回任務結果。
//即使此任務異常完成,如果不知道此任務已完成,則傳回null
return getRawResult();
}
讓我們分析一下 doJoin 方法的實作
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
//1. 首先通過檢視任務的狀态,看任務是否已經執行完成,如果執行完成,則直接傳回任務狀态;
//2. 如果沒有執行完任務,則從任務數組裡取出任務并執行。
//3. 如果任務順利執行完成,則設定任務狀态為 NORMAL,假如出現異常,則記錄異常,并将任務狀态設定為 EXCEPTIONAL。
return (s = status) < 0 ?
s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
//幫助和/或阻塞,直到給定的任務完成或逾時
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
🚤Fork/Join 架構的異常處理
ForkJoinTask 在執行的時候可能會抛出異常,但因為它并不是在主線程中運作,故此沒有辦法在主線程中去捕獲異常,這種問題當然 ForkJoinTask 也是提供了API來處理的啦,如下:
-
//如果此任務引發異常或被取消,則傳回true 。 通常用來判斷任務情況 public final boolean isCompletedAbnormally() { return (status & ABNORMAL) != 0; }
-
//getException 方法傳回 Throwable 對象,如果任務被取消了則傳回CancellationException,如果任務沒有完成或者沒有抛出異常則傳回 null。 public final Throwable getException() { int s = status; return ((s & ABNORMAL) == 0 ? null : (s & THROWN) == 0 ? new CancellationException() : getThrowableException()); }
🌍入門案例
場景: 生成一個計算任務,計算 1+2+3…+1000,每 100 個數切分一個 子任務
import java.util.concurrent.RecursiveTask;
/**
* 遞歸累加
*/
public class TaskExample extends RecursiveTask<Long> {
private int start;
private int end;
private long sum;
/**
* 構造函數
* @param start
* @param end
*/
public TaskExample(int start, int end){
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
System.out.println("任務" + start + "=========" + end + "累加開始");
//大于 100 個數相加切分,小于直接加
if(end - start <= 100){
for (int i = start; i <= end; i++) {
//累加
sum += i;
}
}else {
//切分為 2 塊
int middle = start + 100;
//遞歸調用,切分為 2 個小任務
TaskExample taskExample1 = new TaskExample(start, middle);
TaskExample taskExample2 = new TaskExample(middle + 1, end);
//執行:異步
taskExample1.fork();
taskExample2.fork();
//同步阻塞擷取執行結果
sum = taskExample1.join() + taskExample2.join();
}
//加完傳回
return sum;
}
}
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
/**
* 分支合并案例
*/
public class ForkJoinPoolDemo {
/**
* 生成一個計算任務,計算 1+2+3.........+1000
* @param args
*/
public static void main(String[] args) {
//定義任務
TaskExample taskExample = new TaskExample(1, 1000);
//定義執行對象
ForkJoinPool forkJoinPool = new ForkJoinPool();
//加入任務執行
ForkJoinTask<Long> result = forkJoinPool.submit(taskExample);
//輸出結果
try {
System.out.println(result.get());
}catch (Exception e){
e.printStackTrace();
}finally {
forkJoinPool.shutdown();
}
}
}
🌈自言自語
最近又開始了JUC的學習,感覺Java内容真的很多,但是為了能夠走的更遠,還是覺得應該需要打牢一下基礎。
最近在持續更新中,如果你覺得對你有所幫助,也感興趣的話,關注我吧,讓我們
一起學習,一起讨論吧。
你好,我是部落客
甯在春
,Java學習路上的一顆小小的種子,也希望有一天能紮根長成蒼天大樹。
希望
與君共勉
😁
我們:待别時相見時,都已有所成。