天天看點

使用多線程的方式對資料操作

分享知識 傳遞快樂

使用多線程的方式批量處理資料操作。示例如下:

@Test
public void syncAccount() throws Exception {
    // 每500條資料開啟一條線程
    int threadSize = 50;
    // 總資料條數
    int dataSize = readAll.size();
    // 方法一:每個線程處理資料量
    int threadNum = dataSize / threadSize + 1;
    if (dataSize % threadSize == 0) {
        threadNum = threadNum - 1;
    }
    // 方法一:每個線程處理資料量
    // int threadNum = (threadSize - 1) / threadSize + 1;

    // 方法三:開啟5個線程,把資料平分到每個線程中
    // int threadSize = (dataSize  - 1) / 5 + 1;

    // 初始化線程池
    ExecutorService executorService = new ThreadPoolExecutor(threadNum,// 核心線程池大小
            100,// 線程池最大容量大小
            0L,// 線程池空閑時,線程存活的時間
            TimeUnit.MILLISECONDS,// 時間機關
            new LinkedBlockingQueue<Runnable>()// 任務隊列
    );

    List<Callable<List<Map<String, Object>>>> tasks = this.multiThread(threadSize, threadNum, dataSize, readAll);

    List<Future<List<Map<String, Object>>>> results = executorService.invokeAll(tasks);

    for (Future<List<Map<String, Object>>> future : results) {
        //每個線程傳回的資料處理,有順序對應關系
//            System.out.println(future.get());
        writeList.addAll(future.get());
    }

    // 關閉線程池
    executorService.shutdown();
    System.out.println("線程任務執行結束");
}


private List<Callable<List<Map<String, Object>>>> multiThread(int threadSize, int threadNum, int dataSize, List<Map<String, Object>> excelList) {
    // 定義一個任務集合
    List<Callable<List<Map<String, Object>>>> tasks = new ArrayList<Callable<List<Map<String, Object>>>>();
    Callable<List<Map<String, Object>>> task = null;
    List<Map<String, Object>> cutExcelList = null;

    for (int i = 0; i < threadNum; i++) {
        // 确定每條線程的資料
        if (i == threadNum - 1) {
            cutExcelList = excelList.subList(threadSize * i, dataSize);
        } else {
            cutExcelList = excelList.subList(threadSize * i, threadSize * (i + 1));
        }
//            System.out.println("第" + (i + 1) + "組:" + cutExcelList.toString());
        final List<Map<String, Object>> cutList = cutExcelList;
        task = new Callable<List<Map<String, Object>>>() {

            @Override
            public List<Map<String, Object>> call() throws Exception {
                // 線程處理邏輯
//                    System.out.println(Thread.currentThread().getName() + "線程:" + cutList);
//                    return Thread.currentThread().getName();
                return sendSyncCenter(Thread.currentThread().getName(), cutList);
            }
        };
        // 這裡送出的任務容器清單和傳回的Future清單存在順序對應的關系
        tasks.add(task);
    }

    return tasks;
}

private List<Map<String, Object>> sendSyncCenter(String threadName, List<Map<String, Object>> excelList) {
    List<Map<String, Object>> writeList = new ArrayList<>();
    for (Map<String, Object> row : excelList) {

        row.put("threadName", threadName);

        writeList.add(row);
    }
    return writeList;
}      

繼續閱讀