前言
當遇到大量資料導入時,為了提高處理的速度,可以選擇使用多線程來批量處理這些處理。常見的場景有:
- 大檔案導入資料庫(這個檔案不一定是标準的
可導入檔案或者需要在記憶體中經過一定的處理)CSV
- 資料同步(從第三方接口拉取資料處理後寫入自己的資料庫)
以上的場景有一個共性,這類資料導入的場景簡單來說就是将資料從一個資料源移動到另外一個資料源,而其中必定可以分為兩步
- 資料讀取:從資料源讀取資料到記憶體
- 資料寫入:将記憶體中的資料寫入到另外一個資料源,可能存在資料處理
而且資料讀取的速度一般會比資料寫入的速度快很多,即讀取快,寫入慢。
設計思路
由于場景的特點是讀取快,寫入慢,如果是使用多線程處理,建議是資料寫入部分改造為多線程。而資料讀取可以改造成批量讀取資料。簡單來說就是兩個要點:
- 批量讀取資料
- 多線程寫入資料
示例
多線程批量處理最簡單的方案是使用線程池來進行處理,下面會通過一個模拟批量讀取和寫入的服務,以及對這個服務的多線程寫入調用作為示例,展示如何多線程批量資料導入。
模拟服務
import java.util.concurrent.atomic.AtomicLong;
/**
* 資料批量寫入用的模拟服務
*
* @author RJH
* create at 2019-04-01
*/
public class MockService {
/**
* 可讀取總數
*/
private long canReadTotal;
/**
* 寫入總數
*/
private AtomicLong writeTotal=new AtomicLong(0);
/**
* 寫入休眠時間(機關:毫秒)
*/
private final long sleepTime;
/**
* 構造方法
*
* @param canReadTotal
* @param sleepTime
*/
public MockService(long canReadTotal, long sleepTime) {
this.canReadTotal = canReadTotal;
this.sleepTime = sleepTime;
}
/**
* 批量讀取資料接口
*
* @param num
* @return
*/
public synchronized long readData(int num) {
long readNum;
if (canReadTotal >= num) {
canReadTotal -= num;
readNum = num;
} else {
readNum = canReadTotal;
canReadTotal = 0;
}
//System.out.println("read data size:" + readNum);
return readNum;
}
/**
* 寫入資料接口
*/
public void writeData() {
try {
// 休眠一定時間模拟寫入速度慢
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 寫入總數自增
System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
}
/**
* 擷取寫入的總數
*
* @return
*/
public long getWriteTotal() {
return writeTotal.get();
}
}
批量資料處理器
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 基于線程池的多線程批量寫入處理器
* @author RJH
* create at 2019-04-01
*/
public class SimpleBatchHandler {
private ExecutorService executorService;
private MockService service;
/**
* 每次批量讀取的資料量
*/
private int batch;
/**
* 線程個數
*/
private int threadNum;
public SimpleBatchHandler(MockService service, int batch,int threadNum) {
this.service = service;
this.batch = batch;
//使用固定數目的線程池
this.executorService = Executors.newFixedThreadPool(threadNum);
}
/**
* 開始處理
*/
public void startHandle() {
// 開始處理的時間
long startTime = System.currentTimeMillis();
System.out.println("start handle time:" + startTime);
long readData;
while ((readData = service.readData(batch)) != 0) {// 批量讀取資料,知道讀取不到資料才停止
for (long i = 0; i < readData; i++) {
executorService.execute(() -> service.writeData());
}
}
// 關閉線程池
executorService.shutdown();
while (!executorService.isTerminated()) {//等待線程池中的線程執行完
}
// 結束時間
long endTime = System.currentTimeMillis();
System.out.println("end handle time:" + endTime);
// 總耗時
System.out.println("total handle time:" + (endTime - startTime) + "ms");
// 寫入總數
System.out.println("total write num:" + service.getWriteTotal());
}
}
測試類
/**
* SimpleBatchHandler的測試類
* @author RJH
* create at 2019-04-01
*/
public class SimpleBatchHandlerTest {
public static void main(String[] args) {
// 總數
long total=100000;
// 休眠時間
long sleepTime=100;
// 每次拉取的數量
int batch=100;
// 線程個數
int threadNum=16;
MockService mockService=new MockService(total,sleepTime);
SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
handler.startHandle();
}
}
運作結果
start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2
...省略部分輸出
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000
分析
在單線程情況下的執行時間應該為
total*sleepTime
,即
10000000ms
,而改造為多線程後執行時間為
648447ms
。
示例問題
本示例存在一些問題,會在後續的部落格中對本示例進行優化,同時分享給大家如何解決這些問題。