laitimes

并发编程-ExecutorCompletionService解析

author:JD Cloud developer

1. Brief introduction

In concurrent programming, most of the current practices are to add tasks to the thread pool, get the Future object, add it to the collection, and after all the tasks are added to the thread pool, call future.get() by traversing the Future collection. to get the result of each task, so that the task added to the thread pool first waits for it to complete, but there is no guarantee that the first task added to the thread pool is the first to be executed, so this situation will occur, the task added to the thread pool has been completed, but you must wait for the first task to be executed and the result can be processed before the next task can be processed.

If you want to process the tasks that are completed first, regardless of the order in which they are added to the thread pool, you need to use the ExecutorCompletionService tool.

2. Source Cave Analysis

ExecutorCompletionService实现了CompletionService接口。 CompletionService接种有有以下方法。

public interface CompletionService<V> {
    // 提交任务
    Future<V> submit(Callable<V> task);
    // 提交任务
    Future<V> submit(Runnable task, V result);
    // 获取任务结果,带抛出异常
    Future<V> take() throws InterruptedException;
    // 获取任务结果
    Future<V> poll();
    // 获取任务结果,带超时
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

           

You can see that the methods in the API are very simple, and there are only two methods: submitting a task and obtaining the task result.

Let's take a look at the code in the implementation class ExecutorCompletionService.

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask的子类,重写FutureTask完成后的done方法
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
	// task任务执行完成后将任务放到队列中
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    /**
     * 构造方法,传入一个线程池,创建一个队列
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * 构造方法,传入线程池和队列
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    // 提交一个task任务,最终将任务封装成QueueingFuture并由指定的线程池执行
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    // 提交一个task任务,最终将任务封装成QueueingFuture并由指定的线程池执行
    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    // 从队列中获取执行完成的RunnableFuture对象,take方法会阻塞直到有数据
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    // 从队列中获取执行完成的RunnableFuture对象
    public Future<V> poll() {
        return completionQueue.poll();
    }

    // 从队列中获取执行完成的RunnableFuture对象
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}

           

By looking at the code in the implementation class, we can see that this method is very simple, and the principle is divided into the following steps:

1. When constructing the ExecutorCompletionService object, you need to pass in a given thread pool or blocking queue.

2. When we submit a task to the ExecutorCompletionService, the submitted task will be wrapped into a QueueingFuture object, and then handed over to the thread pool specified by us for execution.

3. When the task is executed, the QueueingFuture object will execute the final done method (the new method of the QueueingFuture object) to add the RunnableFuture object to the specified blocking queue.

4. We can use the poll or take method to get the RunnableFuture object in the queue in order to get the execution result.

From this, we can find that the task execution result we get has nothing to do with the order of the task submitted to the thread pool, which task is completed first, it will be added to the queue, and we can get the execution result first.

3. Usage scenarios

1. When we don't pay attention to the order of tasks submitted to the thread pool and the order in which the task is executed and the results are obtained, we can use ExecutorCompletionService to execute the task. Here's the sample code.

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException {
        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
        for (Callable<Result> s : solvers) {
            ecs.submit(s);
        }
        int n = solvers.size();
        for (int i = 0; i < n; ++i) {
            Result r = ecs.take().get();
            if (r != null) {
                use(r);
            }
        }
    }

           

2. When multiple tasks are executed at the same time, we only need to get the execution result of the first task, and the rest of the results do not need to be cared about, we can also use ExecutorCompletionService to execute the task. Here's the sample code.

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
        int n = solvers.size();
        List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
        Result result = null;
        try {
            for (Callable<Result> s : solvers) {
                futures.add(ecs.submit(s));
            }

            for (int i = 0; i < n; ++i) {
                try {
                    Result r = ecs.take().get();
                    if (r != null) {
                        result = r;
                        break;
                    }
                } catch (ExecutionException ignore) {
                }
            }
        } finally {
            for (Future<Result> f : futures) {
                f.cancel(true);
            }
        }

        if (result != null) {
            use(result);
        }
    }

           

4. Code practice

In the business, we have this scenario, we have a batch of orders for batch update, each time an order is processed, we need to maintain the processing progress to ensure that the order processing progress is updated to the latest progress data in real time, we use ExecutorCompletionService at this time.

protected void parallelBatchUpdateWaybill(Map<String, LwbMain> lwbMainMap, Map<String, UpdateWaybillTaskDetail> taskDetailMap) {
        long start = System.currentTimeMillis();
        log.info("{} 并行批量更新订单开始:{}", traceId, taskNo);
        int total = lwbMainMap.size();
        BlockingQueue<Future<String>> blockingQueue = new LinkedBlockingQueue<>(total + 2);
        ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(parallelUpdateWaybillExecutorService, blockingQueue);
        for (Map.Entry<String, UpdateWaybillTaskDetail> entry : taskDetailMap.entrySet()) {
            String lwbNo = entry.getKey();
            LwbMain lwbMain = lwbMainMap.get(lwbNo);
            UpdateWaybillTaskDetail taskDetail = entry.getValue();
            executorCompletionService.submit(() -> this.updateSingleWaybill(lwbMain, taskDetail), "done");
        }

        for (int current = 0; current < taskDetailMap.size(); current++) {
            try {
                executorCompletionService.take().get();
            } catch (Exception e) {
                log.error("{} 获取并行批量更新订单结果异常:{}", traceId, e.getMessage(), e);
            } finally {
                jimClient.incr(importTaskNo);
            }
        }

        long end = System.currentTimeMillis();
        log.info("{} 并行批量更新订单结束:{},耗时:{}", traceId, taskNo, (end - start));
    }

           

Read on