今天遇到下載下傳資金流水記錄的場景。下載下傳的資料源于分頁查詢,一次5000條。當資料量到達十萬級的時候,僅僅通過for循環、每次設定pageNum,查詢的等待時間超過了容忍的範圍。下面示例展示了采用Callable和Future進行多線程查詢并使用CountDownLatch進行多線程同步。
// 進行首次查詢(略),擷取總頁數
int totalPage;
// 設定計數器,從0開始
final CountDownLatch countDownLatch = new CountDownLatch(totalPage - );
// 定義Future數組,數組大小和計數器個數相同
Future<PageResult<FundRecordDTO>>[] futures = new Future[totalPage - ];
for (int i = ; i < totalPage; i++) {
// 重設分頁參數,從1開始
final PageArg pageArg = new PageArg();
pageArg.setPageSize(ExcelHelper.pageSize);
pageArg.setPageIndex(i + );
// 将各個PageResult塞到線程池的各個線程裡,傳回Future數組
futures[i] = threadPool.submit(new Callable<PageResult<FundRecordDTO>>() {
// 傳回取得的PageResult
@Override
public PageResult<FundRecordDTO> call() throws Exception {
PageResult<FundRecordDTO> pageResult = new PageResult<FundRecordDTO>();
try {
pageResult = fundRecordService.findFundRecordPage(pageArg, fundRecordParam);
} catch (Exception e) {
throw e;
} finally {
// 線程完成任務後通過countDownLatch.countDown()來通知CountDownLatch對象,計數器-1
countDownLatch.countDown();
}
return pageResult;
}
});
}
// 所有任務執行完畢後觸發事件,喚醒await在latch上的主線程
countDownLatch.await();
// 合并記錄
for (int j = ; j < totalPage; j++) {
if (futures[j] != null && futures[j].get().getData() != null) {
fileDataList.addAll(futures[j].get().getData());
}
}
上面的例子用到了Callable、Future和CountDownLatch三個常用的多線程工具類,下面我們分别來了解下。
Callable和Future
Callable是類似于Runnable的接口,實作Callable接口的類和實作Runnable的類都是可被其他線程執行的任務。
Callable和Runnable的差別如下:
- Callable定義的方法是call,而Runnable定義的方法是run。
- Callable的call方法可以有傳回值,而Runnable的run方法不能有傳回值。
- Callable的call方法可抛出異常,而Runnable的run方法不能抛出異常。
public
interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
Future就是對于具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、擷取結果、設定結果操作。
public interface Future<V> {
// 可以取消任務的執行,參數為true表示立即中斷任務的執行,參數為false表示允許正在運作的任務運作完成。
boolean cancel(boolean mayInterruptIfRunning);
// 查詢是否取消掉
boolean isCancelled();
// 查詢是否完成
boolean isDone();
// 等待計算完成,擷取計算結果。
V get() throws InterruptedException, ExecutionException;
// 在逾時時間内等待計算完成,擷取計算結果。
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
CountDownLatch
CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程的操作執行完後再執行。例如,應用程式的主線程希望在負責啟動架構服務的線程已經啟動所有的架構服務之後再執行。
和其它并發工具類,如CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue,它們都存在于java.util.concurrent包下。
CountDownLatch是通過一個計數器來實作的,計數器的初始值為線程的數量。每當一個線程完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然後在閉鎖上等待的線程就可以恢複執行任務。這個初始值隻能被設定一次,而且CountDownLatch沒有提供任何機制去重新設定這個計數值。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiQ3chVEa0V3bT9CX5RXa2Fmcn9CXwczLcVmds92czlGZvwVP9EUTDZ0aRJkSwk0LcxGbpZ2LcBDM08CXlpXazRnbvZ2LcRlMMVDT2EWNvwFdu9mZvwVP9cnY5Z1RhZmTzMGb412YsZ1RjZXUYpVd1kmYr50MZV3YyI2cKJDT29GRjBjUIF2LcRHelR3LcJzLctmch1mclRXY39TN2MTM1AzMzITMzUDM3EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
與CountDownLatch的第一次互動是主線程等待其他線程。主線程必須在啟動其他線程後立即調用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務。