天天看點

rocketmq順序發送消息

1、概念

嚴格順序消息模式下,消費者收到的所有消息均是有順序的。

消息有序指的是可以按照消息的發送順序來消費(FIFO)。RocketMQ可以嚴格的保證消息有序,可以分為分區有序或者全局有序。

順序消費的原了解析,在預設的情況下消息發送會采取Round Robin輪詢方式把消息發送到不同的queue(分區隊列);而消費消息的時候從多個queue上拉取消息,這種情況發送和消費是不能保證順序。但是如果控制發送的順序消息隻依次發送到同一個queue中,消費的時候隻從這個queue上依次拉取,則就保證了順序。當發送和消費參與的queue隻有一個,則是全局有序;如果多個queue參與,則為分區有序,即相對每個queue,消息都是有序的。

1.1 全局順序

對于指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行釋出和消費。

适用場景:性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息釋出和消費的場景。

1.2 分區順序(一般都是使用分區順序)

對于指定的一個 Topic,所有消息根據 sharding key 進行區塊分區。 同一個分區内的消息按照嚴格的 FIFO 順序進行釋出和消費。 Sharding key 是順序消息中用來區分不同分區的關鍵字段,和普通消息的 Key 是完全不同的概念。

适用場景:性能要求高,以 sharding key 作為分區字段,在同一個區塊中嚴格的按照 FIFO 原則進行消息釋出和消費的場景。

2、注意點

想要實作順序消費,發送方式必須為同步發送,異步發送無法保證消息的發送順序!

3、訂單進行分區有序示例

一個訂單的順序流程是:建立、付款、推送、完成。訂單号相同的消息會被先後發送到同一個隊列中,消費時,同一個OrderId擷取到的肯定是同一個隊列。

3.1 生産者

package com.ybw.rocketmq.demo.mq;
 
import com.alibaba.fastjson.JSON;
import com.ybw.rocketmq.demo.constant.MqConstant;
import com.ybw.rocketmq.demo.dto.OrderStep;
import com.ybw.rocketmq.demo.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
 
 
/**
 * 生産者
 *
 * @author ybwei
 * @date 2022/3/8 21:08
 **/
@Component("sender")
@Slf4j
public class Sender {
 
    @Resource
    private RocketMQTemplate rocketMQTemplate;
 
    /**
     * 順序發送
     * 注:想要實作順序消費,發送方式必須為同步發送,異步發送無法保證消息的發送順序!
     *
     * @param orderStep
     * @return void
     * @throws
     * @methodName: syncSendOrderly
     * @author ybwei
     * @date 2022/3/8 19:52
     */
    public void syncSendOrderly(OrderStep orderStep) {
        Message<OrderStep> message = MessageBuilder.withPayload(orderStep)
                .setHeader(MessageConst.PROPERTY_KEYS, StringUtils.generateUUID())
                .setHeader(MessageConst.PROPERTY_PRODUCER_GROUP, MqConstant.PRODUCER_GROUP)
                .build();
        //hashKey:該參數的作用即是在發送消息的時候,固定發送到一個隊列(預設情況下rocketmq中的topic有4個隊列)以保證順序。
        //基于 hashKey 的哈希值取餘,選擇對應的隊列
        SendResult sendResult = rocketMQTemplate.syncSendOrderly(MqConstant.TOPIC, message, orderStep.getOrderId().toString());
        log.info("sendResult:{}", JSON.toJSONString(sendResult));
    }
 
 
}           

RocketMQTemplate 在發送順序消息時,預設采用 SelectMessageQueueByHash 政策。

3.2 消費者

package com.ybw.rocketmq.demo.mq;
 
import com.ybw.rocketmq.demo.constant.MqConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
 
import java.util.concurrent.TimeUnit;
 
/**
 * 設定消費模式consumeMode = ConsumeMode.ORDERLY,預設情況下是并發消費模式(ConsumeMode.CONCURRENTLY)
 *
 * @author ybwei
 * @date 2022/3/8 17:20
 **/
@Component("consumer")
@Slf4j
@RocketMQMessageListener(topic = MqConstant.TOPIC, consumerGroup = MqConstant.CONSUMER_GROUP, consumeMode = ConsumeMode.ORDERLY)
public class Consumer implements RocketMQListener<MessageExt> {
 
 
    @Override
    public void onMessage(MessageExt messageExt) {
        String msg = new String(messageExt.getBody());
        log.info("consumer start,msg:{}", msg);
        try {
            //模拟業務處理
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            //抛異常,稍後消費
            throw new RuntimeException(e.getMessage());
        }
        log.info("consumer end,msg:{}", msg);
    }
}           

