天天看點

CompletableFuture中的CompletionException異常真是坑到我了!

作者:程式猿阿嘴
CompletableFuture中的CompletionException異常真是坑到我了!

開發環境

  • JDK 17
  • Idea 2022

熟悉JDK 8版本的同學,大機率都使用過java.util.concurrent.CompletableFuture這個類,有時候在業務服務中你可能需要并行的去執行某些騷操作,那就少不了它的存在。

我在業務中就有大量的需求場景,比如:我需要拉管道方的訂單和訂單明細,落地到我司系統中。此時用到了兩個接口:訂單清單查詢、訂單明細查詢,查詢清單很簡單,一頁一頁的翻頁查詢,因為它也隻傳回了訂單号,我舉個例子吧!

這裡是訂單清單查詢接口傳回的資料結構(Mock資料),一堆的訂單号

json複制代碼{
    "data":[
        "125345345345",
        "235894563423",
        "345345345343"
    ]
}
           

接下來我肯定要根據訂單号去查詢訂單明細的,千萬不要下意識的進行for循環一個一個查,那要查到什麼時候啊! 這個時候CompletableFuture就派上用場了。

CompletableFuture應用

前提條件,我為了友善使用,我是直接在業務的Base包中,封裝了一個通用實作,當然隻是适用于我們業務的,需要的同學自取。

注意:下面這段代碼沒有處理真實的異常,想直接用,再往下看!!!

php複制代碼
/**
 * 偷個懶,線程池直接這樣寫了先,真實業務中不是這樣搞的哈!
 */
private final static ExecutorService executorService = Executors.newFixedThreadPool(4);


/**
 * 建立并行任務并執行
 *
 * @param list            資料源
 * @param api             API調用邏輯
 * @param exceptionHandle 異常處理邏輯
 * @param <S>             資料源類型
 * @param <T>             程式傳回類型
 * @return 處理結果清單
 */
public <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> api, BiFunction<Throwable, S, T> exceptionHandle) {
    //規整所有任務
    List<CompletableFuture<T>> collectFuture = list.stream()
            .map(s -> this.createFuture(() -> api.apply(s), e -> exceptionHandle.apply(e, s)))
            .toList();
    //彙總所有任務,并執行join,全部執行完成後,統一傳回
    return CompletableFuture.allOf(collectFuture.toArray(new CompletableFuture<?>[]{}))
            .thenApply(f -> collectFuture.stream()
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList()))
            .join();
}

/**
 * 建立單個CompletableFuture任務
 *
 * @param logic           任務邏輯
 * @param exceptionHandle 異常處理
 * @param <T>             類型
 * @return 任務
 */
public <T> CompletableFuture<T> createFuture(Supplier<T> logic, Function<Throwable, T> exceptionHandle) {
    return CompletableFuture.supplyAsync(logic, executorService).exceptionally(exceptionHandle);
}
           

利用上面這塊封裝的代碼,完全适用于我在公司的大部分并行業務場景,也确實提升了我Pod節點的CPU使用率。

不過之後有個問題就坑了,你調用外部API的時候,偶爾會失敗,那麼失敗就要重試,這個時候我就需要正确的判斷異常并進行重試操作。

先定義個業務異常類

java複制代碼public static class BizApiException extends RuntimeException {
    public BizApiException() {
    }

    public BizApiException(String message) {
        super(message);
    }
}
           

示例代碼

下面的代碼隻是模拟我在業務端的場景,大家樂呵樂呵就行。
java複制代碼public static void main(String[] args) {
    CompletableFutureDemo f = new CompletableFutureDemo();
    List<Integer> numList = f.parallelFutureJoin(Arrays.asList(1, 2, 3), num -> {
        //模拟API調用
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //...
        }
        if (num > 2) {
            throw new BizApiException("心别太大");
        }
        return num;
    }, (e, num) -> {
        //異常向你打開了大門
        if (e instanceof BizApiException) {
            System.out.println("業務異常,我在處理數字:" + num + ",異常原因:" + e);
            return -1;
        }
        System.out.println("我異常了,老六,我剛才在處理數字:" + num + ",異常原因:" + e);
        return -1;
    });
    System.out.println(numList);
}
           
