天天看點

SpringBoot多線程,使用Guava的ListenableFuture替代Java原生Future,多個線程同時異步回調!

文章借鑒自:https://cloud.tencent.com/developer/article/1386118

由于不想粘自己代碼,是以偷個懶,感覺原作者的代碼沒有注釋,看起來很難受,是以将自己了解的重新發一下。

如有侵權請告知。

前提:

日常開發,有很多場景會使用到多線程,比如,我們解析Excel,如果解析出一個3萬條資料的Excel表格,需要兩部:

1.我們需要先異步解析出所有的資料,前面寫過了如何異步處理任務(Spring Boot---(4)SpringBoot異步處理任務);

2.然後再多線程去處理業務或者插入到資料庫;

這裡,講解一下,如何使用多線程,如何調用回調函數。

1.引入pom.xml的依賴

由于後面要用到Guava包下的東西,是以這裡先引入一下。

<dependency>
	<groupId>com.google.guava</groupId>
	<artifactId>guava</artifactId>
	<version>22.0</version>
</dependency>
           

最新好像已經更新到27.1了,沒有去留意更新了哪些東東,有心的同學可以百度搜一下,都更新了些什麼。

2.controller

這裡模拟10萬條資料,然後多線程處理。

package com.jd.concurrent;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * author:lightClouds917
 * date:2018/1/22
 * description:模拟多線程處理
 */
@RestController
@RequestMapping("con")
public class ConController {
    private final static Logger logger = LoggerFactory.getLogger(Logger.class);
    //    線程池
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 500,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(200),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
    //使用Guava的ListeningExecutorService裝飾線程池
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);

    @RequestMapping(value = "test1", method = RequestMethod.GET)
    public String test1() {
        try {
            //10萬條資料
            List<String> list = new ArrayList<>();
            List<String> list2 = new ArrayList<>();

            for (int i = 1; i <= 100000; i++) {
                list.add("test:" + i);
            }

            //每條線程處理的資料尺寸
            int size = 250;
            int count = list.size() / size;  //根據size得出線程數量
            if (count * size != list.size()) {
                count++; //如果已有線程要處理資料的總數,不等于list資料集合的總數,線程+1
            }
            int countNum = 0;//統計處理資料
            final CountDownLatch countDownLatch = new CountDownLatch(count);//線程計數器

            while (countNum < list.size()) {
                countNum += size;
                //建立一個對象,此對象繼承Callable,下面會有源代碼
                ConCallable callable = new ConCallable();
                //截取list的資料,分給不同線程處理
                /*這段代碼寫的很好,我喜歡這段,根據集合的下标,形成多線程,每個線程處理固定的數量,當最後一個線程要處理的資料大于總數的時候,
                則從上一個線程處理的末尾,到資料總數。真正意義上的多線程,本來多線程這塊兒我是寫死的,手動配置設定幾個線程,代碼效率低;
                這段兒代碼,根據size可以随時對線程調優,僅需修改size,即可找到适合自己業務的線程數。*/
                callable.setList(ImmutableList.copyOf(list.subList(countNum - size, countNum < list.size() ? countNum : list.size())));
                //執行線程
                ListenableFuture listenableFuture = listeningExecutorService.submit(callable);
                //異步回調操作,原作者僅僅是展示下如何使用回調,
                /*如果有這種需求:多個線程執行,
                 都執行完畢,進行回調,則需要調用Futures.allAsList(futureList),多線程同時回調的代碼會在文章末尾 單獨貼出來。*/
                Futures.addCallback(listenableFuture, new FutureCallback<List<String>>() {
                    @Override
                    public void onSuccess(List<String> list1) {
                        countDownLatch.countDown();//計數器-1
                        list2.addAll(list1);//将線程執行結果放入結果集
                    }

                    @Override
                    public void onFailure(Throwable throwable) {
                        countDownLatch.countDown();
                        logger.info("處理出錯:", throwable);

                    }
                });
            }
            //主線程阻塞,我直接這麼用的countDownLatch.await(); 
            // 原作者這個應該是個逾時政策,超過這個時間的線程,直接舍棄。
            countDownLatch.await(30, TimeUnit.MINUTES);
            logger.info("符合條件的傳回資料個數為:" + list2.size());
            logger.info("回調函數:" + list2.toString());
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return "正在處理......";

    }
}
           

3.線程任務處理類

package com.jd.concurrent;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.List;

/**
 * author:lightClouds917
 * date:2018/1/22
 * description:業務處理
 */
public class ConCallable implements Callable {
    private List<String> list;

    @Override
    public Object call() throws Exception {
        List<String> listRe = new ArrayList<>();
        for(int i = 0;i < list.size();i++){
            //含有‘4599’的字元串都傳回
            if(list.get(i).contains("4599")){
                listRe.add(list.get(i));
            }
        }
        return listRe;
    }

    public void setList(List<String> list) {
        this.list = list;
    }
}
           

4.傳回結果:

符合條件的傳回資料個數為:20
           
回調函數:[test:4599, test:14599, test:24599, test:34599, test:44599, test:45990, test:45991, test:45992, test:45993, test:45994, test:45995, test:45996, test:45997, test:45998, test:45999, test:54599, test:64599, test:74599, test:84599, test:94599]
           

原作者的文章到此結束,但是沒有達到我的目的呀,我希望的是多線程執行完畢,進行統一回調,為了達到這個目的,對代碼進行了如下改造:需要調用Futures.allAsList()

//在循環外建立一個list數組,用于存放線程
        List futureList = new ArrayList<ListenableFuture>();

        while (countNum < list.size()) {
            countNum += size;
            //建立一個對象,此對象實作Callable接口
            ConCallable callable = new ConCallable();
            //截取list的資料,分給不同線程處理
            callable.setList(ImmutableList.copyOf(list.subList(countNum - size, countNum < list.size() ? countNum : list.size())));
            //執行線程
            ListenableFuture listenableFuture = listeningExecutorService.submit(callable);
            //将子線程添加至線程集合
            futureList.add(listenableFuture);
        }
        /*都執行完畢,進行回調,則需要調用Futures.allAsList(futureList),多線程同時回調.
         * 由于是所有的線程一起回調,線程的傳回結果自動存放在一個list中,
         * 是以需要将上面的List<String> 改為:List<List<String>>*/
         Futures.addCallback(Futures.allAsList(futureList), new FutureCallback<List<List<String>>>() {
            @Override
            public void onSuccess(List<List<String>> list1) {
                for (List<String> list : list1) {
                    countDownLatch.countDown();//計數器-1
                    list2.addAll(list);//将線程執行結果放入結果集
                }
            }

            @Override
            public void onFailure(Throwable throwable) {
                countDownLatch.countDown();
                logger.info("處理出錯:", throwable);

            }
        });

        //主線程阻塞
        countDownLatch.await();
        System.out.println("OK!");
           

大功告成!繼續碼代碼!

繼續閱讀