1、概念
嚴格順序消息模式下,消費者收到的所有消息均是有順序的。
消息有序指的是可以按照消息的發送順序來消費(FIFO)。RocketMQ可以嚴格的保證消息有序,可以分為分區有序或者全局有序。
順序消費的原了解析,在預設的情況下消息發送會采取Round Robin輪詢方式把消息發送到不同的queue(分區隊列);而消費消息的時候從多個queue上拉取消息,這種情況發送和消費是不能保證順序。但是如果控制發送的順序消息隻依次發送到同一個queue中,消費的時候隻從這個queue上依次拉取,則就保證了順序。當發送和消費參與的queue隻有一個,則是全局有序;如果多個queue參與,則為分區有序,即相對每個queue,消息都是有序的。
1.1 全局順序
對于指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行釋出和消費。
适用場景:性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息釋出和消費的場景。
1.2 分區順序(一般都是使用分區順序)
對于指定的一個 Topic,所有消息根據 sharding key 進行區塊分區。 同一個分區内的消息按照嚴格的 FIFO 順序進行釋出和消費。 Sharding key 是順序消息中用來區分不同分區的關鍵字段,和普通消息的 Key 是完全不同的概念。
适用場景:性能要求高,以 sharding key 作為分區字段,在同一個區塊中嚴格的按照 FIFO 原則進行消息釋出和消費的場景。
2、注意點
想要實作順序消費,發送方式必須為同步發送,異步發送無法保證消息的發送順序!
3、訂單進行分區有序示例
一個訂單的順序流程是:建立、付款、推送、完成。訂單号相同的消息會被先後發送到同一個隊列中,消費時,同一個OrderId擷取到的肯定是同一個隊列。
3.1 生産者
package com.ybw.rocketmq.demo.mq;
import com.alibaba.fastjson.JSON;
import com.ybw.rocketmq.demo.constant.MqConstant;
import com.ybw.rocketmq.demo.dto.OrderStep;
import com.ybw.rocketmq.demo.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 生産者
*
* @author ybwei
* @date 2022/3/8 21:08
**/
@Component("sender")
@Slf4j
public class Sender {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 順序發送
* 注:想要實作順序消費,發送方式必須為同步發送,異步發送無法保證消息的發送順序!
*
* @param orderStep
* @return void
* @throws
* @methodName: syncSendOrderly
* @author ybwei
* @date 2022/3/8 19:52
*/
public void syncSendOrderly(OrderStep orderStep) {
Message<OrderStep> message = MessageBuilder.withPayload(orderStep)
.setHeader(MessageConst.PROPERTY_KEYS, StringUtils.generateUUID())
.setHeader(MessageConst.PROPERTY_PRODUCER_GROUP, MqConstant.PRODUCER_GROUP)
.build();
//hashKey:該參數的作用即是在發送消息的時候,固定發送到一個隊列(預設情況下rocketmq中的topic有4個隊列)以保證順序。
//基于 hashKey 的哈希值取餘,選擇對應的隊列
SendResult sendResult = rocketMQTemplate.syncSendOrderly(MqConstant.TOPIC, message, orderStep.getOrderId().toString());
log.info("sendResult:{}", JSON.toJSONString(sendResult));
}
}
RocketMQTemplate 在發送順序消息時,預設采用 SelectMessageQueueByHash 政策。
3.2 消費者
package com.ybw.rocketmq.demo.mq;
import com.ybw.rocketmq.demo.constant.MqConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 設定消費模式consumeMode = ConsumeMode.ORDERLY,預設情況下是并發消費模式(ConsumeMode.CONCURRENTLY)
*
* @author ybwei
* @date 2022/3/8 17:20
**/
@Component("consumer")
@Slf4j
@RocketMQMessageListener(topic = MqConstant.TOPIC, consumerGroup = MqConstant.CONSUMER_GROUP, consumeMode = ConsumeMode.ORDERLY)
public class Consumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody());
log.info("consumer start,msg:{}", msg);
try {
//模拟業務處理
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
//抛異常,稍後消費
throw new RuntimeException(e.getMessage());
}
log.info("consumer end,msg:{}", msg);
}
}
設定消費模式consumeMode = ConsumeMode.ORDERLY,預設情況下是并發消費模式(ConsumeMode.CONCURRENTLY)。
3.3 測試代碼
package com.ybw.rocketmq.demo.mq;
import com.alibaba.fastjson.JSON;
import com.ybw.rocketmq.demo.dto.OrderStep;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author ybwei
* @date 2022/3/8 17:30
**/
@SpringBootTest
@Slf4j
class SenderTest {
@Resource
private Sender sender;
/**
* 生産者順序:建立、付款、推送、完成
* 消費者順序:建立、付款、推送、完成
*
* @return void
* @throws
* @methodName: syncSendOrderly
* @author ybwei
* @date 2022/3/8 17:31
*/
@Test
public void syncSendOrderly() throws InterruptedException {
// 訂單清單
List<OrderStep> orderStepList = buildOrders();
log.info("orderStepList:{}", JSON.toJSONString(orderStepList));
orderStepList.forEach(orderStep -> {
sender.syncSendOrderly(orderStep);
});
TimeUnit.DAYS.sleep(1);
}
/**
* 生成模拟訂單資料
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("建立");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
消費者日志
[INFO ] 2022-03-08 21:42:11.825 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"建立"}
[INFO ] 2022-03-08 21:42:16.834 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"建立"}
[INFO ] 2022-03-08 21:42:16.835 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"付款"}
[INFO ] 2022-03-08 21:42:21.843 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"付款"}
[INFO ] 2022-03-08 21:42:21.843 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"推送"}
[INFO ] 2022-03-08 21:42:26.853 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"推送"}
[INFO ] 2022-03-08 21:42:26.853 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"完成"}
[INFO ] 2022-03-08 21:42:31.869 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"完成"}
從日志中可以看出,“建立”消費完後,才會消費“付款”,以此類推,隻有上一個消息執行完,才會執行下一個消息。
3.4 選擇政策
在 RocketMQ 中,Producer 可以根據定義 MessageQueueSelector 消息隊列選擇政策,選擇 Topic 下的隊列。目前提供三種政策:
- SelectMessageQueueByHash:通過 hash 進行選擇 queue。
- SelectMessageQueueByRandom:随機選擇 queue。
- SelectMessageQueueByMachineRoom:機房選擇queue(未實作)
SelectMessageQueueByHash
public class SelectMessageQueueByHash implements MessageQueueSelector {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value %= mqs.size();
return ((MessageQueue) mqs.get(value));
}
}
通過提供的參數擷取其HashCode,如果為負值則取絕對值,hash值與隊列的總數進行取模擷取其隊列。
SelectMessageQueueByRandom
public class SelectMessageQueueByRandom implements MessageQueueSelector {
private Random random;
public SelectMessageQueueByRandom() {
this.random = new Random(System.currentTimeMillis());
}
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = this.random.nextInt(mqs.size());
return ((MessageQueue) mqs.get(value));
}
}
生成一個隊列數以内的随機數,通過随機數擷取隊列。
SelectMessageQueueByMachineRoom(未實作)
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
private Set<String> consumeridcs;
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
public Set<String> getConsumeridcs() {
return this.consumeridcs;
}
public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
}
}
我們發現其select方法為null,其實是沒有進行實作,需要我們自己實作。
總結
雖然RocketMQ提供了三種(其實2種,SelectMessageQueueByMachineRoom未實作)隊列選擇算法,但是不建議使用,不同的業務規則其選擇隊列的算法也不盡相同,建議手動實作。
4、源代碼位址
https://gitee.com/xixingzhe2/share/tree/master/mq/rocketmq-in-order-demo