天天看点

SpringBoot多线程,使用Guava的ListenableFuture替代Java原生Future,多个线程同时异步回调!

文章借鉴自:https://cloud.tencent.com/developer/article/1386118

由于不想粘自己代码,所以偷个懒,感觉原作者的代码没有注释,看起来很难受,所以将自己理解的重新发一下。

如有侵权请告知。

前提:

日常开发,有很多场景会使用到多线程,比如,我们解析Excel,如果解析出一个3万条数据的Excel表格,需要两部:

1.我们需要先异步解析出所有的数据,前面写过了如何异步处理任务(Spring Boot---(4)SpringBoot异步处理任务);

2.然后再多线程去处理业务或者插入到数据库;

这里,讲解一下,如何使用多线程,如何调用回调函数。

1.引入pom.xml的依赖

由于后面要用到Guava包下的东西,所以这里先引入一下。

<dependency>
	<groupId>com.google.guava</groupId>
	<artifactId>guava</artifactId>
	<version>22.0</version>
</dependency>
           

最新好像已经更新到27.1了,没有去留意更新了哪些东东,有心的同学可以百度搜一下,都更新了些什么。

2.controller

这里模拟10万条数据,然后多线程处理。

package com.jd.concurrent;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * author:lightClouds917
 * date:2018/1/22
 * description:模拟多线程处理
 */
@RestController
@RequestMapping("con")
public class ConController {
    private final static Logger logger = LoggerFactory.getLogger(Logger.class);
    //    线程池
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 500,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(200),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
    //使用Guava的ListeningExecutorService装饰线程池
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);

    @RequestMapping(value = "test1", method = RequestMethod.GET)
    public String test1() {
        try {
            //10万条数据
            List<String> list = new ArrayList<>();
            List<String> list2 = new ArrayList<>();

            for (int i = 1; i <= 100000; i++) {
                list.add("test:" + i);
            }

            //每条线程处理的数据尺寸
            int size = 250;
            int count = list.size() / size;  //根据size得出线程数量
            if (count * size != list.size()) {
                count++; //如果已有线程要处理数据的总数,不等于list数据集合的总数,线程+1
            }
            int countNum = 0;//统计处理数据
            final CountDownLatch countDownLatch = new CountDownLatch(count);//线程计数器

            while (countNum < list.size()) {
                countNum += size;
                //创建一个对象,此对象继承Callable,下面会有源代码
                ConCallable callable = new ConCallable();
                //截取list的数据,分给不同线程处理
                /*这段代码写的很好,我喜欢这段,根据集合的下标,形成多线程,每个线程处理固定的数量,当最后一个线程要处理的数据大于总数的时候,
                则从上一个线程处理的末尾,到数据总数。真正意义上的多线程,本来多线程这块儿我是写死的,手动分配几个线程,代码效率低;
                这段儿代码,根据size可以随时对线程调优,仅需修改size,即可找到适合自己业务的线程数。*/
                callable.setList(ImmutableList.copyOf(list.subList(countNum - size, countNum < list.size() ? countNum : list.size())));
                //执行线程
                ListenableFuture listenableFuture = listeningExecutorService.submit(callable);
                //异步回调操作,原作者仅仅是展示下如何使用回调,
                /*如果有这种需求:多个线程执行,
                 都执行完毕,进行回调,则需要调用Futures.allAsList(futureList),多线程同时回调的代码会在文章末尾 单独贴出来。*/
                Futures.addCallback(listenableFuture, new FutureCallback<List<String>>() {
                    @Override
                    public void onSuccess(List<String> list1) {
                        countDownLatch.countDown();//计数器-1
                        list2.addAll(list1);//将线程执行结果放入结果集
                    }

                    @Override
                    public void onFailure(Throwable throwable) {
                        countDownLatch.countDown();
                        logger.info("处理出错:", throwable);

                    }
                });
            }
            //主线程阻塞,我直接这么用的countDownLatch.await(); 
            // 原作者这个应该是个超时策略,超过这个时间的线程,直接舍弃。
            countDownLatch.await(30, TimeUnit.MINUTES);
            logger.info("符合条件的返回数据个数为:" + list2.size());
            logger.info("回调函数:" + list2.toString());
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return "正在处理......";

    }
}
           

3.线程任务处理类

package com.jd.concurrent;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.List;

/**
 * author:lightClouds917
 * date:2018/1/22
 * description:业务处理
 */
public class ConCallable implements Callable {
    private List<String> list;

    @Override
    public Object call() throws Exception {
        List<String> listRe = new ArrayList<>();
        for(int i = 0;i < list.size();i++){
            //含有‘4599’的字符串都返回
            if(list.get(i).contains("4599")){
                listRe.add(list.get(i));
            }
        }
        return listRe;
    }

    public void setList(List<String> list) {
        this.list = list;
    }
}
           

4.返回结果:

符合条件的返回数据个数为:20
           
回调函数:[test:4599, test:14599, test:24599, test:34599, test:44599, test:45990, test:45991, test:45992, test:45993, test:45994, test:45995, test:45996, test:45997, test:45998, test:45999, test:54599, test:64599, test:74599, test:84599, test:94599]
           

原作者的文章到此结束,但是没有达到我的目的呀,我希望的是多线程执行完毕,进行统一回调,为了达到这个目的,对代码进行了如下改造:需要调用Futures.allAsList()

//在循环外创建一个list数组,用于存放线程
        List futureList = new ArrayList<ListenableFuture>();

        while (countNum < list.size()) {
            countNum += size;
            //创建一个对象,此对象实现Callable接口
            ConCallable callable = new ConCallable();
            //截取list的数据,分给不同线程处理
            callable.setList(ImmutableList.copyOf(list.subList(countNum - size, countNum < list.size() ? countNum : list.size())));
            //执行线程
            ListenableFuture listenableFuture = listeningExecutorService.submit(callable);
            //将子线程添加至线程集合
            futureList.add(listenableFuture);
        }
        /*都执行完毕,进行回调,则需要调用Futures.allAsList(futureList),多线程同时回调.
         * 由于是所有的线程一起回调,线程的返回结果自动存放在一个list中,
         * 因此需要将上面的List<String> 改为:List<List<String>>*/
         Futures.addCallback(Futures.allAsList(futureList), new FutureCallback<List<List<String>>>() {
            @Override
            public void onSuccess(List<List<String>> list1) {
                for (List<String> list : list1) {
                    countDownLatch.countDown();//计数器-1
                    list2.addAll(list);//将线程执行结果放入结果集
                }
            }

            @Override
            public void onFailure(Throwable throwable) {
                countDownLatch.countDown();
                logger.info("处理出错:", throwable);

            }
        });

        //主线程阻塞
        countDownLatch.await();
        System.out.println("OK!");
           

大功告成!继续码代码!

继续阅读