1、概念
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
1.1 全局顺序
对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景。
1.2 分区顺序(一般都是使用分区顺序)
对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
2、注意点
想要实现顺序消费,发送方式必须为同步发送,异步发送无法保证消息的发送顺序!
3、订单进行分区有序示例
一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
3.1 生产者
package com.ybw.rocketmq.demo.mq;
import com.alibaba.fastjson.JSON;
import com.ybw.rocketmq.demo.constant.MqConstant;
import com.ybw.rocketmq.demo.dto.OrderStep;
import com.ybw.rocketmq.demo.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 生产者
*
* @author ybwei
* @date 2022/3/8 21:08
**/
@Component("sender")
@Slf4j
public class Sender {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 顺序发送
* 注:想要实现顺序消费,发送方式必须为同步发送,异步发送无法保证消息的发送顺序!
*
* @param orderStep
* @return void
* @throws
* @methodName: syncSendOrderly
* @author ybwei
* @date 2022/3/8 19:52
*/
public void syncSendOrderly(OrderStep orderStep) {
Message<OrderStep> message = MessageBuilder.withPayload(orderStep)
.setHeader(MessageConst.PROPERTY_KEYS, StringUtils.generateUUID())
.setHeader(MessageConst.PROPERTY_PRODUCER_GROUP, MqConstant.PRODUCER_GROUP)
.build();
//hashKey:该参数的作用即是在发送消息的时候,固定发送到一个队列(默认情况下rocketmq中的topic有4个队列)以保证顺序。
//基于 hashKey 的哈希值取余,选择对应的队列
SendResult sendResult = rocketMQTemplate.syncSendOrderly(MqConstant.TOPIC, message, orderStep.getOrderId().toString());
log.info("sendResult:{}", JSON.toJSONString(sendResult));
}
}
RocketMQTemplate 在发送顺序消息时,默认采用 SelectMessageQueueByHash 策略。
3.2 消费者
package com.ybw.rocketmq.demo.mq;
import com.ybw.rocketmq.demo.constant.MqConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 设置消费模式consumeMode = ConsumeMode.ORDERLY,默认情况下是并发消费模式(ConsumeMode.CONCURRENTLY)
*
* @author ybwei
* @date 2022/3/8 17:20
**/
@Component("consumer")
@Slf4j
@RocketMQMessageListener(topic = MqConstant.TOPIC, consumerGroup = MqConstant.CONSUMER_GROUP, consumeMode = ConsumeMode.ORDERLY)
public class Consumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody());
log.info("consumer start,msg:{}", msg);
try {
//模拟业务处理
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
//抛异常,稍后消费
throw new RuntimeException(e.getMessage());
}
log.info("consumer end,msg:{}", msg);
}
}
设置消费模式consumeMode = ConsumeMode.ORDERLY,默认情况下是并发消费模式(ConsumeMode.CONCURRENTLY)。
3.3 测试代码
package com.ybw.rocketmq.demo.mq;
import com.alibaba.fastjson.JSON;
import com.ybw.rocketmq.demo.dto.OrderStep;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author ybwei
* @date 2022/3/8 17:30
**/
@SpringBootTest
@Slf4j
class SenderTest {
@Resource
private Sender sender;
/**
* 生产者顺序:创建、付款、推送、完成
* 消费者顺序:创建、付款、推送、完成
*
* @return void
* @throws
* @methodName: syncSendOrderly
* @author ybwei
* @date 2022/3/8 17:31
*/
@Test
public void syncSendOrderly() throws InterruptedException {
// 订单列表
List<OrderStep> orderStepList = buildOrders();
log.info("orderStepList:{}", JSON.toJSONString(orderStepList));
orderStepList.forEach(orderStep -> {
sender.syncSendOrderly(orderStep);
});
TimeUnit.DAYS.sleep(1);
}
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
消费者日志
[INFO ] 2022-03-08 21:42:11.825 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"创建"}
[INFO ] 2022-03-08 21:42:16.834 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"创建"}
[INFO ] 2022-03-08 21:42:16.835 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"付款"}
[INFO ] 2022-03-08 21:42:21.843 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"付款"}
[INFO ] 2022-03-08 21:42:21.843 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"推送"}
[INFO ] 2022-03-08 21:42:26.853 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"推送"}
[INFO ] 2022-03-08 21:42:26.853 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"完成"}
[INFO ] 2022-03-08 21:42:31.869 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"完成"}
从日志中可以看出,“创建”消费完后,才会消费“付款”,以此类推,只有上一个消息执行完,才会执行下一个消息。
3.4 选择策略
在 RocketMQ 中,Producer 可以根据定义 MessageQueueSelector 消息队列选择策略,选择 Topic 下的队列。目前提供三种策略:
- SelectMessageQueueByHash:通过 hash 进行选择 queue。
- SelectMessageQueueByRandom:随机选择 queue。
- SelectMessageQueueByMachineRoom:机房选择queue(未实现)
SelectMessageQueueByHash
public class SelectMessageQueueByHash implements MessageQueueSelector {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value %= mqs.size();
return ((MessageQueue) mqs.get(value));
}
}
通过提供的参数获取其HashCode,如果为负值则取绝对值,hash值与队列的总数进行取模获取其队列。
SelectMessageQueueByRandom
public class SelectMessageQueueByRandom implements MessageQueueSelector {
private Random random;
public SelectMessageQueueByRandom() {
this.random = new Random(System.currentTimeMillis());
}
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = this.random.nextInt(mqs.size());
return ((MessageQueue) mqs.get(value));
}
}
生成一个队列数以内的随机数,通过随机数获取队列。
SelectMessageQueueByMachineRoom(未实现)
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
private Set<String> consumeridcs;
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
public Set<String> getConsumeridcs() {
return this.consumeridcs;
}
public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
}
}
我们发现其select方法为null,其实是没有进行实现,需要我们自己实现。
总结
虽然RocketMQ提供了三种(其实2种,SelectMessageQueueByMachineRoom未实现)队列选择算法,但是不建议使用,不同的业务规则其选择队列的算法也不尽相同,建议手动实现。
4、源代码地址
https://gitee.com/xixingzhe2/share/tree/master/mq/rocketmq-in-order-demo