注意:我本來想通過傳回的Exception判斷是不是BizApiException業務異常的,可惜的是這裡的異常類型永遠都不會是BizApiException,我輸出下内容到控制台,就懂了!

執行代碼後,我得到的控制台内容

這個時候其實可以看到拿到的真實異常類型了,一個名為java.util.concurrent.CompletionException的老六出現了。我看看它是個啥!
bash複制代碼我異常了,老六,我剛才在處理數字:3,異常原因:java.util.concurrent.CompletionException: com.java.basic.CompletableFutureDemo$BizApiException: 心别太大
[1, 2, -1]
           

CompletionException

這個類的注釋說明了它的用途,簡單了解是:它在完成結果或任務的過程中遇到錯誤或其他異常時會觸發。那我懂了!任務異常之後它就會出現,然後還把我們自己的異常給包起來了!

JDK源代碼

scala複制代碼/**
 * Exception thrown when an error or other exception is encountered
 * in the course of completing a result or task.
 *
 * @since 1.8
 * @author Doug Lea
 */
public class CompletionException extends RuntimeException {}
           

其實還有個異常類,我們也需要關注,它告訴了我們,真實的異常在哪裡了!

ExecutionException

這個類的注釋就說的很清楚了,自己的異常都在getCause()中了,那就好辦了.
scala複制代碼/**
 * Exception thrown when attempting to retrieve the result of a task
 * that aborted by throwing an exception. This exception can be
 * inspected using the {@link #getCause()} method.
 *
 * @see Future
 * @since 1.5
 * @author Doug Lea
 */
public class ExecutionException extends Exception {}
           

改造我的并發工具類(完整版)

方法extractRealException就是我要擷取的真實異常,同時parallelFutureJoin方法中引用一下,這個工具類就解決了需求。
typescript複制代碼/**
 * 建立并行任務并執行
 *
 * @param list            資料源
 * @param api             API調用邏輯
 * @param exceptionHandle 異常處理邏輯
 * @param <S>             資料源類型
 * @param <T>             程式傳回類型
 * @return 處理結果清單
 */
public <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> api, BiFunction<Throwable, S, T> exceptionHandle) {
    //規整所有任務
    List<CompletableFuture<T>> collectFuture = list.stream()
            .map(s -> this.createFuture(() -> api.apply(s), e -> exceptionHandle.apply(
this.extractRealException(e), s)))
            .toList();
    //彙總所有任務,并執行join,全部執行完成後,統一傳回
    return CompletableFuture.allOf(collectFuture.toArray(new CompletableFuture<?>[]{}))
            .thenApply(f -> collectFuture.stream()
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList()))
            .join();
}

/**
 * 建立CompletableFuture任務
 *
 * @param logic           任務邏輯
 * @param exceptionHandle 異常處理
 * @param <T>             類型
 * @return 任務
 */
public <T> CompletableFuture<T> createFuture(Supplier<T> logic, Function<Throwable, T> exceptionHandle) {
    return CompletableFuture.supplyAsync(logic, executorService).exceptionally(exceptionHandle);
}

/**
 * 提取真正的異常
 * <p>
 * CompletableFuture抛出的往往不是真正的異常
 *
 * @param throwable 異常
 * @return 真正的異常
 */
public Throwable extractRealException(Throwable throwable) {
    if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
        if (throwable.getCause() != null) {
            return throwable.getCause();
        }
    }
    return throwable;
}
           

當然,這還隻是個簡易版的并行任務工具類,還有更多的可能,大家需要自己去探索了!

參考文獻

  • docs.oracle.com/javase/8/do…
原文連結:https://juejin.cn/post/7208188047707619383