天天看點

CompletionService批量異步執行

作者:Java面試365

CompletionService批量異步執行

前景引入

我們知道線程池可以執行異步任務,同時可以通過傳回值Future擷取傳回值,是以異步任務大多數采用ThreadPoolExecutor+Future,如果存在如下情況,需要從任務一二三中擷取傳回值後,儲存到資料庫中,用異步邏輯實作代碼應該如下所示。

public static void main(String[] args) throws ExecutionException, InterruptedException {
     ExecutorService executorService = Executors.newFixedThreadPool(3);
 
     Future<Integer> f1 = executorService.submit(() -> {
         System.out.println("執行任務一");
         return 1;
     });
 
     Future<Integer> f2 = executorService.submit(() -> {
         System.out.println("執行任務二");
         return 2;
     });
 
     Future<Integer> f3 = executorService.submit(() -> {
         System.out.println("執行任務三");
         return 3;
     });
 
     Integer r1 = f1.get();
     executorService.execute(()->{
         // 省略儲存r1操作
         System.out.println(r1);
     });
 
     Integer r2 = f2.get();
     executorService.execute(()->{
         // 省略儲存r2操作
         System.out.println(r2);
     });
 
     Integer r3 = f3.get();
     executorService.execute(()->{
         // 省略儲存r3操作
         System.out.println(r3);
     });
 
     executorService.shutdown();
 }           

這樣寫的代碼一點毛病沒有,邏輯都是正常的,但如果存在任務一查詢了比較耗時的操作,由于f1.get是阻塞執行,那麼就算任務二和任務三已經傳回結果,任務二的傳回值和任務三的傳回值都是不能儲存到資料庫的,因為f1.get将主線程阻塞了。

批量異步實作

那可以如何處理呢?可以采用萬能的阻塞隊列,任務先執行完畢的先入隊,這樣可以保證其它線程入庫的速度不受影響,提高效率。

public static void main(String[] args) throws ExecutionException, InterruptedException {
     ExecutorService executorService = Executors.newFixedThreadPool(3);
     ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(3);
 
     Future<Integer> f1 = executorService.submit(() -> {
         System.out.println("執行任務一");
         Thread.sleep(5000);
         return 1;
     });
 
     Future<Integer> f2 = executorService.submit(() -> {
         System.out.println("執行任務二");
         return 2;
     });
 
     Future<Integer> f3 = executorService.submit(() -> {
         System.out.println("執行任務三");
         Thread.sleep(3000);
         return 3;
     });
 
 
     executorService.execute(()->{
         try {
             Integer r1 = f1.get();
             // 阻塞隊列入隊操作
             queue.put(r1);
             System.out.println(r1);
         } catch (Exception e) {
             e.printStackTrace();
         }
     });
 
     executorService.execute(()->{
         try {
             Integer r2 = f2.get();
             queue.put(r2);
             System.out.println(r2);
         } catch (Exception e) {
             e.printStackTrace();
         }
     });
 
     executorService.execute(()->{
         try {
             Integer r3 = f3.get();
             queue.put(r3);
             System.out.println(r3);
         } catch (Exception e) {
             e.printStackTrace();
         }
     });
 
     // 循環次數不要使用queue.size限制,因為不同時刻queue.size值是有可能不同的
     for (int i = 0; i <3; i++) {
         Integer integer = queue.take();
         // 省略儲存integer操作
         executorService.execute(()->{
             System.out.println("儲存入庫=="+integer);
         });
     }
 
     executorService.shutdown();
 }           

産生結果如下

CompletionService批量異步執行

同樣的在生産中不建議使用,因為SDK為我們提供了工具類CompletionService,CompletionService内部就維護了一個阻塞隊列,唯一與上述代碼實作有所差別的是,阻塞隊列入庫的是Future對象,其餘原理類似。

CompletionService

如何建立CompletionService

CompletionService同樣是一個接口,其具體實作為ExecutorCompletionService,建立CompletionService對象有兩種方式

public ExecutorCompletionService(Executor executor);
 public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)           

CompletionService對象的建立都是需要指定線程池,如果在建立時沒有傳入阻塞對象,那麼會采用預設的LinkedBlockingQueue無界阻塞隊列,如果應用到生産可能會産生OOM的情況,這是需要注意的。

CompletionService初體驗

CompletionService如何做到批量執行異步任務呢,将上述場景采用CompletionService實作下

public static void main(String[] args) throws InterruptedException, ExecutionException {
     ExecutorService executorService = Executors.newFixedThreadPool(3);
 
     CompletionService completionService = new ExecutorCompletionService(executorService);
 
     Future<Integer> f1 = completionService.submit(() -> {
         System.out.println("執行任務一");
         Thread.sleep(5000);
         return 1;
     });
 
     Future<Integer> f2 = completionService.submit(() -> {
         System.out.println("執行任務二");
         return 2;
     });
 
     Future<Integer> f3 = completionService.submit(() -> {
         System.out.println("執行任務三");
         Thread.sleep(3000);
         return 3;
     });
 
     for (int i = 0; i <3 ; i++) {
         Future take = completionService.take();
         Integer integer = (Integer) take.get();
         executorService.execute(()->{
             System.out.println("執行入庫=="+integer);
         });
     }
     executorService.shutdown();
 }           

CompletionService接口說明

CompletionService的方法不多,使用起來比較簡單,方法簽名如下

// 送出任務到阻塞隊列,帶傳回值的
Future<V> submit(Callable<V> task);
// 送出任務到阻塞隊列,和ThreadPoolExecutor的submit方法類似
Future<V> submit(Runnable task, V result);
// 從阻塞隊列中出隊,阻塞隊列空就阻塞
Future<V> take() throws InterruptedException;
// 從阻塞隊列中出隊 非阻塞,如果阻塞隊列為空立即傳回null
Future<V> poll();
// 從阻塞隊列中出隊,非阻塞,如果等待timeout時間後阻塞隊列還為空,那麼立即傳回null
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;           

總結

CompletionService主要是去解決無效等待的問題,如果一個耗時較長的任務在執行,那麼可以采用這種方式避免無效的等待,CompletionService還能讓異步任務的執行結果有序化,先執行完就先進入阻塞隊列。

繼續閱讀