前言
我們經常使用spring quartz 處理各種業務,那麼如何使用多線程批量處理,而且保證不會有重複資料。想必處理過這類業務的朋友,一定了解Executors或者ThreadPoolTaskExecutor這類工具。這裡不對這些東西做任何解說。因為時間關系,我按自己的方式實作,直接提供代碼供大家參考。
需要返值的
public class GameConsumerJob {
private static final Logger logger = LoggerFactory.getLogger(GameConsumerJob.class);
@Autowired
private Service service;
@Autowired
private CxylThreadPool threadPool;
static int threadCounts = 10;//使用的線程數
static long sum = 0;
public void execute() {
logger.info("隊列消費者線程運作,GameConsumerJob{}", Thread.currentThread().getName());
try {
pushQueueWorkerRun();
} catch (Exception e) {
logger.info("隊列消費者線程運作,GameConsumerJob{}發生異常:", e);
}
}
private void pushQueueWorkerRun() throws Exception {
List<CxylConsumerData> dataList = service.getLastConsumers(2000);
if (dataList != null && dataList.size() > 0) {
ExecutorService ex = Executors.newFixedThreadPool(threadCounts);
List<Callable<Long>> callList = new ArrayList<Callable<Long>>();
int dataCount = dataList.size();
int len = dataCount/threadCounts;//分批數
if(len == 0){
threadCounts = dataCount;//采用一個線程處理List中的一個元素
len = 1;//重新平均分割List
}
for(int i=0; i<threadCounts; i++){
final List<CxylConsumerData> subList;
if(i == threadCounts-1){
subList = dataList.subList(i*len, dataCount);
}else{
subList = dataList.subList(i*len, len*(i+1)>dataList.size()?dataList.size():len*(i+1));
}
//采用匿名内部類實作
callList.add(new Callable<Long>(){
public Long call() throws Exception {
long subSum = 0L;
for(CxylConsumerData date : subList){
int value = cxylZdService.playerConsume(date);
subSum += value;
}
System.out.println("配置設定給線程:"+Thread.currentThread().getName()+"那一部分List的整數和為:\tSubSum:"+subSum);
return subSum;
}
});
}
List<Future<Long>> futureList = ex.invokeAll(callList);
for(Future<Long> future: futureList){
sum += future.get();
}
ex.shutdown();
}
}
}
使用Callable存在性能消耗,尤其大量子線程建立,以下是無返值優化
private void pushQueueWorkerRun2() throws Exception {
List<CxylConsumerData> dataList = service.getLastConsumers(2000);
if (dataList != null && dataList.size() > 0) {
ExecutorService exe = Executors.newFixedThreadPool(threadCounts);
int dataCount = dataList.size();
int len = dataCount/threadCounts;//分批數
if(len == 0){
threadCounts = dataCount;//采用一個線程處理List中的一個元素
len = 1;//重新平均分割List
}
for(int i=0; i<threadCounts; i++){
final List<CxylConsumerData> subList;
if(i == threadCounts-1){
subList = dataList.subList(i*len, dataCount);
}else{
subList = dataList.subList(i*len, len*(i+1)>dataList.size()?dataList.size():len*(i+1));
}
Runnable run = new Runnable() {
@Override
public void run() {
for(CxylConsumerData date : subList){
cxylZdService.playerConsume(date);
}
}
};
exe.execute(run);
}
exe.shutdown();
while (true) {
if (exe.isTerminated()) {
logger.info("GameConsumerJob is end {}", Thread.currentThread().getName());
break;
}
Thread.sleep(200);
}
exe.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
}
}
以上代理僅供大家參考,實作的方式很多,根據自己的業務情況實作即可。