文章借鑒自:https://cloud.tencent.com/developer/article/1386118
由于不想粘自己代碼,是以偷個懶,感覺原作者的代碼沒有注釋,看起來很難受,是以将自己了解的重新發一下。
如有侵權請告知。
前提:
日常開發,有很多場景會使用到多線程,比如,我們解析Excel,如果解析出一個3萬條資料的Excel表格,需要兩部:
1.我們需要先異步解析出所有的資料,前面寫過了如何異步處理任務(Spring Boot---(4)SpringBoot異步處理任務);
2.然後再多線程去處理業務或者插入到資料庫;
這裡,講解一下,如何使用多線程,如何調用回調函數。
1.引入pom.xml的依賴
由于後面要用到Guava包下的東西,是以這裡先引入一下。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
最新好像已經更新到27.1了,沒有去留意更新了哪些東東,有心的同學可以百度搜一下,都更新了些什麼。
2.controller
這裡模拟10萬條資料,然後多線程處理。
package com.jd.concurrent;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* author:lightClouds917
* date:2018/1/22
* description:模拟多線程處理
*/
@RestController
@RequestMapping("con")
public class ConController {
private final static Logger logger = LoggerFactory.getLogger(Logger.class);
// 線程池
private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 500,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(200),
new ThreadPoolExecutor.CallerRunsPolicy()
);
//使用Guava的ListeningExecutorService裝飾線程池
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
@RequestMapping(value = "test1", method = RequestMethod.GET)
public String test1() {
try {
//10萬條資料
List<String> list = new ArrayList<>();
List<String> list2 = new ArrayList<>();
for (int i = 1; i <= 100000; i++) {
list.add("test:" + i);
}
//每條線程處理的資料尺寸
int size = 250;
int count = list.size() / size; //根據size得出線程數量
if (count * size != list.size()) {
count++; //如果已有線程要處理資料的總數,不等于list資料集合的總數,線程+1
}
int countNum = 0;//統計處理資料
final CountDownLatch countDownLatch = new CountDownLatch(count);//線程計數器
while (countNum < list.size()) {
countNum += size;
//建立一個對象,此對象繼承Callable,下面會有源代碼
ConCallable callable = new ConCallable();
//截取list的資料,分給不同線程處理
/*這段代碼寫的很好,我喜歡這段,根據集合的下标,形成多線程,每個線程處理固定的數量,當最後一個線程要處理的資料大于總數的時候,
則從上一個線程處理的末尾,到資料總數。真正意義上的多線程,本來多線程這塊兒我是寫死的,手動配置設定幾個線程,代碼效率低;
這段兒代碼,根據size可以随時對線程調優,僅需修改size,即可找到适合自己業務的線程數。*/
callable.setList(ImmutableList.copyOf(list.subList(countNum - size, countNum < list.size() ? countNum : list.size())));
//執行線程
ListenableFuture listenableFuture = listeningExecutorService.submit(callable);
//異步回調操作,原作者僅僅是展示下如何使用回調,
/*如果有這種需求:多個線程執行,
都執行完畢,進行回調,則需要調用Futures.allAsList(futureList),多線程同時回調的代碼會在文章末尾 單獨貼出來。*/
Futures.addCallback(listenableFuture, new FutureCallback<List<String>>() {
@Override
public void onSuccess(List<String> list1) {
countDownLatch.countDown();//計數器-1
list2.addAll(list1);//将線程執行結果放入結果集
}
@Override
public void onFailure(Throwable throwable) {
countDownLatch.countDown();
logger.info("處理出錯:", throwable);
}
});
}
//主線程阻塞,我直接這麼用的countDownLatch.await();
// 原作者這個應該是個逾時政策,超過這個時間的線程,直接舍棄。
countDownLatch.await(30, TimeUnit.MINUTES);
logger.info("符合條件的傳回資料個數為:" + list2.size());
logger.info("回調函數:" + list2.toString());
} catch (Exception ex) {
ex.printStackTrace();
}
return "正在處理......";
}
}
3.線程任務處理類
package com.jd.concurrent;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.List;
/**
* author:lightClouds917
* date:2018/1/22
* description:業務處理
*/
public class ConCallable implements Callable {
private List<String> list;
@Override
public Object call() throws Exception {
List<String> listRe = new ArrayList<>();
for(int i = 0;i < list.size();i++){
//含有‘4599’的字元串都傳回
if(list.get(i).contains("4599")){
listRe.add(list.get(i));
}
}
return listRe;
}
public void setList(List<String> list) {
this.list = list;
}
}
4.傳回結果:
符合條件的傳回資料個數為:20
回調函數:[test:4599, test:14599, test:24599, test:34599, test:44599, test:45990, test:45991, test:45992, test:45993, test:45994, test:45995, test:45996, test:45997, test:45998, test:45999, test:54599, test:64599, test:74599, test:84599, test:94599]
原作者的文章到此結束,但是沒有達到我的目的呀,我希望的是多線程執行完畢,進行統一回調,為了達到這個目的,對代碼進行了如下改造:需要調用Futures.allAsList()
//在循環外建立一個list數組,用于存放線程
List futureList = new ArrayList<ListenableFuture>();
while (countNum < list.size()) {
countNum += size;
//建立一個對象,此對象實作Callable接口
ConCallable callable = new ConCallable();
//截取list的資料,分給不同線程處理
callable.setList(ImmutableList.copyOf(list.subList(countNum - size, countNum < list.size() ? countNum : list.size())));
//執行線程
ListenableFuture listenableFuture = listeningExecutorService.submit(callable);
//将子線程添加至線程集合
futureList.add(listenableFuture);
}
/*都執行完畢,進行回調,則需要調用Futures.allAsList(futureList),多線程同時回調.
* 由于是所有的線程一起回調,線程的傳回結果自動存放在一個list中,
* 是以需要将上面的List<String> 改為:List<List<String>>*/
Futures.addCallback(Futures.allAsList(futureList), new FutureCallback<List<List<String>>>() {
@Override
public void onSuccess(List<List<String>> list1) {
for (List<String> list : list1) {
countDownLatch.countDown();//計數器-1
list2.addAll(list);//将線程執行結果放入結果集
}
}
@Override
public void onFailure(Throwable throwable) {
countDownLatch.countDown();
logger.info("處理出錯:", throwable);
}
});
//主線程阻塞
countDownLatch.await();
System.out.println("OK!");
大功告成!繼續碼代碼!