天天看點

RocketMQ進階-延時消息前言

前言

在開發中經常會遇到延時任務的需求,例如在12306購買車票,若生成訂單30分鐘未支付則自動取消;還有線上商城完成訂單後48小時不評價 ,自動5星好評。像這類在某事件觸發後一段時間内執行的需求任務我們稱之為 延時任務。

那麼如何實作延遲任務呢?

第一反應是利用cron方案來實作:

RocketMQ進階-延時消息前言

啟動一個cron定時任務,每隔一段時間執行一次,比如30分鐘,找到那些逾時的資料,直接更新狀态,或者拿出來執行一些操作。如果資料量比較大,需要分頁查詢,分頁update,這将是一個for循環更新操作。

cron方案是很常見的一種方案,但是常見的不一定是最好的,主要有以下幾個問題:

  • 當資料量大的時候輪詢效率低;
  • 時效性不夠好,如果每小時輪詢一次,最差的情況時間誤差會達到1小時;
  • 如果通過增加cron輪詢頻率來減少時間誤差,則會出現輪詢低效和重複計算的問題;

既然cron方案不是很理想,那就請出我們今天的主角,使用RocketMQ的延時消息解決。在建立訂單的時候發送一條延時消息到RocketMQ,30分鐘後消費者消費消息去檢查訂單的狀态,如果發現訂單未支付則取消訂單釋放庫存。

實作

RocketMQ延遲隊列的核心思路是:所有的延遲消息由producer發出之後,都會存放到同一個topic(SCHEDULE_TOPIC_XXXX)下,不同的延遲級别會對應不同的隊列序号,當延遲時間到之後,由定時線程讀取轉換為普通的消息存的真實指定的topic下,此時對于consumer端此消息才可見,進而被consumer消費。

注意:RocketMQ不支援任意時間的延時,隻支援以下幾個固定的延時等級

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

下面我們結合SprintBoot利用RocketMQ發送延時消息

  • 引入RocketMQ元件
<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>      
  • 增加RocketMQ的配置
rocketmq:
  name-server: 172.31.0.44:9876
  producer:
    group: delay-group      
  • 編寫生産者
@Component
@Slf4j
public class DelayProduce {
    @Autowired
    private RocketMQTemplate rocketMQTemplatet;
 
    public void sendDelayMessage(String topic,String message,int delayLevel){
       SendResult sendResult = rocketMQTemplatet.syncSend(topic, MessageBuilder.withPayload(message).build(), 2000, delayLevel);
        log.info("sendtime is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));
        log.info("sendResult is{}",sendResult);
    }
}      
  • 編寫消費者
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "delay-topic",
        consumerGroup = "delay-group"
)
public class DelayConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("received message time is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));
        log.info("received message is {}",message);
    }
}      
  • 測試
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayProduceTest {
    @Autowired
    private DelayProduce delayProduce;
 
    @Test
    public void sendDelayMessage() {
        delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知錄",5);
    }
}      

這裡delayLevel設定成5,對應RocketMQ的延時等級就是1分鐘後投遞消息。

  • 運作結果
RocketMQ進階-延時消息前言
  • 發送時間
RocketMQ進階-延時消息前言

消費時間

修改延時級别

RocketMQ的延遲等級可以進行修改,以滿足自己的業務需求,可以修改/添加新的level。例如:你想支援1天的延遲,修改最後一個level的值為1d,這個時候依然是18個level;也可以增加一個1d,這個時候總共就有19個level。

  • 打開RocketMQ的配置檔案,修改

    messageDelayLevel

    屬性
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
storePathRootDir = /app/rocketmq/data
messageDelayLevel=90s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h      

這次将延時等級1修改成了90s,生産者發送消息後需要90s後再進行消息投遞。修改完成後重新開機RocketMQ。

nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &

  • 使用延時等級1發送消息
public void sendDelayMessage() {
 delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知錄",1);
}      
RocketMQ進階-延時消息前言
RocketMQ進階-延時消息前言

通過比對發送時間與消費時間證明延時等級修改生效。

RocketMQ 相關文章