分享知識 傳遞快樂
使用多線程的方式批量處理資料操作。示例如下:
@Test
public void syncAccount() throws Exception {
// 每500條資料開啟一條線程
int threadSize = 50;
// 總資料條數
int dataSize = readAll.size();
// 方法一:每個線程處理資料量
int threadNum = dataSize / threadSize + 1;
if (dataSize % threadSize == 0) {
threadNum = threadNum - 1;
}
// 方法一:每個線程處理資料量
// int threadNum = (threadSize - 1) / threadSize + 1;
// 方法三:開啟5個線程,把資料平分到每個線程中
// int threadSize = (dataSize - 1) / 5 + 1;
// 初始化線程池
ExecutorService executorService = new ThreadPoolExecutor(threadNum,// 核心線程池大小
100,// 線程池最大容量大小
0L,// 線程池空閑時,線程存活的時間
TimeUnit.MILLISECONDS,// 時間機關
new LinkedBlockingQueue<Runnable>()// 任務隊列
);
List<Callable<List<Map<String, Object>>>> tasks = this.multiThread(threadSize, threadNum, dataSize, readAll);
List<Future<List<Map<String, Object>>>> results = executorService.invokeAll(tasks);
for (Future<List<Map<String, Object>>> future : results) {
//每個線程傳回的資料處理,有順序對應關系
// System.out.println(future.get());
writeList.addAll(future.get());
}
// 關閉線程池
executorService.shutdown();
System.out.println("線程任務執行結束");
}
private List<Callable<List<Map<String, Object>>>> multiThread(int threadSize, int threadNum, int dataSize, List<Map<String, Object>> excelList) {
// 定義一個任務集合
List<Callable<List<Map<String, Object>>>> tasks = new ArrayList<Callable<List<Map<String, Object>>>>();
Callable<List<Map<String, Object>>> task = null;
List<Map<String, Object>> cutExcelList = null;
for (int i = 0; i < threadNum; i++) {
// 确定每條線程的資料
if (i == threadNum - 1) {
cutExcelList = excelList.subList(threadSize * i, dataSize);
} else {
cutExcelList = excelList.subList(threadSize * i, threadSize * (i + 1));
}
// System.out.println("第" + (i + 1) + "組:" + cutExcelList.toString());
final List<Map<String, Object>> cutList = cutExcelList;
task = new Callable<List<Map<String, Object>>>() {
@Override
public List<Map<String, Object>> call() throws Exception {
// 線程處理邏輯
// System.out.println(Thread.currentThread().getName() + "線程:" + cutList);
// return Thread.currentThread().getName();
return sendSyncCenter(Thread.currentThread().getName(), cutList);
}
};
// 這裡送出的任務容器清單和傳回的Future清單存在順序對應的關系
tasks.add(task);
}
return tasks;
}
private List<Map<String, Object>> sendSyncCenter(String threadName, List<Map<String, Object>> excelList) {
List<Map<String, Object>> writeList = new ArrayList<>();
for (Map<String, Object> row : excelList) {
row.put("threadName", threadName);
writeList.add(row);
}
return writeList;
}