天天看点

使用CompletionService批处理任务(线程池阻塞线程)

如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务(Completion service)。

CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。

ExecutorCompletionService的实现相当直观。它在构造函数中创建一个BlockingQueue,用它去保持完成的结果。计算完成时会调用FutureTask中的done方法。当提交一个任务后,首先把这个任务包装为一个QueueingFuture,它是FutureTask的一个子类,然后覆写done方法,将结果置入BlockingQueue中,take和poll方法委托给了BlockingQueue,它会在结果不可用时阻塞。

[java]   view plain copy

  1. import java.util.Random;  
  2. import java.util.concurrent.BlockingQueue;  
  3. import java.util.concurrent.Callable;  
  4. import java.util.concurrent.CompletionService;  
  5. import java.util.concurrent.ExecutionException;  
  6. import java.util.concurrent.ExecutorCompletionService;  
  7. import java.util.concurrent.ExecutorService;  
  8. import java.util.concurrent.Executors;  
  9. import java.util.concurrent.Future;  
  10. import java.util.concurrent.LinkedBlockingQueue;  
  11. public class Test17 {  
  12.     public static void main(String[] args) throws Exception {  
  13.         Test17 t = new Test17();  
  14.         t.count1();  
  15.         t.count2();  
  16.     }  
  17. //使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理  
  18.     public void count1() throws Exception{  
  19.         ExecutorService exec = Executors.newCachedThreadPool();  
  20.         BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();  
  21.         for(int i=0; i<10; i++){  
  22.             Future<Integer> future =exec.submit(getTask());  
  23.             queue.add(future);  
  24.         }  
  25.         int sum = 0;  
  26.         int queueSize = queue.size();  
  27.         for(int i=0; i<queueSize; i++){  
  28.             sum += queue.take().get();  
  29.         }  
  30.         System.out.println("总数为:"+sum);  
  31.         exec.shutdown();  
  32.     }  
  33. //使用CompletionService(完成服务)保持Executor处理的结果  
  34.     public void count2() throws InterruptedException, ExecutionException{  
  35.         ExecutorService exec = Executors.newCachedThreadPool();  
  36.         CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);  
  37.         for(int i=0; i<10; i++){  
  38.             execcomp.submit(getTask());  
  39.         }  
  40.         int sum = 0;  
  41.         for(int i=0; i<10; i++){  
  42. //检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。  
  43.             Future<Integer> future = execcomp.take();  
  44.             sum += future.get();  
  45.         }  
  46.         System.out.println("总数为:"+sum);  
  47.         exec.shutdown();  
  48.     }  
  49.     //得到一个任务  
  50.     public Callable<Integer> getTask(){  
  51.         final Random rand = new Random();  
  52.         Callable<Integer> task = new Callable<Integer>(){  
  53.             @Override  
  54.             public Integer call() throws Exception {  
  55.                 int i = rand.nextInt(10);  
  56.                 int j = rand.nextInt(10);  
  57.                 int sum = i*j;  
  58.                 System.out.print(sum+"\t");  
  59.                 return sum;  
  60.             }  
  61.         };  
  62.         return task;  
  63.     }  
  64. }  

ExecutorCompletionService统一了ExecutorService和BlockingQueue,既有线程池功能,能提交任务,又有阻塞队列功能,能判断所有线程的执行结果。

继续阅读