CompletionService批量異步執行
前景引入
我們知道線程池可以執行異步任務,同時可以通過傳回值Future擷取傳回值,是以異步任務大多數采用ThreadPoolExecutor+Future,如果存在如下情況,需要從任務一二三中擷取傳回值後,儲存到資料庫中,用異步邏輯實作代碼應該如下所示。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
Future<Integer> f1 = executorService.submit(() -> {
System.out.println("執行任務一");
return 1;
});
Future<Integer> f2 = executorService.submit(() -> {
System.out.println("執行任務二");
return 2;
});
Future<Integer> f3 = executorService.submit(() -> {
System.out.println("執行任務三");
return 3;
});
Integer r1 = f1.get();
executorService.execute(()->{
// 省略儲存r1操作
System.out.println(r1);
});
Integer r2 = f2.get();
executorService.execute(()->{
// 省略儲存r2操作
System.out.println(r2);
});
Integer r3 = f3.get();
executorService.execute(()->{
// 省略儲存r3操作
System.out.println(r3);
});
executorService.shutdown();
}
這樣寫的代碼一點毛病沒有,邏輯都是正常的,但如果存在任務一查詢了比較耗時的操作,由于f1.get是阻塞執行,那麼就算任務二和任務三已經傳回結果,任務二的傳回值和任務三的傳回值都是不能儲存到資料庫的,因為f1.get将主線程阻塞了。
批量異步實作
那可以如何處理呢?可以采用萬能的阻塞隊列,任務先執行完畢的先入隊,這樣可以保證其它線程入庫的速度不受影響,提高效率。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(3);
Future<Integer> f1 = executorService.submit(() -> {
System.out.println("執行任務一");
Thread.sleep(5000);
return 1;
});
Future<Integer> f2 = executorService.submit(() -> {
System.out.println("執行任務二");
return 2;
});
Future<Integer> f3 = executorService.submit(() -> {
System.out.println("執行任務三");
Thread.sleep(3000);
return 3;
});
executorService.execute(()->{
try {
Integer r1 = f1.get();
// 阻塞隊列入隊操作
queue.put(r1);
System.out.println(r1);
} catch (Exception e) {
e.printStackTrace();
}
});
executorService.execute(()->{
try {
Integer r2 = f2.get();
queue.put(r2);
System.out.println(r2);
} catch (Exception e) {
e.printStackTrace();
}
});
executorService.execute(()->{
try {
Integer r3 = f3.get();
queue.put(r3);
System.out.println(r3);
} catch (Exception e) {
e.printStackTrace();
}
});
// 循環次數不要使用queue.size限制,因為不同時刻queue.size值是有可能不同的
for (int i = 0; i <3; i++) {
Integer integer = queue.take();
// 省略儲存integer操作
executorService.execute(()->{
System.out.println("儲存入庫=="+integer);
});
}
executorService.shutdown();
}
産生結果如下
同樣的在生産中不建議使用,因為SDK為我們提供了工具類CompletionService,CompletionService内部就維護了一個阻塞隊列,唯一與上述代碼實作有所差別的是,阻塞隊列入庫的是Future對象,其餘原理類似。
CompletionService
如何建立CompletionService
CompletionService同樣是一個接口,其具體實作為ExecutorCompletionService,建立CompletionService對象有兩種方式
public ExecutorCompletionService(Executor executor);
public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)
CompletionService對象的建立都是需要指定線程池,如果在建立時沒有傳入阻塞對象,那麼會采用預設的LinkedBlockingQueue無界阻塞隊列,如果應用到生産可能會産生OOM的情況,這是需要注意的。
CompletionService初體驗
CompletionService如何做到批量執行異步任務呢,将上述場景采用CompletionService實作下
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletionService completionService = new ExecutorCompletionService(executorService);
Future<Integer> f1 = completionService.submit(() -> {
System.out.println("執行任務一");
Thread.sleep(5000);
return 1;
});
Future<Integer> f2 = completionService.submit(() -> {
System.out.println("執行任務二");
return 2;
});
Future<Integer> f3 = completionService.submit(() -> {
System.out.println("執行任務三");
Thread.sleep(3000);
return 3;
});
for (int i = 0; i <3 ; i++) {
Future take = completionService.take();
Integer integer = (Integer) take.get();
executorService.execute(()->{
System.out.println("執行入庫=="+integer);
});
}
executorService.shutdown();
}
CompletionService接口說明
CompletionService的方法不多,使用起來比較簡單,方法簽名如下
// 送出任務到阻塞隊列,帶傳回值的
Future<V> submit(Callable<V> task);
// 送出任務到阻塞隊列,和ThreadPoolExecutor的submit方法類似
Future<V> submit(Runnable task, V result);
// 從阻塞隊列中出隊,阻塞隊列空就阻塞
Future<V> take() throws InterruptedException;
// 從阻塞隊列中出隊 非阻塞,如果阻塞隊列為空立即傳回null
Future<V> poll();
// 從阻塞隊列中出隊,非阻塞,如果等待timeout時間後阻塞隊列還為空,那麼立即傳回null
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
總結
CompletionService主要是去解決無效等待的問題,如果一個耗時較長的任務在執行,那麼可以采用這種方式避免無效的等待,CompletionService還能讓異步任務的執行結果有序化,先執行完就先進入阻塞隊列。