開發環境
- JDK 17
- Idea 2022
熟悉JDK 8版本的同學,大機率都使用過java.util.concurrent.CompletableFuture這個類,有時候在業務服務中你可能需要并行的去執行某些騷操作,那就少不了它的存在。
我在業務中就有大量的需求場景,比如:我需要拉管道方的訂單和訂單明細,落地到我司系統中。此時用到了兩個接口:訂單清單查詢、訂單明細查詢,查詢清單很簡單,一頁一頁的翻頁查詢,因為它也隻傳回了訂單号,我舉個例子吧!
這裡是訂單清單查詢接口傳回的資料結構(Mock資料),一堆的訂單号
json複制代碼{
"data":[
"125345345345",
"235894563423",
"345345345343"
]
}
接下來我肯定要根據訂單号去查詢訂單明細的,千萬不要下意識的進行for循環一個一個查,那要查到什麼時候啊! 這個時候CompletableFuture就派上用場了。
CompletableFuture應用
前提條件,我為了友善使用,我是直接在業務的Base包中,封裝了一個通用實作,當然隻是适用于我們業務的,需要的同學自取。
注意:下面這段代碼沒有處理真實的異常,想直接用,再往下看!!!
php複制代碼
/**
* 偷個懶,線程池直接這樣寫了先,真實業務中不是這樣搞的哈!
*/
private final static ExecutorService executorService = Executors.newFixedThreadPool(4);
/**
* 建立并行任務并執行
*
* @param list 資料源
* @param api API調用邏輯
* @param exceptionHandle 異常處理邏輯
* @param <S> 資料源類型
* @param <T> 程式傳回類型
* @return 處理結果清單
*/
public <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> api, BiFunction<Throwable, S, T> exceptionHandle) {
//規整所有任務
List<CompletableFuture<T>> collectFuture = list.stream()
.map(s -> this.createFuture(() -> api.apply(s), e -> exceptionHandle.apply(e, s)))
.toList();
//彙總所有任務,并執行join,全部執行完成後,統一傳回
return CompletableFuture.allOf(collectFuture.toArray(new CompletableFuture<?>[]{}))
.thenApply(f -> collectFuture.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()))
.join();
}
/**
* 建立單個CompletableFuture任務
*
* @param logic 任務邏輯
* @param exceptionHandle 異常處理
* @param <T> 類型
* @return 任務
*/
public <T> CompletableFuture<T> createFuture(Supplier<T> logic, Function<Throwable, T> exceptionHandle) {
return CompletableFuture.supplyAsync(logic, executorService).exceptionally(exceptionHandle);
}
利用上面這塊封裝的代碼,完全适用于我在公司的大部分并行業務場景,也确實提升了我Pod節點的CPU使用率。
不過之後有個問題就坑了,你調用外部API的時候,偶爾會失敗,那麼失敗就要重試,這個時候我就需要正确的判斷異常并進行重試操作。
先定義個業務異常類
java複制代碼public static class BizApiException extends RuntimeException {
public BizApiException() {
}
public BizApiException(String message) {
super(message);
}
}
示例代碼
下面的代碼隻是模拟我在業務端的場景,大家樂呵樂呵就行。
java複制代碼public static void main(String[] args) {
CompletableFutureDemo f = new CompletableFutureDemo();
List<Integer> numList = f.parallelFutureJoin(Arrays.asList(1, 2, 3), num -> {
//模拟API調用
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//...
}
if (num > 2) {
throw new BizApiException("心别太大");
}
return num;
}, (e, num) -> {
//異常向你打開了大門
if (e instanceof BizApiException) {
System.out.println("業務異常,我在處理數字:" + num + ",異常原因:" + e);
return -1;
}
System.out.println("我異常了,老六,我剛才在處理數字:" + num + ",異常原因:" + e);
return -1;
});
System.out.println(numList);
}
注意:我本來想通過傳回的Exception判斷是不是BizApiException業務異常的,可惜的是這裡的異常類型永遠都不會是BizApiException,我輸出下内容到控制台,就懂了!
執行代碼後,我得到的控制台内容
這個時候其實可以看到拿到的真實異常類型了,一個名為java.util.concurrent.CompletionException的老六出現了。我看看它是個啥!
bash複制代碼我異常了,老六,我剛才在處理數字:3,異常原因:java.util.concurrent.CompletionException: com.java.basic.CompletableFutureDemo$BizApiException: 心别太大
[1, 2, -1]
CompletionException
這個類的注釋說明了它的用途,簡單了解是:它在完成結果或任務的過程中遇到錯誤或其他異常時會觸發。那我懂了!任務異常之後它就會出現,然後還把我們自己的異常給包起來了!
JDK源代碼
scala複制代碼/**
* Exception thrown when an error or other exception is encountered
* in the course of completing a result or task.
*
* @since 1.8
* @author Doug Lea
*/
public class CompletionException extends RuntimeException {}
其實還有個異常類,我們也需要關注,它告訴了我們,真實的異常在哪裡了!
ExecutionException
這個類的注釋就說的很清楚了,自己的異常都在getCause()中了,那就好辦了.
scala複制代碼/**
* Exception thrown when attempting to retrieve the result of a task
* that aborted by throwing an exception. This exception can be
* inspected using the {@link #getCause()} method.
*
* @see Future
* @since 1.5
* @author Doug Lea
*/
public class ExecutionException extends Exception {}
改造我的并發工具類(完整版)
方法extractRealException就是我要擷取的真實異常,同時parallelFutureJoin方法中引用一下,這個工具類就解決了需求。
typescript複制代碼/**
* 建立并行任務并執行
*
* @param list 資料源
* @param api API調用邏輯
* @param exceptionHandle 異常處理邏輯
* @param <S> 資料源類型
* @param <T> 程式傳回類型
* @return 處理結果清單
*/
public <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> api, BiFunction<Throwable, S, T> exceptionHandle) {
//規整所有任務
List<CompletableFuture<T>> collectFuture = list.stream()
.map(s -> this.createFuture(() -> api.apply(s), e -> exceptionHandle.apply(
this.extractRealException(e), s)))
.toList();
//彙總所有任務,并執行join,全部執行完成後,統一傳回
return CompletableFuture.allOf(collectFuture.toArray(new CompletableFuture<?>[]{}))
.thenApply(f -> collectFuture.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()))
.join();
}
/**
* 建立CompletableFuture任務
*
* @param logic 任務邏輯
* @param exceptionHandle 異常處理
* @param <T> 類型
* @return 任務
*/
public <T> CompletableFuture<T> createFuture(Supplier<T> logic, Function<Throwable, T> exceptionHandle) {
return CompletableFuture.supplyAsync(logic, executorService).exceptionally(exceptionHandle);
}
/**
* 提取真正的異常
* <p>
* CompletableFuture抛出的往往不是真正的異常
*
* @param throwable 異常
* @return 真正的異常
*/
public Throwable extractRealException(Throwable throwable) {
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
當然,這還隻是個簡易版的并行任務工具類,還有更多的可能,大家需要自己去探索了!
參考文獻
- docs.oracle.com/javase/8/do…
原文連結:https://juejin.cn/post/7208188047707619383