設定消費模式consumeMode = ConsumeMode.ORDERLY,預設情況下是并發消費模式(ConsumeMode.CONCURRENTLY)。

3.3 測試代碼

package com.ybw.rocketmq.demo.mq;
 
import com.alibaba.fastjson.JSON;
import com.ybw.rocketmq.demo.dto.OrderStep;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
 
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
 
/**
 * @author ybwei
 * @date 2022/3/8 17:30
 **/
@SpringBootTest
@Slf4j
class SenderTest {
    @Resource
    private Sender sender;
 
    /**
     * 生産者順序:建立、付款、推送、完成
     * 消費者順序:建立、付款、推送、完成
     *
     * @return void
     * @throws
     * @methodName: syncSendOrderly
     * @author ybwei
     * @date 2022/3/8 17:31
     */
    @Test
    public void syncSendOrderly() throws InterruptedException {
        // 訂單清單
        List<OrderStep> orderStepList = buildOrders();
        log.info("orderStepList:{}", JSON.toJSONString(orderStepList));
        orderStepList.forEach(orderStep -> {
            sender.syncSendOrderly(orderStep);
        });
        TimeUnit.DAYS.sleep(1);
    }
 
    /**
     * 生成模拟訂單資料
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();
 
        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("建立");
        orderList.add(orderDemo);
 
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
 
 
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);
 
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
 
        return orderList;
    }
}           

消費者日志

[INFO ] 2022-03-08 21:42:11.825 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"建立"}
[INFO ] 2022-03-08 21:42:16.834 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"建立"}
[INFO ] 2022-03-08 21:42:16.835 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"付款"}
[INFO ] 2022-03-08 21:42:21.843 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"付款"}
[INFO ] 2022-03-08 21:42:21.843 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"推送"}
[INFO ] 2022-03-08 21:42:26.853 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"推送"}
[INFO ] 2022-03-08 21:42:26.853 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer start,msg:{"orderId":15103111039,"desc":"完成"}
[INFO ] 2022-03-08 21:42:31.869 [ConsumeMessageThread_1] com.ybw.rocketmq.demo.mq.Consumer - consumer end,msg:{"orderId":15103111039,"desc":"完成"}           

從日志中可以看出,“建立”消費完後,才會消費“付款”,以此類推,隻有上一個消息執行完,才會執行下一個消息。

3.4 選擇政策

在 RocketMQ 中,Producer 可以根據定義 MessageQueueSelector 消息隊列選擇政策,選擇 Topic 下的隊列。目前提供三種政策:

  • SelectMessageQueueByHash:通過 hash 進行選擇 queue。
  • SelectMessageQueueByRandom:随機選擇 queue。
  • SelectMessageQueueByMachineRoom:機房選擇queue(未實作)

SelectMessageQueueByHash

public class SelectMessageQueueByHash implements MessageQueueSelector {
	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
		int value = arg.hashCode();
		if (value < 0) {
			value = Math.abs(value);
		}
		value %= mqs.size();
		return ((MessageQueue) mqs.get(value));
	}
}           

通過提供的參數擷取其HashCode,如果為負值則取絕對值,hash值與隊列的總數進行取模擷取其隊列。

SelectMessageQueueByRandom

public class SelectMessageQueueByRandom implements MessageQueueSelector {
	private Random random;
 
	public SelectMessageQueueByRandom() {
		this.random = new Random(System.currentTimeMillis());
	}
	
	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
		int value = this.random.nextInt(mqs.size());
		return ((MessageQueue) mqs.get(value));
	}
}           

生成一個隊列數以内的随機數,通過随機數擷取隊列。

SelectMessageQueueByMachineRoom(未實作)

public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
	private Set<String> consumeridcs;
 
	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
		return null;
	}
 
	public Set<String> getConsumeridcs() {
		return this.consumeridcs;
	}
 
	public void setConsumeridcs(Set<String> consumeridcs) {
		this.consumeridcs = consumeridcs;
	}
}           

我們發現其select方法為null,其實是沒有進行實作,需要我們自己實作。

總結

雖然RocketMQ提供了三種(其實2種,SelectMessageQueueByMachineRoom未實作)隊列選擇算法,但是不建議使用,不同的業務規則其選擇隊列的算法也不盡相同,建議手動實作。

4、源代碼位址

https://gitee.com/xixingzhe2/share/tree/master/mq/rocketmq-in-order-demo