天天看点

CompletionService批量异步执行

作者:Java面试365

CompletionService批量异步执行

前景引入

我们知道线程池可以执行异步任务,同时可以通过返回值Future获取返回值,所以异步任务大多数采用ThreadPoolExecutor+Future,如果存在如下情况,需要从任务一二三中获取返回值后,保存到数据库中,用异步逻辑实现代码应该如下所示。

public static void main(String[] args) throws ExecutionException, InterruptedException {
     ExecutorService executorService = Executors.newFixedThreadPool(3);
 
     Future<Integer> f1 = executorService.submit(() -> {
         System.out.println("执行任务一");
         return 1;
     });
 
     Future<Integer> f2 = executorService.submit(() -> {
         System.out.println("执行任务二");
         return 2;
     });
 
     Future<Integer> f3 = executorService.submit(() -> {
         System.out.println("执行任务三");
         return 3;
     });
 
     Integer r1 = f1.get();
     executorService.execute(()->{
         // 省略保存r1操作
         System.out.println(r1);
     });
 
     Integer r2 = f2.get();
     executorService.execute(()->{
         // 省略保存r2操作
         System.out.println(r2);
     });
 
     Integer r3 = f3.get();
     executorService.execute(()->{
         // 省略保存r3操作
         System.out.println(r3);
     });
 
     executorService.shutdown();
 }           

这样写的代码一点毛病没有,逻辑都是正常的,但如果存在任务一查询了比较耗时的操作,由于f1.get是阻塞执行,那么就算任务二和任务三已经返回结果,任务二的返回值和任务三的返回值都是不能保存到数据库的,因为f1.get将主线程阻塞了。

批量异步实现

那可以如何处理呢?可以采用万能的阻塞队列,任务先执行完毕的先入队,这样可以保证其它线程入库的速度不受影响,提高效率。

public static void main(String[] args) throws ExecutionException, InterruptedException {
     ExecutorService executorService = Executors.newFixedThreadPool(3);
     ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(3);
 
     Future<Integer> f1 = executorService.submit(() -> {
         System.out.println("执行任务一");
         Thread.sleep(5000);
         return 1;
     });
 
     Future<Integer> f2 = executorService.submit(() -> {
         System.out.println("执行任务二");
         return 2;
     });
 
     Future<Integer> f3 = executorService.submit(() -> {
         System.out.println("执行任务三");
         Thread.sleep(3000);
         return 3;
     });
 
 
     executorService.execute(()->{
         try {
             Integer r1 = f1.get();
             // 阻塞队列入队操作
             queue.put(r1);
             System.out.println(r1);
         } catch (Exception e) {
             e.printStackTrace();
         }
     });
 
     executorService.execute(()->{
         try {
             Integer r2 = f2.get();
             queue.put(r2);
             System.out.println(r2);
         } catch (Exception e) {
             e.printStackTrace();
         }
     });
 
     executorService.execute(()->{
         try {
             Integer r3 = f3.get();
             queue.put(r3);
             System.out.println(r3);
         } catch (Exception e) {
             e.printStackTrace();
         }
     });
 
     // 循环次数不要使用queue.size限制,因为不同时刻queue.size值是有可能不同的
     for (int i = 0; i <3; i++) {
         Integer integer = queue.take();
         // 省略保存integer操作
         executorService.execute(()->{
             System.out.println("保存入库=="+integer);
         });
     }
 
     executorService.shutdown();
 }           

产生结果如下

CompletionService批量异步执行

同样的在生产中不建议使用,因为SDK为我们提供了工具类CompletionService,CompletionService内部就维护了一个阻塞队列,唯一与上述代码实现有所区别的是,阻塞队列入库的是Future对象,其余原理类似。

CompletionService

如何创建CompletionService

CompletionService同样是一个接口,其具体实现为ExecutorCompletionService,创建CompletionService对象有两种方式

public ExecutorCompletionService(Executor executor);
 public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)           

CompletionService对象的创建都是需要指定线程池,如果在创建时没有传入阻塞对象,那么会采用默认的LinkedBlockingQueue无界阻塞队列,如果应用到生产可能会产生OOM的情况,这是需要注意的。

CompletionService初体验

CompletionService如何做到批量执行异步任务呢,将上述场景采用CompletionService实现下

public static void main(String[] args) throws InterruptedException, ExecutionException {
     ExecutorService executorService = Executors.newFixedThreadPool(3);
 
     CompletionService completionService = new ExecutorCompletionService(executorService);
 
     Future<Integer> f1 = completionService.submit(() -> {
         System.out.println("执行任务一");
         Thread.sleep(5000);
         return 1;
     });
 
     Future<Integer> f2 = completionService.submit(() -> {
         System.out.println("执行任务二");
         return 2;
     });
 
     Future<Integer> f3 = completionService.submit(() -> {
         System.out.println("执行任务三");
         Thread.sleep(3000);
         return 3;
     });
 
     for (int i = 0; i <3 ; i++) {
         Future take = completionService.take();
         Integer integer = (Integer) take.get();
         executorService.execute(()->{
             System.out.println("执行入库=="+integer);
         });
     }
     executorService.shutdown();
 }           

CompletionService接口说明

CompletionService的方法不多,使用起来比较简单,方法签名如下

// 提交任务到阻塞队列,带返回值的
Future<V> submit(Callable<V> task);
// 提交任务到阻塞队列,和ThreadPoolExecutor的submit方法类似
Future<V> submit(Runnable task, V result);
// 从阻塞队列中出队,阻塞队列空就阻塞
Future<V> take() throws InterruptedException;
// 从阻塞队列中出队 非阻塞,如果阻塞队列为空立即返回null
Future<V> poll();
// 从阻塞队列中出队,非阻塞,如果等待timeout时间后阻塞队列还为空,那么立即返回null
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;           

总结

CompletionService主要是去解决无效等待的问题,如果一个耗时较长的任务在执行,那么可以采用这种方式避免无效的等待,CompletionService还能让异步任务的执行结果有序化,先执行完就先进入阻塞队列。

继续阅读