1.課程導讀
本文示範的重點是阻塞隊列BLockingQueue,實作将同步請求轉為異步處理;
BLockingQueue是一個阻塞的隊列,最典型的應用場景就是生産者和消費者模式。生産者和消費者模式是通過一個容器來解決生産者和消費者的強耦合問題。生産者和消費者彼此并不直接通信,而是通過阻塞隊列進行通信,是以生産者生産完資料後不用等待消費者進行處理,而是直接扔給阻塞隊列,消費者不找生産者要資料,而是直接從阻塞隊列中擷取資料,阻塞隊列就相當于一個緩沖區,平衡生産者和消費者的處理能力。
在Java中該隊列隻是一個接口,它的實作類有
ArrayBlockingQueue、
DelayQueue、
LinkedBlockingDeque、
LinkedBlockingQueue、
PriorityBlockingQueue、
SynchronousQueue等。
本文使用的是 LinkedBlockingQueue,廢話不多說,上代碼:
2. 示例代碼
2.1 依賴配置
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
2.2 控制層
@RequestMapping(value = "/v1/insertData")
public ResponseData insertData(@RequestBody JSONObject jsonObject) {
BatchData batchData = new BatchData();
batchData.setDataId(IdWorker.get32UUID());
batchData.setTableNum(jsonObject.getString("tableNum"));
batchData.setMessage(jsonObject.getString("message"));
batchData.setCreateTime(LocalDateTime.now());
//方式1.同步 入庫
//batchDataService.save(batchData);
//方式2. 異步隊列
AsyncVo<BatchData> asyncVo = new AsyncVo<>();
asyncVo.setParams(batchData);
produceQueue.putQueue(asyncVo);
return ResponseData.success();
}
2.3 阻塞隊列 (包名:package com.huigu.gpt.queue)
生産者:
@Component
public class ProduceQueue {
/**
* 阻塞隊列,設定緩沖容量為Integer.MAX_VALUE 不做容量限制
*/
private static final BlockingQueue<AsyncVo<?>> queue = new LinkedBlockingQueue<>(Integer.MAX_VALUE);
public BlockingQueue<AsyncVo<?>> getQueue() {
return queue;
}
/**
* 資料隊列
* @param vo
*/
public void putQueue(AsyncVo<?> vo){
try {
queue.put(vo);
} catch (Exception e) {
e.printStackTrace();
}
}
}
消費者:
@Component
@Slf4j
public class ConsumQuene extends Thread{
@Autowired
private ProduceQueue produceQueue;
@Autowired
private IBatchDataService batchDataService;
private static boolean shutDown = true;
public void setShutDown() {
shutDown = false;
}
@Override
public void run() {
while (shutDown) {
try {
int queueSize = produceQueue.getQueue().size();
if (queueSize != 0){
log.info("隊列長度-> {}" , queueSize);
}
List<AsyncVo> asyncVoList = new ArrayList<>();
//1.500條資料入庫;2.等待100 毫秒,沒達到500條進行入庫 3. 沒有資料會阻塞目前線程
Queues.drain(produceQueue.getQueue(), asyncVoList, 500, 100, TimeUnit.MILLISECONDS);
if (asyncVoList.size() == 0){
continue;
}
List<Object> objectList = asyncVoList.stream().map(AsyncVo::getParams).collect(Collectors.toList());
if (objectList.size() > 0){
List<BatchData> batchDataList = (List)objectList;
batchDataService.saveBatch(batchDataList);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
隊列監聽
@Component
public class QueueListener {
@Autowired
private ConsumQuene consumQuene;
/**
* 初始化時啟動監聽請求隊列
*/
@PostConstruct
public void init() {
consumQuene.start();
}
/**
* 銷毀容器時停止監聽任務
*/
@PreDestroy
public void destory() {
consumQuene.setShutDown();
}
}
3. 壓力測試(Jmeter)
每秒啟動50個使用者線程,持續輸出十分鐘
3.1 同步模式下使用JMeter壓測,結果如下:
該模式下,TPS約為: 437/sec
控制台輸出如下:
3.2 阻塞隊列模式下壓測結果如下:
該模式下TPS約為: 454/sec 明顯高于同步模式下的TPS,并且随着持續加壓,差別會越來越明顯,這就是相同硬體配置下通過優化代碼增加系統吞吐量的關鍵所在。
控制台輸出如下:
Mysql資料庫存儲情況