
When I get a double asynchronous return value, how do I ensure that the main thread does not block?

Author: Nezha programming

Hello everyone, I'm Nezha.

1. Synopsis

The previous article was "How to ensure data consistency after using double asynchronous?". There, the asynchronous return value was obtained through Future:

- Poll the future's state.
- Once the task is done (and not cancelled), get the return value with get().

get() is a blocking method, so it blocks the current thread. Moving the get() calls into a new Runnable does not help either. That Runnable still has to return an AsyncResult, and the main thread still has to call get() on it to obtain the result of the asynchronous thread.

It's cumbersome to write and blocks the main thread.
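The following minimal JDK-only sketch makes the blocking concrete. It uses no Spring, and the class name, the 300 ms sleep and the value 42 are illustrative assumptions. It submits a slow task to an ExecutorService and measures how long the calling thread sits inside Future.get():

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BlockingFutureDemo {

    // Submits a task that takes roughly taskMillis, then measures how long the
    // calling thread is stuck inside Future.get() waiting for it.
    static long millisBlockedInGet(long taskMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> {
                Thread.sleep(taskMillis); // simulated slow storage work
                return 42;
            });
            long start = System.nanoTime();
            future.get(); // blocks the calling thread until the task is done
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocked for ~" + millisBlockedInGet(300) + " ms");
    }
}
```

On a typical run, the measured time is close to the task's sleep time. That is exactly the time during which the main thread could do nothing else.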

Here's a flowchart for the asynchronous execution of FutureTask:

[Figure: flowchart of FutureTask asynchronous execution]

2. CompletableFuture in JDK 8

1. ForkJoinPool

Java 8 introduced CompletableFuture, a comprehensive upgrade of Future that can deliver the return value of an asynchronous thread through callbacks.

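When no executor is passed, the asynchronous methods of CompletableFuture run on ForkJoinPool.commonPool(), whose worker threads are daemon threads. If the common pool's parallelism is below 2, a new thread is created for each task instead.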
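The following small sketch lets you check this on your own JDK. It assumes the common pool's parallelism is at least 2; otherwise CompletableFuture falls back to one new thread per task and the output differs. The class name is hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

final class CommonPoolDemo {

    // Runs a supplier with the one-argument supplyAsync (no executor given)
    // and reports which thread executed it and whether it is a daemon thread.
    static String describeWorker() {
        return CompletableFuture.supplyAsync(() -> {
            Thread t = Thread.currentThread();
            return t.getName() + " daemon=" + t.isDaemon();
        }).join(); // join() waits here only so the demo can print the result
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism = "
                + ForkJoinPool.getCommonPoolParallelism());
        System.out.println(describeWorker());
    }
}
```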

ForkJoinPool makes full use of multi-core CPUs. It splits a task into many small subtasks, runs those subtasks in parallel on multiple CPUs, and merges their results once they have finished.
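The split-run-merge idea can be sketched with RecursiveTask, the JDK's fork/join task type. This is an illustrative example, not code from the article. It sums an array, using a hypothetical split threshold of 1,000 elements:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Divide-and-conquer sum: split the range until it is small, fork one half,
// compute the other half in the current thread, then join the results.
final class RangeSumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // hypothetical split size
    private final long[] data;
    private final int from, to; // half-open range [from, to)

    RangeSumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;
        RangeSumTask left = new RangeSumTask(data, from, mid);
        RangeSumTask right = new RangeSumTask(data, mid, to);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // work on the right half directly
        return left.join() + rightSum;   // merge the partial results
    }

    static long parallelSum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new RangeSumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        System.out.println(parallelSum(data)); // 1 + 2 + ... + 100000
    }
}
```

Summing 1 to 100,000 this way gives 5,000,050,000, the same as a plain loop. The pool only changes how the work is spread across threads.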

The Future-based approach, by contrast, runs its asynchronous tasks on a ThreadPoolExecutor. Future itself is only the interface through which the result is retrieved.


2. Comparing CompletableFuture and Future through ForkJoinPool and ThreadPoolExecutor

  1. Queues: in a ForkJoinPool, each worker thread has its own task queue (a deque), while a ThreadPoolExecutor has a single shared queue. The various ThreadPoolExecutor-based thread pools differ mainly in the type of queue they use.
  2. Subtasks and GC: using a ForkJoinPool typically creates a large number of subtask objects, which means more garbage collection. A ThreadPoolExecutor does not split tasks; it simply hands whole tasks to its threads.
  3. Idle threads: the threads of a ThreadPoolExecutor work independently of one another. A thread that finishes early and finds no more queued work sits idle while the others are still running.
  4. Work stealing: the threads of a ForkJoinPool are not fully independent. Each worker keeps its own queue of pending tasks. A worker that has finished its own tasks "steals" unexecuted tasks from other workers' queues and helps execute them, until all tasks have completed.

Therefore, when tasks are unevenly distributed across threads, ForkJoinPool executes more efficiently. When tasks are evenly distributed, however, ThreadPoolExecutor is more efficient. The reason is that ForkJoinPool creates a large number of subtasks that must later be garbage-collected, which costs time.

3. Optimize "Get Asynchronous Return Value by Future" through CompletableFuture

1. Key code: obtaining the asynchronous return value through Future

(1) Change the return type of the asynchronous method to Future<Integer> and wrap the return value in new AsyncResult<>(...):

@Async("async-executor")
public void readXls(String filePath, String filename) {
    try {
        // Simplified key code; times is the number of batches
        List<Future<Integer>> futureList = new ArrayList<>();
        for (int time = 0; time < times; time++) {
            Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
            futureList.add(sumFuture);
        }
        // futureList is what getFutureResult(futureList, excelRow) below consumes
    } catch (Exception e) {
        logger.error("readXlsCacheAsync---data insertion exception:", e);
    }
}

@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {
    try {
        // Simplified key code; sum is the number of rows stored by this batch
        return new AsyncResult<>(sum);
    } catch (Exception e) {
        return new AsyncResult<>(0);
    }
}

(2) Get the Integer return value through Future.get():

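public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) {
    int[] futureSumArr = new int[futureList.size()];
    for (int i = 0; i < futureList.size(); i++) {
        try {
            Future<Integer> future = futureList.get(i);
            while (true) {
                if (future.isCancelled()) {
                    // A cancelled future never yields a value, so stop polling it
                    logger.warn("Future was cancelled----Future:" + future);
                    break;
                }
                if (future.isDone()) {
                    Integer futureSum = future.get(); // already done, so get() returns at once
                    logger.info("Got Future return value----Future:" + future
                            + ",Result:" + futureSum);
                    futureSumArr[i] += futureSum;
                    break;
                } else {
                    logger.info("Future still running---waiting 3 seconds for its return value");
                    Thread.sleep(3000); // polling: this blocks the calling thread
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            logger.error("Interrupted while getting Future return value: ", e);
            return false; // the result cannot be verified after an interrupt
        } catch (Exception e) {
            logger.error("Exception while getting Future return value: ", e);
        }
    }

    boolean insertFlag = getInsertSum(futureSumArr, excelRow);
    logger.info("Got the return values of all async Futures, Excel insert result=" + insertFlag);
    return insertFlag;
}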

2. Key code: obtaining the asynchronous return value through CompletableFuture

(1) Change the return value of the asynchronous method to int

@Async("async-executor")
public void readXls(String filePath, String filename) {
    List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();
    for (int time = 0; time < times; time++) {
        // Simplified key code
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
            }
        }).thenApply((result) -> { // callback
            return thenApplyTest2(result); // supplyAsync result * 1
        }).thenApply((result) -> {
            return thenApplyTest5(result); // thenApply result * 1
        }).exceptionally((e) -> { // if execution fails:
            logger.error("CompletableFuture.supplyAsync----exception:", e);
            // Failure marker; returning null would make the later sum throw NullPointerException
            return -1;
        });

        completableFutureList.add(completableFuture);
    }
}

// No @Async here: supplyAsync already runs this asynchronously, and Spring's
// @Async only supports void or Future return types, not int
public int readXlsCacheAsync() {
    try {
        // Simplified key code; sum is the number of rows stored by this batch
        return sum;
    } catch (Exception e) {
        return -1;
    }
}

(2) Get the return value through completableFuture.get().

public static boolean getCompletableFutureResult(List<CompletableFuture<Integer>> list, int excelRow)
        throws InterruptedException, ExecutionException {
    logger.info("Getting the insert result of each async thread via completableFuture.get()----start");

    int sum = 0;
    for (int i = 0; i < list.size(); i++) {
        Integer result = list.get(i).get(); // blocks until this future completes
        sum += result;
    }

    boolean insertFlag = excelRow == sum;
    logger.info("All done, excelRow={}, stored={}, data consistent={}", excelRow, sum, insertFlag);
    return insertFlag;
}
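Here is a self-contained sketch of 3.2 that runs without Spring or Excel. It rests on stated assumptions:

- storeBatch is a hypothetical stand-in for readXlsCacheAsyncMybatis() that simply returns the batch size.
- The futures run on the default common pool.
- -1 marks a failed batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class BatchInsertDemo {

    // Hypothetical stand-in for readXlsCacheAsyncMybatis(): pretends to store
    // one batch and returns the number of rows it stored.
    static int storeBatch(int rows) {
        if (rows < 0) throw new IllegalArgumentException("negative batch");
        return rows;
    }

    static List<CompletableFuture<Integer>> startBatches(int[] batchSizes) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int rows : batchSizes) {
            futures.add(CompletableFuture
                    .supplyAsync(() -> storeBatch(rows))
                    .thenApply(result -> result) // post-processing hook (identity here)
                    .exceptionally(e -> -1));    // failure marker, never null
        }
        return futures;
    }

    // Same idea as getCompletableFutureResult: get() blocks until each batch is done.
    static boolean allRowsStored(List<CompletableFuture<Integer>> futures, int excelRow)
            throws InterruptedException, ExecutionException {
        int sum = 0;
        for (CompletableFuture<Integer> f : futures) sum += f.get();
        return sum == excelRow;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(allRowsStored(startBatches(new int[] {4200, 4200, 1600}), 10_000));
    }
}
```

Because the results are read with get(), the calling thread still waits for the slowest batch. Section 4 removes that wait.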

3. Efficiency comparison

(1) Test environment

  1. A computer with 12 logical processors;
  2. An Excel file containing 100,000 rows;
  3. Future: a thread pool with 24 core threads;
  4. ForkJoinPool: 24 core threads;

(2) Time taken to store 100,000 rows in four cases

  1. The asynchronous return values are not retrieved
  2. The asynchronous return values are retrieved through Future
  3. The asynchronous return values are retrieved through CompletableFuture, using the ForkJoinPool's default thread count. That count follows the number of local logical processors: 12 on the test computer. Strictly speaking, the common pool's default parallelism is the processor count minus one, i.e. 11.
  4. The asynchronous return values are retrieved through CompletableFuture, with the number of core threads raised to 24

Note: because CompletableFuture does not block the main thread, the main thread finishes in only about 2 seconds. The times in the table are measured until all asynchronous threads have finished.

(3) Set the number of core threads

Is it most efficient to set the number of core threads (corePoolSize) based on the number of CPU processors?

// Number of CPU logical processors, times 2
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2; // 24 on the test computer

After the interface is called, asynchronous threads are started to carry out the storage tasks. The test machine can run up to 24 threads at the same time, so the 100,000 rows are split into 24 roughly equal parts. That works out to 100,000 / 24 ≈ 4,166 rows each, which I rounded up to a batch size of 4,200. Is that the most efficient setting?
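The batch arithmetic can be checked with a short sketch. The split helper is hypothetical, since the article does not show its splitting code. It cuts the rows into half-open [start, end) ranges:

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSplitter {

    // Splits totalRows into half-open [start, end) ranges of at most batchSize rows.
    static List<int[]> split(int totalRows, int batchSize) {
        List<int[]> ranges = new ArrayList<>();
        for (int start = 0; start < totalRows; start += batchSize) {
            int end = Math.min(start + batchSize, totalRows);
            ranges.add(new int[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        int threads = Runtime.getRuntime().availableProcessors() * 2; // 24 on the test machine
        System.out.println("threads = " + threads);
        List<int[]> ranges = split(100_000, 4_200);
        int[] last = ranges.get(ranges.size() - 1);
        System.out.println(ranges.size() + " batches, last = [" + last[0] + ", " + last[1] + ")");
    }
}
```

With a batch size of 4,200, the 100,000 rows become 23 full batches plus a final batch of 3,400 rows. That is 24 batches in total, one per thread.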

The tests confirmed that it is.

Passing a custom thread pool to CompletableFuture

@Autowired
@Qualifier("asyncTaskExecutor")
private Executor asyncTaskExecutor;

@Override
public void readXls(String filePath, String filename) {
    List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();
    for (int time = 0; time < times; time++) {
        // Simplified: sheet, row, start, finalEnd and insertBuilder are prepared per batch
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                try {
                    return readExcelDbJdk8Service.readXlsCacheAsync(sheet, row, start, finalEnd, insertBuilder);
                } catch (Exception e) {
                    logger.error("CompletableFuture----readXlsCacheAsync---exception:", e);
                    return -1;
                }
            }
        }, asyncTaskExecutor); // run on the custom pool instead of ForkJoinPool.commonPool()

        completableFutureList.add(completableFuture);
    }

    // Does not block the main thread
    CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r, e) -> {
        try {
            // getCompletableFutureResult returns boolean, not int
            boolean insertFlag = getCompletableFutureResult(completableFutureList, excelRow);
        } catch (Exception ex) {
            logger.error("Getting CompletableFuture results failed:", ex);
        }
    });
}
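Below is a JDK-only sketch of the two-argument supplyAsync(supplier, executor), which runs the supplier on the executor you pass instead of ForkJoinPool.commonPool(). Two parts are assumptions for illustration: a fixed pool of 4 threads stands in for the Spring asyncTaskExecutor bean, and the thread factory imitates the "asyncTask-Executor" name prefix:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class CustomExecutorDemo {

    // Names threads prefix1, prefix2, ..., similar to setThreadNamePrefix(...).
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return r -> new Thread(r, prefix + counter.incrementAndGet());
    }

    // The two-argument supplyAsync runs the supplier on the given executor
    // instead of ForkJoinPool.commonPool().
    static String runOnCustomPool() {
        ExecutorService pool = Executors.newFixedThreadPool(4, namedFactory("asyncTask-Executor"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnCustomPool());
    }
}
```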

Custom thread pools

/**
 * Custom asynchronous thread pool
 */
@Bean("asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // Thread name prefix
    executor.setThreadNamePrefix("asyncTask-Executor");
    // Maximum number of threads
    executor.setMaxPoolSize(200);
    // Number of core threads
    executor.setCorePoolSize(24);
    // Keep-alive time of idle threads in seconds (default 60)
    executor.setKeepAliveSeconds(200);
    // Queue capacity
    executor.setQueueCapacity(50);
    /**
     * When the task queue is full and the pool already has maximumPoolSize threads,
     * any further task is handled by the rejection policy. The four standard policies are:
     * ThreadPoolExecutor.AbortPolicy: discard the task and throw RejectedExecutionException.
     * ThreadPoolExecutor.DiscardPolicy: discard the task silently, without throwing.
     * ThreadPoolExecutor.DiscardOldestPolicy: discard the oldest task in the queue, then retry execute() for the new task.
     * ThreadPoolExecutor.CallerRunsPolicy: run the rejected task directly in the thread that called execute()
     *   (unless the pool has been shut down, in which case the task is discarded).
     */
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    return executor;
}
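The CallerRunsPolicy behaviour is easy to observe with a deliberately tiny pool. This sketch uses a plain ThreadPoolExecutor rather than Spring's ThreadPoolTaskExecutor. The pool has 1 thread and a queue of 1, chosen only to force a rejection:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CallerRunsDemo {

    // Tiny pool: 1 core thread, 1 max thread, queue capacity 1. The first task
    // occupies the worker and the second fills the queue, so the third is rejected
    // and CallerRunsPolicy executes it on the submitting thread.
    static String threadThatRanThirdTask() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        try {
            pool.execute(() -> awaitQuietly(release));                        // task 1: keeps the worker busy
            pool.execute(() -> { });                                           // task 2: sits in the queue
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName()); // task 3: rejected
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return ranOn[0];
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("task 3 ran on: " + threadThatRanThirdTask()
                + " (caller: " + Thread.currentThread().getName() + ")");
    }
}
```

This has a consequence for the article's setup. If the asyncTaskExecutor pool ever saturates, CallerRunsPolicy makes the submitting thread run the task itself, so that thread is blocked for the duration of the task.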

(4) Statistical analysis

Efficiency comparison, from lowest to highest:

(3) Get the asynchronous return value through CompletableFuture (12 threads) < (2) Get it through Future < (4) Get it through CompletableFuture (24 threads) < (1) Do not get the asynchronous return value

Not retrieving the asynchronous return values at all performs best, which goes without saying~

With the same number of core threads, CompletableFuture stores the data more efficiently than Future. For 100,000 rows it is about 4 seconds faster, which is quite impressive, and this is where the value of the optimization lies.


4. Solve the problem of blocking the main thread by CompletableFuture.allOf

1. Syntax

CompletableFuture.allOf(varargs array of CompletableFutures).whenComplete((r,e) -> {}).

2. Code example

The getCompletableFutureResult method used here is the one from 3.2.2, "Get the return value through completableFuture.get()".

// Does not block the main thread
CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r, e) -> {
    logger.info("All done, main thread blocking problem solved~");
    try {
        // Every future is complete at this point, so the get() calls inside return immediately
        boolean insertFlag = getCompletableFutureResult(completableFutureList, excelRow);
    } catch (Exception ex) {
        logger.error("All done, main thread blocking problem solved, exception:", ex);
    }
});

// Would block the main thread
//getCompletableFutureResult(completableFutureList, excelRow);

logger.info("CompletableFuture----does this block the main thread?");
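Here is a self-contained sketch of the same allOf(...).whenComplete(...) pattern. The CountDownLatch gate is a hypothetical device that holds the batches back. This lets the demo show that startAndSum returns before any batch has finished. The batch sizes are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class AllOfDemo {

    // Starts one future per batch, registers an allOf(...).whenComplete(...) callback
    // and returns right away; the callback stores the sum once every batch is done.
    static CompletableFuture<Void> startAndSum(int[] batchSizes, CountDownLatch gate,
                                               AtomicInteger total) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int rows : batchSizes) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                awaitQuietly(gate); // hypothetical: hold the batch until the gate opens
                return rows;        // pretend this many rows were stored
            }));
        }
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        return; // at least one batch failed; a real version would log it
                    }
                    // Every future is complete here, so join() returns without waiting
                    total.set(futures.stream().mapToInt(CompletableFuture::join).sum());
                });
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger total = new AtomicInteger();
        CompletableFuture<Void> done = startAndSum(new int[] {4200, 4200, 1600}, gate, total);
        System.out.println("startAndSum returned, done = " + done.isDone()); // false: nothing waited
        gate.countDown(); // let the batches finish
        done.join();      // only so the demo can print the final total
        System.out.println("total = " + total.get());
    }
}
```

The final join() in main exists only so the demo can print the total. In the article's code nothing waits; the callback logs the result when it runs.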

5. Fancy syntactic sugar in CompletableFuture

1. runAsync

The runAsync method doesn't support return values.

Asynchronous methods with no return value can be executed via runAsync.

The main thread is not blocked.

// Read the Excel content asynchronously in batches and store it
int finalEnd = end;
CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis());
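Here is a minimal sketch of runAsync. It assumes a hypothetical storeAsync that just adds a row count to an AtomicInteger, in place of the real storage call. It shows that the returned future has type CompletableFuture<Void>:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class RunAsyncDemo {

    // Hypothetical stand-in for the storage call: it only counts rows.
    // runAsync takes a Runnable, so the future carries no value.
    static CompletableFuture<Void> storeAsync(AtomicInteger storedRows, int rows) {
        return CompletableFuture.runAsync(() -> storedRows.addAndGet(rows));
    }

    public static void main(String[] args) {
        AtomicInteger stored = new AtomicInteger();
        CompletableFuture<Void> future = storeAsync(stored, 4200); // returns immediately
        System.out.println("join() returned " + future.join());   // null
        System.out.println("rows stored = " + stored.get());       // 4200
    }
}
```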

2. supplyAsync

supplyAsync also runs a task asynchronously, and the object passed to it implements the Supplier interface. It takes a Supplier<T> and returns a CompletableFuture<T>. In other words, the task accepts no input arguments and produces a result of type T as output.

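supplyAsync itself returns immediately. What blocks the main thread is calling get() on the returned CompletableFuture to read the result.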

Key code for supplyAsync():

int finalEnd = end;
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
    @Override
    public Integer get() {
        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
    }
});

@Override
public int readXlsCacheAsyncMybatis() {
    // Business logic omitted
    // Just return the result of the asynchronous method
    return 100;
}
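The same call can be written more compactly. In this sketch, readXlsCacheAsyncMybatis is a hypothetical static stand-in returning 100, as in the example above. The point is that supplyAsync returns immediately and only join() (or get()) waits:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class SupplyAsyncDemo {

    // Hypothetical stand-in for readXlsCacheAsyncMybatis(): returns a row count.
    static int readXlsCacheAsyncMybatis() {
        return 100;
    }

    static CompletableFuture<Integer> start() {
        // The anonymous Supplier can also be written as a method reference.
        Supplier<Integer> task = SupplyAsyncDemo::readXlsCacheAsyncMybatis;
        return CompletableFuture.supplyAsync(task); // returns at once, does not block
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> future = start();
        // Only this call waits for the result.
        System.out.println(future.join());
    }
}
```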

6. Execute asynchronous tasks sequentially

1. thenRun

thenRun() takes no argument and returns no value, which makes it a natural fit after runAsync().

// CompletableFuture in JDK 8
CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis())
.thenRun(() -> logger.info("CompletableFuture----.thenRun() method test"));
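The following sketch records the order of the two steps. A thread-safe list stands in for the log output, an assumption made for illustration:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class ThenRunDemo {

    // thenRun's Runnable starts only after the previous stage has completed,
    // so the two steps always run in this order.
    static List<String> runInOrder() {
        List<String> steps = new CopyOnWriteArrayList<>();
        CompletableFuture
                .runAsync(() -> steps.add("runAsync: store batch"))
                .thenRun(() -> steps.add("thenRun: log completion"))
                .join(); // wait only so the demo can inspect the order
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder());
    }
}
```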

2. thenAccept

thenAccept() takes one argument (the result of the previous stage) and returns no value.

supplyAsync + thenAccept:

  1. The asynchronous steps are executed sequentially
  2. The return value of supplyAsync is passed as the argument to thenAccept
  3. The main thread is not blocked
CompletableFuture.supplyAsync(new Supplier<Integer>() {
    @Override
    public Integer get() {
        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
    }
}).thenAccept(x -> logger.info(".thenAccept() method test:" + x));

However, thenAccept returns a CompletableFuture<Void>, so calling completableFuture.get() on it yields null rather than the return value of supplyAsync.
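Here is a small sketch confirming this, with supplyAsync(() -> 100) standing in for readXlsCacheAsyncMybatis():

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class ThenAcceptDemo {

    // thenAccept consumes the supplyAsync result and returns CompletableFuture<Void>.
    static CompletableFuture<Void> consume(AtomicInteger seen) {
        return CompletableFuture
                .supplyAsync(() -> 100)        // stand-in for readXlsCacheAsyncMybatis()
                .thenAccept(x -> seen.set(x)); // receives 100, returns nothing
    }

    public static void main(String[] args) {
        AtomicInteger seen = new AtomicInteger();
        Object afterAccept = consume(seen).join();
        System.out.println("thenAccept saw " + seen.get() + ", join() on its future returned " + afterAccept);
    }
}
```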

3. thenApply

Unlike thenAccept, thenApply returns a value, so the final result can once again be obtained through completableFuture.get().

supplyAsync + thenApply: typical chained programming.

  1. The methods are executed sequentially within the asynchronous thread
  2. The return value of supplyAsync is passed to the first thenApply for business processing
  3. The return value of the first thenApply is passed to the second thenApply for further processing
  4. Finally, the final return value is obtained through future.get()
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
    @Override
    public Integer get() {
        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
    }
}).thenApply((result) -> {
    return thenApplyTest2(result); // supplyAsync result * 2
}).thenApply((result) -> {
    return thenApplyTest5(result); // thenApply result * 5
});

// get() blocks until the whole chain has finished
logger.info("readXlsCacheAsyncMybatis inserted rows * 2 * 5 = " + completableFuture.get());
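Below is a self-contained version of this chain. thenApplyTest2 and thenApplyTest5 are hypothetical helpers that multiply by 2 and 5, matching the comments above:

```java
import java.util.concurrent.CompletableFuture;

final class ThenApplyDemo {

    // Hypothetical helpers mirroring the article's thenApplyTest2/thenApplyTest5.
    static int thenApplyTest2(int x) { return x * 2; }
    static int thenApplyTest5(int x) { return x * 5; }

    static CompletableFuture<Integer> chain(int stored) {
        return CompletableFuture
                .supplyAsync(() -> stored)                     // first stage result
                .thenApply(result -> thenApplyTest2(result))   // receives it, returns * 2
                .thenApply(result -> thenApplyTest5(result));  // receives that, returns * 5
    }

    public static void main(String[] args) {
        System.out.println(chain(100).join()); // prints 1000
    }
}
```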

7. Merging CompletableFuture tasks

  1. thenCombine: two asynchronous tasks run in parallel. When both have finished, a function merges their results into a return value, which is delivered as a new CompletableFuture object;
  2. thenAcceptBoth: two asynchronous tasks run in parallel. When both have finished, both results are consumed, with no return value;
  3. acceptEither: two asynchronous tasks run in parallel. The result of whichever finishes first is consumed, with no return value;
  4. applyToEither: two asynchronous tasks run in parallel. The result of whichever finishes first is transformed into a return value.

I won't go into detail about code examples for merging CompletableFuture tasks here. It is just syntactic sugar, and everyone should remember not to fall into the cycle of low-level diligence.
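For reference, here is a compact sketch of the four methods; the values and class name are illustrative. A CompletableFuture that is never completed gives the "either" variants a deterministic winner:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class MergeDemo {

    // thenCombine: wait for both, merge both results into a new value.
    static int combine(CompletableFuture<Integer> a, CompletableFuture<Integer> b) {
        return a.thenCombine(b, Integer::sum).join();
    }

    // thenAcceptBoth: wait for both, consume both results, no return value.
    static void acceptBoth(CompletableFuture<Integer> a, CompletableFuture<Integer> b,
                           AtomicInteger out) {
        a.thenAcceptBoth(b, (x, y) -> out.set(x + y)).join();
    }

    // applyToEither: use whichever result arrives first and return a new value.
    static int applyEither(CompletableFuture<Integer> a, CompletableFuture<Integer> b) {
        return a.applyToEither(b, x -> x * 10).join();
    }

    // acceptEither: consume whichever result arrives first, no return value.
    static void acceptEither(CompletableFuture<Integer> a, CompletableFuture<Integer> b,
                             AtomicInteger out) {
        a.acceptEither(b, x -> out.set(x)).join();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 4200);
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 1600);
        CompletableFuture<Integer> never = new CompletableFuture<>(); // never completes
        AtomicInteger out = new AtomicInteger();

        System.out.println("thenCombine    = " + combine(first, second));
        acceptBoth(first, second, out);
        System.out.println("thenAcceptBoth = " + out.get());
        System.out.println("applyToEither  = " + applyEither(never, second));
        acceptEither(first, never, out);
        System.out.println("acceptEither   = " + out.get());
    }
}
```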

8. Summary: CompletableFuture vs. Future

This article compared CompletableFuture and Future from the following angles:

  1. Explored the differences between CompletableFuture and Future through the implementation principles of ForkJoinPool and ThreadPoolExecutor;
  2. Briefly introduced the fancy syntactic sugar in CompletableFuture through code examples;
  3. Optimized "Get asynchronous return value via Future" with CompletableFuture;
  4. Solved the problem of blocking the main thread with CompletableFuture.allOf.

Future provides asynchronous execution, but to obtain the asynchronous return value you have to either poll its state (isDone()) or call get(), and get() blocks the main thread.

Busy polling wastes CPU time, and blocking clearly runs counter to the whole point of going asynchronous.

The CompletableFuture provided by JDK 8 implements the Future interface and adds many features that Future lacks, such as:

- chained calls;
- exception-handling callbacks;
- obtaining asynchronous results without blocking or polling;
- merging asynchronous tasks.

Once we have the results of the asynchronous threads, we can add transactions to keep the data consistent during the Excel import.

How do I implement a transaction in an asynchronous multithreading situation?

Some friends may say:

Just add the @Transactional annotation. If an exception occurs or the row count in the database doesn't match, you can roll it back directly~

So, is that really the case? We'll see you next time~