CompletableFuture
完成異步任務回調其他任務
可以等待一個任務執行結束就回調,
也可以等待兩個任務都結束就回調,
或者等待其中一個結束就回調。
預設使用ForkJoinPool,也可自己配置。
CompletionStage
CompletableFuture實作了CompletionStage,CompletionStage 主要負責任務的執行,将任務分成一個一個stage在同一個stage會同時執行,執行結束會通知另一個stage執行
thenApply( Function )接收前一個任務傳回,并傳回目前任務結果
thenAccept( Consumer )接收前一個任務傳回
supplyAsync(Supplier )運作任務并傳回結果
//依賴單個階段
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); // 預設執行方式
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);// 預設的異步執行方式
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor); //自定義的執行方式
//依賴兩個階段都完成
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);
//依賴兩個階段中的任何一個完成
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
//任意階段 傳回 則回調
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
public static void main(String[] args) throws Exception{
// thenApply();
// thenCombine();
applyToEither();
}
//依賴單個階段
public static void thenApply()throws Exception{
//可完成的異步任務
CompletableFuture<Void> completableFuture = CompletableFuture
//Supplier 執行方法并傳回結果 不需要參數
.supplyAsync(() -> {
System.out.println("aa1");
return "q";
})
//Function
// .thenApplyAsync(a)-> { //異步
.thenApply((a)-> {
System.out.println("等待前一個方法執行結束 回調該方法 接收 前一個傳回,同步執行目前方法(由前一個線程同步執行) 并傳回結果");
return 1;}
)
//Consumer
.thenAcceptAsync(System.out::println)//等待前一個方法執行結束 回調該方法 接收前一個CompletableFuture傳回 異步執行目前方法
// .thenAccept(System.out::println)//接收前一個CompletableFuture傳回 同步執行目前方法
.thenRun(()-> System.out.println("等待前一個方法執行結束 回調該方法"))
;
//阻塞等待任務結束,無傳回值
completableFuture.get();
}
//依賴兩個階段都完成
public static void thenCombine()throws Exception{
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> { //任務 1
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務1 執行結束");
return Thread.currentThread().getName() + " s1";
})
.thenCombine( //綁定到 任務1 stage
CompletableFuture.supplyAsync(() -> { //任務 2
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務2 執行結束");
return Thread.currentThread().getName() + " s1";
}),
(s1, s2) -> { //等待 兩個任務執行結束,傳回目前結果
System.out.println(s1);
System.out.println(s2);
return 1;
}
);
Integer integer = integerCompletableFuture.get();
System.out.println(Thread.currentThread().getName() + " " +integer);
}
//依賴兩個階段中的任何一個完成
public static void applyToEither()throws Exception{
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> { //任務 1
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務1 執行結束");
return Thread.currentThread().getName() + " s1";
})
.applyToEither( //綁定到 任務1 stage
CompletableFuture.supplyAsync(() -> { //任務 2
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務2 執行結束");
return Thread.currentThread().getName() + " s1";
}),
(sany) -> { //等待 兩個任務執行結束,傳回目前結果
System.out.println(sany);
return 1;
}
);
Integer integer = integerCompletableFuture.get();
System.out.println(Thread.currentThread().getName() + " " +integer);
}
Completion stack
CompletableFuture内部有一個Completion stack,用于記錄回調
使用連結清單實作棧 為什麼?
因為連結清單空間不連續不需要擴容
數組實作 可以利用緩存行來加速讀取 但是增删會需要擴容
将ComplatableFuture嵌入runable 包裝成 Completion 放入線程池執行 最後将Completion 壓棧
public static void main(String[] args) {
//異步執行
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> action("動作1"));
/**
* A CompletableFuture may have dependent completion actions,
* collected in a linked stack.
* 将回調動作 都添加到 stack中
*/
//回調stage
completableFuture.thenRun(()-> System.out.println(1));
completableFuture.thenRun(()-> System.out.println(2));
completableFuture.thenRun(()-> System.out.println(3));
try {
completableFuture.get();
} catch (Exception e) {
e.printStackTrace();
}
}
private static void action(String act) {
System.out.println(act+"start");
//如果沒睡眠 動作1就直接執行完 後面任務 就不會被壓棧 是以睡眠就會倒叙輸出
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(act+"end");
}