天天看點

【Java高并發調優系列代碼實戰】阻塞隊列

作者:jason說程式設計

1.課程導讀

本文示範的重點是阻塞隊列BLockingQueue,實作将同步請求轉為異步處理;

BLockingQueue是一個阻塞的隊列,最典型的應用場景就是生産者和消費者模式。生産者和消費者模式是通過一個容器來解決生産者和消費者的強耦合問題。生産者和消費者彼此并不直接通信,而是通過阻塞隊列進行通信,是以生産者生産完資料後不用等待消費者進行處理,而是直接扔給阻塞隊列,消費者不找生産者要資料,而是直接從阻塞隊列中擷取資料,阻塞隊列就相當于一個緩沖區,平衡生産者和消費者的處理能力。

【Java高并發調優系列代碼實戰】阻塞隊列

在Java中該隊列隻是一個接口,它的實作類有

ArrayBlockingQueue、

DelayQueue、

LinkedBlockingDeque、

LinkedBlockingQueue、

PriorityBlockingQueue、

SynchronousQueue等。

本文使用的是 LinkedBlockingQueue,廢話不多說,上代碼:

2. 示例代碼

2.1 依賴配置

<dependency>
	<groupId>com.google.guava</groupId>
	<artifactId>guava</artifactId>
	<version>20.0</version>
</dependency>           

2.2 控制層

@RequestMapping(value = "/v1/insertData")
public ResponseData insertData(@RequestBody JSONObject jsonObject) {
    BatchData batchData = new BatchData();
    batchData.setDataId(IdWorker.get32UUID());
    batchData.setTableNum(jsonObject.getString("tableNum"));
    batchData.setMessage(jsonObject.getString("message"));
    batchData.setCreateTime(LocalDateTime.now());
    //方式1.同步 入庫
    //batchDataService.save(batchData);
    //方式2. 異步隊列
    AsyncVo<BatchData> asyncVo = new AsyncVo<>();
    asyncVo.setParams(batchData);
    produceQueue.putQueue(asyncVo);
    return ResponseData.success();
}           

2.3 阻塞隊列 (包名:package com.huigu.gpt.queue)

生産者:

@Component
public class ProduceQueue {
  /**
  * 阻塞隊列,設定緩沖容量為Integer.MAX_VALUE 不做容量限制
  */
  private static final BlockingQueue<AsyncVo<?>> queue = new LinkedBlockingQueue<>(Integer.MAX_VALUE);
  public BlockingQueue<AsyncVo<?>> getQueue() {
    return queue;
  }
  /**
  * 資料隊列
  * @param vo
  */
	public void putQueue(AsyncVo<?> vo){
    try {
    	queue.put(vo);
    } catch (Exception e) {
    	e.printStackTrace();
    }
  }
}           

消費者:

@Component
@Slf4j
public class ConsumQuene extends Thread{
    @Autowired
    private ProduceQueue produceQueue;
    @Autowired
    private IBatchDataService batchDataService;
    private static boolean shutDown = true;
    public void setShutDown() {
    		shutDown = false;
    }
    @Override
    public void run() {
        while (shutDown) {
            try {
              int queueSize = produceQueue.getQueue().size();
              if (queueSize != 0){
              log.info("隊列長度-> {}" , queueSize);
            }
            List<AsyncVo> asyncVoList = new ArrayList<>();
            //1.500條資料入庫;2.等待100 毫秒,沒達到500條進行入庫 3. 沒有資料會阻塞目前線程
            Queues.drain(produceQueue.getQueue(), asyncVoList, 500, 100, TimeUnit.MILLISECONDS);
            if (asyncVoList.size() == 0){
                continue;
            }
            List<Object> objectList = asyncVoList.stream().map(AsyncVo::getParams).collect(Collectors.toList());
            if (objectList.size() > 0){
                List<BatchData> batchDataList = (List)objectList;
                batchDataService.saveBatch(batchDataList);
            }
            } catch (Exception e) {
            e.printStackTrace();
            }
        }
   }
}           

隊列監聽

@Component
public class QueueListener {
  @Autowired
  private ConsumQuene consumQuene;
  /**
  * 初始化時啟動監聽請求隊列
  */
  @PostConstruct
  public void init() {
  	consumQuene.start();
  }
  /**
  * 銷毀容器時停止監聽任務
  */
  @PreDestroy
  public void destory() {
  	consumQuene.setShutDown();
  }
}           

3. 壓力測試(Jmeter)

每秒啟動50個使用者線程,持續輸出十分鐘

【Java高并發調優系列代碼實戰】阻塞隊列

3.1 同步模式下使用JMeter壓測,結果如下:

【Java高并發調優系列代碼實戰】阻塞隊列

該模式下,TPS約為: 437/sec

控制台輸出如下:

【Java高并發調優系列代碼實戰】阻塞隊列

3.2 阻塞隊列模式下壓測結果如下:

【Java高并發調優系列代碼實戰】阻塞隊列

該模式下TPS約為: 454/sec 明顯高于同步模式下的TPS,并且随着持續加壓,差別會越來越明顯,這就是相同硬體配置下通過優化代碼增加系統吞吐量的關鍵所在。

控制台輸出如下:

【Java高并發調優系列代碼實戰】阻塞隊列

Mysql資料庫存儲情況

【Java高并發調優系列代碼實戰】阻塞隊列

繼續閱讀