大家好,我是哪吒。
一、前情提要
在上一篇文章中,使用雙異步後,如何保證資料一緻性?,通過Future擷取異步傳回值,輪詢判斷Future狀态,如果執行完畢或已取消,則通過get()擷取傳回值,get()是阻塞的方法,是以會阻塞目前線程,如果通過new Runnable()執行get()方法,那麼還是需要傳回AsyncResult,然後再通過主線程去get()擷取異步線程傳回結果。
寫法很繁瑣,還會阻塞主線程。
下面是FutureTask異步執行流程圖:
二、JDK8的CompletableFuture
1、ForkJoinPool
Java8中引入了CompletableFuture,它實作了對Future的全面更新,可以通過回調的方式,擷取異步線程傳回值。
CompletableFuture的異步執行通過ForkJoinPool實作, 它使用守護線程去執行任務。
ForkJoinPool在于可以充分利用多核CPU的優勢,把一個任務拆分成多個小任務,把多個小任務放到多個CPU上并行執行,當多個小任務執行完畢後,再将其執行結果合并起來。
Future的異步執行是通過ThreadPoolExecutor實作的。
2、從ForkJoinPool和ThreadPoolExecutor探索CompletableFuture和Future的差別
- ForkJoinPool中的每個線程都會有一個隊列,而ThreadPoolExecutor隻有一個隊列,并根據queue類型不同,細分出各種線程池;
- ForkJoinPool在使用過程中,會建立大量的子任務,會進行大量的gc,但是ThreadPoolExecutor不需要,因為ThreadPoolExecutor是任務配置設定平均的;
- ThreadPoolExecutor中每個異步線程之間是互相獨立的,當執行速度快的線程執行完畢後,它就會一直處于空閑的狀态,等待其它線程執行完畢;
- ForkJoinPool中每個異步線程之間并不是絕對獨立的,在ForkJoinPool線程池中會維護一個隊列來存放需要執行的任務,當線程自身任務執行完畢後,它會從其它線程中擷取未執行的任務并幫助它執行,直至所有線程執行完畢。
是以,在多線程任務配置設定不均時,ForkJoinPool的執行效率更高。但是,如果任務配置設定均勻,ThreadPoolExecutor的執行效率更高,因為ForkJoinPool會建立大量子任務,并對其進行大量的GC,比較耗時。
三、通過CompletableFuture優化 “通過Future擷取異步傳回值”
1、通過Future擷取異步傳回值關鍵代碼
(1)将異步方法的傳回值改為Future<Integer>,将傳回值放到new AsyncResult<>();中;
@Async("async-executor")
public void readXls(String filePath, String filename) {
try {
// 此代碼為簡化關鍵性代碼
List<Future<Integer>> futureList = new ArrayList<>();
for (int time = 0; time < times; time++) {
Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
futureList.add(sumFuture);
}
}catch (Exception e){
logger.error("readXlsCacheAsync---插入資料異常:",e);
}
}
@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {
try {
// 此代碼為簡化關鍵性代碼
return new AsyncResult<>(sum);
}catch (Exception e){
return new AsyncResult<>(0);
}
}
(2)通過Future<Integer>.get()擷取傳回值:
public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) {
int[] futureSumArr = new int[futureList.size()];
for (int i = 0;i<futureList.size();i++) {
try {
Future<Integer> future = futureList.get(i);
while (true) {
if (future.isDone() && !future.isCancelled()) {
Integer futureSum = future.get();
logger.info("擷取Future傳回值成功"+"----Future:" + future
+ ",Result:" + futureSum);
futureSumArr[i] += futureSum;
break;
} else {
logger.info("Future正在執行---擷取Future傳回值中---等待3秒");
Thread.sleep(3000);
}
}
} catch (Exception e) {
logger.error("擷取Future傳回值異常: ", e);
}
}
boolean insertFlag = getInsertSum(futureSumArr, excelRow);
logger.info("擷取所有異步線程Future的傳回值成功,Excel插入結果="+insertFlag);
return insertFlag;
}
2、通過CompletableFuture擷取異步傳回值關鍵代碼
(1)将異步方法的傳回值改為 int
@Async("async-executor")
public void readXls(String filePath, String filename) {
List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();
for (int time = 0; time < times; time++) {
// 此代碼為簡化關鍵性代碼
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
}
}).thenApply((result) -> {// 回調方法
return thenApplyTest2(result);// supplyAsync傳回值 * 1
}).thenApply((result) -> {
return thenApplyTest5(result);// thenApply傳回值 * 1
}).exceptionally((e) -> { // 如果執行異常:
logger.error("CompletableFuture.supplyAsync----異常:", e);
return null;
});
completableFutureList.add(completableFuture);
}
}
@Async("async-executor")
public int readXlsCacheAsync() {
try {
// 此代碼為簡化關鍵性代碼
return sum;
}catch (Exception e){
return -1;
}
}
(2)通過completableFuture.get()擷取傳回值
public static boolean getCompletableFutureResult(List<CompletableFuture<Integer>> list, int excelRow){
logger.info("通過completableFuture.get()擷取每個異步線程的插入結果----開始");
int sum = 0;
for (int i = 0; i < list.size(); i++) {
Integer result = list.get(i).get();
sum += result;
}
boolean insertFlag = excelRow == sum;
logger.info("全部執行完畢,excelRow={},入庫={}, 資料是否一緻={}",excelRow,sum,insertFlag);
return insertFlag;
}
3、效率對比
(1)測試環境
- 12個邏輯處理器的電腦;
- Excel中包含10萬條資料;
- Future的自定義線程池,核心線程數為24;
- ForkJoinPool的核心線程數為24;
(2)統計四種情況下10萬資料入庫時間
- 不擷取異步傳回值
- 通過Future擷取異步傳回值
- 通過CompletableFuture擷取異步傳回值,預設ForkJoinPool線程池的核心線程數為本機邏輯處理器數量,測試電腦為12;
- 通過CompletableFuture擷取異步傳回值,修改ForkJoinPool線程池的核心線程數為24。
備注:因為CompletableFuture不阻塞主線程,主線程執行時間隻有2秒,表格中統計的是異步線程全部執行完成的時間。
(3)設定核心線程數
将核心線程數CorePoolSize設定成CPU的處理器數量,是不是效率最高的?
// 擷取CPU的處理器數量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;// 測試電腦是24
因為在接口被調用後,開啟異步線程,執行入庫任務,因為測試機最多同時開啟24線程處理任務,故将10萬條資料拆分成等量的24份,也就是10萬/24 = 4166,那麼我設定成4200,是不是效率最佳呢?
測試的過程中發現,好像真的是這樣的。
自定義ForkJoinPool線程池
@Autowired
@Qualifier("asyncTaskExecutor")
private Executor asyncTaskExecutor;
@Override
public void readXls(String filePath, String filename) {
List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();
for (int time = 0; time < times; time++) {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
try {
return readExcelDbJdk8Service.readXlsCacheAsync(sheet, row, start, finalEnd, insertBuilder);
} catch (Exception e) {
logger.error("CompletableFuture----readXlsCacheAsync---異常:", e);
return -1;
}
};
},asyncTaskExecutor);
completableFutureList.add(completableFuture);
}
// 不會阻塞主線程
CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {
try {
int insertSum = getCompletableFutureResult(completableFutureList, excelRow);
} catch (Exception ex) {
return;
}
});
}
自定義線程池
/**
* 自定義異步線程池
*/
@Bean("asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//設定線程名稱
executor.setThreadNamePrefix("asyncTask-Executor");
//設定最大線程數
executor.setMaxPoolSize(200);
//設定核心線程數
executor.setCorePoolSize(24);
//設定線程空閑時間,預設60
executor.setKeepAliveSeconds(200);
//設定隊列容量
executor.setQueueCapacity(50);
/**
* 當線程池的任務緩存隊列已滿并且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕政策
* 通常有以下四種政策:
* ThreadPoolExecutor.AbortPolicy:丢棄任務并抛出RejectedExecutionException異常。
* ThreadPoolExecutor.DiscardPolicy:也是丢棄任務,但是不抛出異常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)
* ThreadPoolExecutor.CallerRunsPolicy:重試添加目前的任務,自動重複調用 execute() 方法,直到成功
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
(4)統計分析
效率對比:
③通過CompletableFuture擷取異步傳回值(12線程) < ②通過Future擷取異步傳回值 < ④通過CompletableFuture擷取異步傳回值(24線程) < ①不擷取異步傳回值
不擷取異步傳回值時性能最優,這不廢話嘛~
核心線程數相同的情況下,CompletableFuture的入庫效率要優于Future的入庫效率,10萬條資料大概要快4秒鐘,這還是相當驚人的,優化的價值就在于此。
四、通過CompletableFuture.allOf解決阻塞主線程問題
1、文法
CompletableFuture.allOf(CompletableFuture的可變數組).whenComplete((r,e) -> {})。
2、代碼執行個體
getCompletableFutureResult方法在 “3.2.2 通過completableFuture.get()擷取傳回值”。
// 不會阻塞主線程
CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {
logger.info("全部執行完畢,解決主線程阻塞問題~");
try {
int insertSum = getCompletableFutureResult(completableFutureList, excelRow);
} catch (Exception ex) {
logger.error("全部執行完畢,解決主線程阻塞問題,異常:", ex);
return;
}
});
// 會阻塞主線程
//getCompletableFutureResult(completableFutureList, excelRow);
logger.info("CompletableFuture----會阻塞主線程嗎?");
五、CompletableFuture中花俏的文法糖
1、runAsync
runAsync 方法不支援傳回值。
可以通過runAsync執行沒有傳回值的異步方法。
不會阻塞主線程。
// 分批異步讀取Excel内容并入庫
int finalEnd = end;
CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
2、supplyAsync
supplyAsync也可以異步處理任務,傳入的對象實作了Supplier接口。将Supplier作為參數并傳回CompletableFuture<T>結果值,這意味着它不接受任何輸入參數,而是将result作為輸出傳回。
會阻塞主線程。
supplyAsync()方法關鍵代碼:
int finalEnd = end;
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
}
});
@Override
public int readXlsCacheAsyncMybatis() {
// 不為人知的操作
// 傳回異步方法執行結果即可
return 100;
}
六、順序執行異步任務
1、thenRun
thenRun()不接受參數,也沒有傳回值,與runAsync()配套使用,恰到好處。
// JDK8的CompletableFuture
CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis())
.thenRun(() -> logger.info("CompletableFuture----.thenRun()方法測試"));
2、thenAccept
thenAccept()接受參數,沒有傳回值。
supplyAsync + thenAccept
- 異步線程順序執行
- supplyAsync的異步傳回值,可以作為thenAccept的參數使用
- 不會阻塞主線程
CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
}
}).thenAccept(x -> logger.info(".thenAccept()方法測試:" + x));
但是,此時無法通過completableFuture.get()擷取supplyAsync的傳回值了。
3、thenApply
thenApply在thenAccept的基礎上,可以再次通過completableFuture.get()擷取傳回值。
supplyAsync + thenApply,典型的鍊式程式設計。
- 異步線程内方法順序執行
- supplyAsync 的傳回值,作為第 1 個thenApply的參數,進行業務處理
- 第 1 個thenApply的傳回值,作為第 2 個thenApply的參數,進行業務處理
- 最後,通過future.get()方法擷取最終的傳回值
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
}
}).thenApply((result) -> {
return thenApplyTest2(result);// supplyAsync傳回值 * 2
}).thenApply((result) -> {
return thenApplyTest5(result);// thenApply傳回值 * 5
});
logger.info("readXlsCacheAsyncMybatis插入資料 * 2 * 5 = " + completableFuture.get());
七、CompletableFuture合并任務
- thenCombine,多個異步任務并行處理,有傳回值,最後合并結果傳回新的CompletableFuture對象;
- thenAcceptBoth,多個異步任務并行處理,無傳回值;
- acceptEither,多個異步任務并行處理,無傳回值;
- applyToEither,,多個異步任務并行處理,有傳回值;
CompletableFuture合并任務的代碼執行個體,這裡就不多贅述了,一些文法糖而已,大家切記陷入低水準勤奮的怪圈。
八、CompletableFuture VS Future總結
本文中以下幾個方面對比了CompletableFuture和Future的差異:
- ForkJoinPool和ThreadPoolExecutor的實作原理,探索了CompletableFuture和Future的差異;
- 通過代碼執行個體的形式簡單介紹了CompletableFuture中花俏的文法糖;
- 通過CompletableFuture優化了 “通過Future擷取異步傳回值”;
- 通過CompletableFuture.allOf解決阻塞主線程問題。
Future提供了異步執行的能力,但Future.get()會通過輪詢的方式擷取異步傳回值,get()方法還會阻塞主線程。
輪詢的方式非常消耗CPU資源,阻塞的方式顯然與我們的異步初衷背道而馳。
JDK8提供的CompletableFuture實作了Future接口,添加了很多Future不具備的功能,比如鍊式程式設計、異常處理回調函數、擷取異步結果不阻塞不輪詢、合并異步任務等。
擷取異步線程結果後,我們可以通過添加事務的方式,實作Excel入庫操作的資料一緻性。
異步多線程情況下如何實作事務?
有的小夥伴可能會說:
這還不簡單?添加@Transactional注解,如果發生異常或入庫資料量不符,直接復原就可以了~
那麼,真的是這樣嗎?我們下期見~