天天看點

pmq再學習三

前面我們已經了解了在做好基礎資料的準備工作後,啟動測試的時候,會做一個注冊消費組的工作,完成後,我們就可以執行生産者發消息操作了。發消息的操作是:

@GetMapping("/test1")
public void test1(@RequestParam String topicName, @RequestParam int count) {
    if (Util.isEmpty(topicName))
        return;
    Executors.newSingleThreadExecutor().submit(new Runnable() {
        @Override
        public void run() {
            for (int i = 1; i < count; i++) {
                try {
                    MqClient.publish(topicName, "", new ProducerDataDto(String.valueOf(i)));
                } catch (MqNotInitException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (ContentExceed65535Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                Util.sleep(10);
            }
        }
    });
}           

複制

發送消息完成後,由于其采用的拉模式,我們可以看到消息在經過發送,存儲到資料庫之後,會做一個通知拉取資料操作,然後執行拉取。拉取完成後,進行響應。此時會進行消費操作,而這個過程的處理關鍵是handleData操作,從代碼中,我們可以看到其是執行的線程操作是一個batchExcute批量執行操作,可以看到其裡面有一個重要方法:threadExcute方法,進而進一步看到我們想看到的方法doMessageReceived,這個方法會調用我們自定義的方法實作消費。

// 執行消費操作
@Override
public List<Long> onMessageReceived(List<MessageDto> messages) {
    try {
        // 執行消息消費
        Transaction catTransaction = null;
        System.out.println("開始接收生産者發送過來的消息");
        for (MessageDto messageDto : messages) {
            // 執行消息消費
            System.out.println("目前接收到的消息是messageDto消費組:" + messageDto.getConsumerGroupName() + " " + "目前接收的消費主題:" + messageDto.getTopicName()
                + " " + "目前接收到的消息:" + messageDto.getBody());
        }
        System.out.println("接收成功");
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    return null;
}           

複制

而從消息的結果可以看到其列印出來的消費消息。