天天看点

rocketmq顺序发送消息

1、概念

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

1.1 全局顺序

对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。

适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景。

1.2 分区顺序(一般都是使用分区顺序)

对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。

适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

2、注意点

想要实现顺序消费,发送方式必须为同步发送,异步发送无法保证消息的发送顺序!

3、订单进行分区有序示例

一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

3.1 生产者

package com.ybw.rocketmq.demo.mq;
 
import com.alibaba.fastjson.JSON;
import com.ybw.rocketmq.demo.constant.MqConstant;
import com.ybw.rocketmq.demo.dto.OrderStep;
import com.ybw.rocketmq.demo.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
 
 
/**
 * 生产者
 *
 * @author ybwei
 * @date 2022/3/8 21:08
 **/
@Component("sender")
@Slf4j
public class Sender {
 
    @Resource
    private RocketMQTemplate rocketMQTemplate;
 
    /**
     * 顺序发送
     * 注:想要实现顺序消费,发送方式必须为同步发送,异步发送无法保证消息的发送顺序!
     *
     * @param orderStep
     * @return void
     * @throws
     * @methodName: syncSendOrderly
     * @author ybwei
     * @date 2022/3/8 19:52
     */
    public void syncSendOrderly(OrderStep orderStep) {
        Message<OrderStep> message = MessageBuilder.withPayload(orderStep)
                .setHeader(MessageConst.PROPERTY_KEYS, StringUtils.generateUUID())
                .setHeader(MessageConst.PROPERTY_PRODUCER_GROUP, MqConstant.PRODUCER_GROUP)
                .build();
        //hashKey:该参数的作用即是在发送消息的时候,固定发送到一个队列(默认情况下rocketmq中的topic有4个队列)以保证顺序。
        //基于 hashKey 的哈希值取余,选择对应的队列
        SendResult sendResult = rocketMQTemplate.syncSendOrderly(MqConstant.TOPIC, message, orderStep.getOrderId().toString());
        log.info("sendResult:{}", JSON.toJSONString(sendResult));
    }
 
 
}           

RocketMQTemplate 在发送顺序消息时,默认采用 SelectMessageQueueByHash 策略。

3.2 消费者

package com.ybw.rocketmq.demo.mq;
 
import com.ybw.rocketmq.demo.constant.MqConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
 
import java.util.concurrent.TimeUnit;
 
/**
 * 设置消费模式consumeMode = ConsumeMode.ORDERLY,默认情况下是并发消费模式(ConsumeMode.CONCURRENTLY)
 *
 * @author ybwei
 * @date 2022/3/8 17:20
 **/
@Component("consumer")
@Slf4j
@RocketMQMessageListener(topic = MqConstant.TOPIC, consumerGroup = MqConstant.CONSUMER_GROUP, consumeMode = ConsumeMode.ORDERLY)
public class Consumer implements RocketMQListener<MessageExt> {
 
 
    @Override
    public void onMessage(MessageExt messageExt) {
        String msg = new String(messageExt.getBody());
        log.info("consumer start,msg:{}", msg);
        try {
            //模拟业务处理
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            //抛异常,稍后消费
            throw new RuntimeException(e.getMessage());
        }
        log.info("consumer end,msg:{}", msg);
    }
}           

设置消费模式consumeMode = ConsumeMode.ORDERLY,默认情况下是并发消费模式(ConsumeMode.CONCURRENTLY)。

3.3 测试代码

package com.ybw.rocketmq.demo.mq;
 
import com.alibaba.fastjson.JSON;
import com.ybw.rocketmq.demo.dto.OrderStep;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
 
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
 
/**
 * @author ybwei
 * @date 2022/3/8 17:30
 **/
@SpringBootTest
@Slf4j
class SenderTest {
    @Resource
    private Sender sender;
 
    /**
     * 生产者顺序:创建、付款、推送、完成
     * 消费者顺序:创建、付款、推送、完成
     *
     * @return void
     * @throws
     * @methodName: syncSendOrderly
     * @author ybwei
     * @date 2022/3/8 17:31
     */
    @Test
    public void syncSendOrderly() throws InterruptedException {
        // 订单列表
        List<OrderStep> orderStepList = buildOrders();
        log.info("orderStepList:{}", JSON.toJSONString(orderStepList));
        orderStepList.forEach(orderStep -> {
            sender.syncSendOrderly(orderStep);
        });
        TimeUnit.DAYS.sleep(1);
    }
 
    /**
     * 生成模拟订单数据
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();
 
        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);
 
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
 
 
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);
 
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
 
        return orderList;
    }
}           

消费者日志

[INFO ] 2022-03-08 21:42:11.825 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"创建"}
[INFO ] 2022-03-08 21:42:16.834 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"创建"}
[INFO ] 2022-03-08 21:42:16.835 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"付款"}
[INFO ] 2022-03-08 21:42:21.843 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"付款"}
[INFO ] 2022-03-08 21:42:21.843 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"推送"}
[INFO ] 2022-03-08 21:42:26.853 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"推送"}
[INFO ] 2022-03-08 21:42:26.853 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"完成"}
[INFO ] 2022-03-08 21:42:31.869 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"完成"}           

从日志中可以看出,“创建”消费完后,才会消费“付款”,以此类推,只有上一个消息执行完,才会执行下一个消息。

3.4 选择策略

在 RocketMQ 中,Producer 可以根据定义 MessageQueueSelector 消息队列选择策略,选择 Topic 下的队列。目前提供三种策略:

  • SelectMessageQueueByHash:通过 hash 进行选择 queue。
  • SelectMessageQueueByRandom:随机选择 queue。
  • SelectMessageQueueByMachineRoom:机房选择queue(未实现)

SelectMessageQueueByHash

public class SelectMessageQueueByHash implements MessageQueueSelector {
	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
		int value = arg.hashCode();
		if (value < 0) {
			value = Math.abs(value);
		}
		value %= mqs.size();
		return ((MessageQueue) mqs.get(value));
	}
}           

通过提供的参数获取其HashCode,如果为负值则取绝对值,hash值与队列的总数进行取模获取其队列。

SelectMessageQueueByRandom

public class SelectMessageQueueByRandom implements MessageQueueSelector {
	private Random random;
 
	public SelectMessageQueueByRandom() {
		this.random = new Random(System.currentTimeMillis());
	}
	
	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
		int value = this.random.nextInt(mqs.size());
		return ((MessageQueue) mqs.get(value));
	}
}           

生成一个队列数以内的随机数,通过随机数获取队列。

SelectMessageQueueByMachineRoom(未实现)

public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
	private Set<String> consumeridcs;
 
	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
		return null;
	}
 
	public Set<String> getConsumeridcs() {
		return this.consumeridcs;
	}
 
	public void setConsumeridcs(Set<String> consumeridcs) {
		this.consumeridcs = consumeridcs;
	}
}           

我们发现其select方法为null,其实是没有进行实现,需要我们自己实现。

总结

虽然RocketMQ提供了三种(其实2种,SelectMessageQueueByMachineRoom未实现)队列选择算法,但是不建议使用,不同的业务规则其选择队列的算法也不尽相同,建议手动实现。

4、源代码地址

https://gitee.com/xixingzhe2/share/tree/master/mq/rocketmq-in-order-demo