天天看點

RocketMQ-順序消息

github代碼下載下傳位址

1.RocketMQ有三種消息

    1)普通消息

    2)順序消息

    3)事務消息

2.順序消息

    概念:是MQ提供的一種嚴格按照順序進行釋出和消費的消息類型;是以可以看出,順序消息由兩部分組成,順序釋出和

順序消費。

3.在MQ中如何保證順序消費

        (1) 消息發送是保證是順序的    (2) 消息被存儲時保證是和發送的順序一緻 (3)消息被消費時保證和存儲的順序一緻

發送消息保證是順序的意味着對于有順序的消息,使用者要保證使用同一個線程采用同步的方式發送;而存儲和發送的

順序一緻則要求在同一個線程發送過來的消息A和消息B,存儲時在空間上消息A一定在消息B之前;最後消費和存儲的順

序保持一緻則要求,在消息A和消息B到達Consumer之後必須按照先消費消息A在消費消息B的順序執行

RocketMQ-順序消息

對于兩個訂單的消息的原始資料:a1、b1、b2、a2、a3、b3(絕對時間下發生的順序):

   1) 在發送時,a訂單的消息必須保證a1 ,a2 , a3的順序發送,b訂單的消息也一樣,但是a,b訂單之間的消息沒有順序關系;

這意味這a,b訂單的消息可以通過不同的線程發送出去

   2) 在存儲時,需要分别保證a,b訂單各自消息的順序,但是a,b訂單之間的消息可以不保證

   3)在消費時,隻要保證每一個分區隻有一個線程來去處理即可當然,如果a、b在一個分區中,在收到消息後也可以将他們拆分到不同線程中處理,不過要權衡一下收益

4.生産者代碼

      1)生産者

package com.roger.order.producer;

import com.roger.order.entity.OrderMsgDTO;
import com.roger.utils.SnowflakeIdWorker;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.ArrayList;
import java.util.List;

public class OrderMqProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer defaultMQProducer =
                new DefaultMQProducer("orderMQProducerGroup");
        defaultMQProducer.setNamesrvAddr("172.20.10.60:9876");
        defaultMQProducer.start();
        defaultMQProducer.createTopic("OrderTopic", "OrderTopic", 3);

        String[] tags = new String[]{"TagC", "TagP", "TagF"};

        List<OrderMsgDTO> orderMsgList = new ArrayList<>();
        int orderCount = 5;
        for (int i = 0; i < orderCount; i++) {
            long orderId = SnowflakeIdWorker.getInstance().nextId();
            OrderMqProducer.builderOrderMsgList(orderMsgList, orderId);
        }

        for (int i = 0; i < orderMsgList.size(); i++) {
            OrderMsgDTO orderMsgDTO = orderMsgList.get(i);
            String body = orderMsgDTO.toString();
            Message msg = new Message("OrderTopic",
                    tags[i % tags.length],
                    "OrderKey" + i,
                    body.getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() {

                //List<MessageQueue> msgQueList 消息要發送的Topic下的所有分區
                //Message message 消息對象
                // Object args 額外的參數,使用者可以自己傳遞參數
                // 比如為了把同一個訂單的消息發送到同一個分區中,
                // 可以把訂單号作為一個參數傳遞過去然後mod分區個數,
                // 就可以保證把同一個訂單的消息發送到同一個分區中去
                @Override
                public MessageQueue select(List<MessageQueue> msgQueList, Message message, Object args) {
                    long orderId = (long) args;
                    long index = orderId % msgQueList.size();
                    return msgQueList.get((int) index);
                }
            }, orderMsgDTO.getOrderId());

            System.out.println(sendResult +
                    String.format("message [%s] send success.",
                            new String(msg.getBody())));
        }
        //defaultMQProducer.shutdown();
    }


    private static void builderOrderMsgList(List<OrderMsgDTO> orderMsgList, long orderId) {
        orderMsgList.add(new OrderMsgDTO(orderId, "Create"));
        orderMsgList.add(new OrderMsgDTO(orderId, "PayOff"));
        orderMsgList.add(new OrderMsgDTO(orderId, "Finish"));
    }
}
           

2.topic配置資訊

RocketMQ-順序消息

3 運作結果- 5個訂單的消息分到了三個隊列(queue=0,1,2)中去了

SendResult [sendStatus=SEND_OK, 

            msgId=AC140A051DC818B4AAC2938813480000, 

            offsetMsgId=AC140A3C00002A9F00000000000297A4, 

            messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], 

            queueOffset=3

        ]

message [OrderMsgDTO(orderId=517724620921503744, msgType=Create)] send success.

SendResult [sendStatus=SEND_OK, 

            msgId=AC140A051DC818B4AAC2938813510001, 

            offsetMsgId=AC140A3C00002A9F000000000002988D, 

            messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], 

            queueOffset=4

        ]

message [OrderMsgDTO(orderId=517724620921503744, msgType=PayOff)] send success.

SendResult [sendStatus=SEND_OK, 

            msgId=AC140A051DC818B4AAC2938813C90002, 

            offsetMsgId=AC140A3C00002A9F0000000000029976, 

            messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], 

            queueOffset=5

        ]

message [OrderMsgDTO(orderId=517724620921503744, msgType=Finish)] send success.

SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938813EA0003, offsetMsgId=AC140A3C00002A9F0000000000029A5F, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=3]message [OrderMsgDTO(orderId=517724620925698048, msgType=Create)] send success.

SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938813F00004, offsetMsgId=AC140A3C00002A9F0000000000029B48, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=4]message [OrderMsgDTO(orderId=517724620925698048, msgType=PayOff)] send success.

SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938814000005, offsetMsgId=AC140A3C00002A9F0000000000029C31, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=5]message [OrderMsgDTO(orderId=517724620925698048, msgType=Finish)] send success.

SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938814A20006, offsetMsgId=AC140A3C00002A9F0000000000029D1A, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=12]message [OrderMsgDTO(orderId=517724620925698049, msgType=Create)] send success.

SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938815060007, offsetMsgId=AC140A3C00002A9F0000000000029E03, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=13]message [OrderMsgDTO(orderId=517724620925698049, msgType=PayOff)] send success.

SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938815110008, offsetMsgId=AC140A3C00002A9F0000000000029EEC, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=14]message [OrderMsgDTO(orderId=517724620925698049, msgType=Finish)] send success.

SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938815150009, offsetMsgId=AC140A3C00002A9F0000000000029FD5, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], queueOffset=6]message [OrderMsgDTO(orderId=517724620925698050, msgType=Create)] send success.

SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC293881546000A, offsetMsgId=AC140A3C00002A9F000000000002A0BE, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], queueOffset=7]message [OrderMsgDTO(orderId=517724620925698050, msgType=PayOff)] send success.

SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC293881549000B, offsetMsgId=AC140A3C00002A9F000000000002A1A8, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], queueOffset=8]message [OrderMsgDTO(orderId=517724620925698050, msgType=Finish)] send success.

SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938819F0000C, offsetMsgId=AC140A3C00002A9F000000000002A292, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=6]message [OrderMsgDTO(orderId=517724620925698051, msgType=Create)] send success.

SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC293881A30000D, offsetMsgId=AC140A3C00002A9F000000000002A37C, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=7]message [OrderMsgDTO(orderId=517724620925698051, msgType=PayOff)] send success.

SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938821A0000E, offsetMsgId=AC140A3C00002A9F000000000002A466, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=8]message [OrderMsgDTO(orderId=517724620925698051, msgType=Finish)] send success.

5.消費者代碼-兩個消費者代碼相同隻是細微的差别

    1.代碼

package com.roger.order.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class OrderMqConsumer1 {

    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer defaultMQPushConsumer =
                new DefaultMQPushConsumer("orderMQPushConsumerGroup");
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        defaultMQPushConsumer.setNamesrvAddr("172.20.10.60:9876");

        defaultMQPushConsumer.subscribe("OrderTopic","*");

        defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
            Random r = new Random();
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgExtList, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.println("目前線程名:" + Thread.currentThread().getName() + ";Receive new message:");
                for(MessageExt msgExt : msgExtList){
                    System.out.println(String.format("Consume message [%s],TagName [%s]",
                            new String(msgExt.getBody()),
                            msgExt.getTags()));
                    try {
                        //簡單業務處理邏輯
                        TimeUnit.SECONDS.sleep(r.nextInt(10));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        defaultMQPushConsumer.start();

        System.out.println("OrderMqConsumer1 Started...");
    }
}
           

   2.OrderMqConsumer1運作結果-

             (a) 消費了queue=0 和queue=1的資料

             (b) 每個訂單按照順序執行,并且是訂單号相同的消息同一個線程執行的

OrderMqConsumer1 Started...

目前線程名:ConsumeMessageThread_1;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620925698048, msgType=Create)],TagName [TagC]

目前線程名:ConsumeMessageThread_1;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620925698048, msgType=PayOff)],TagName [TagP]

目前線程名:ConsumeMessageThread_2;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620921503744, msgType=Create)],TagName [TagC]

目前線程名:ConsumeMessageThread_1;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620925698048, msgType=Finish)],TagName [TagF]

目前線程名:ConsumeMessageThread_2;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620921503744, msgType=PayOff)],TagName [TagP]

目前線程名:ConsumeMessageThread_1;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620925698051, msgType=Create)],TagName [TagC]

目前線程名:ConsumeMessageThread_1;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620925698051, msgType=PayOff)],TagName [TagP]

目前線程名:ConsumeMessageThread_2;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620921503744, msgType=Finish)],TagName [TagF]

目前線程名:ConsumeMessageThread_1;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620925698051, msgType=Finish)],TagName [TagF]

目前線程名:ConsumeMessageThread_2;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620925698050, msgType=Create)],TagName [TagC]

目前線程名:ConsumeMessageThread_2;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620925698050, msgType=PayOff)],TagName [TagP]

目前線程名:ConsumeMessageThread_2;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620925698050, msgType=Finish)],TagName [TagF]

       3.OrderMqConsumer1運作結果-

OrderMqConsumer2 Started...

目前線程名:ConsumeMessageThread_1;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620925698049, msgType=Create)],TagName [TagC]

目前線程名:ConsumeMessageThread_1;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620925698049, msgType=PayOff)],TagName [TagP]

目前線程名:ConsumeMessageThread_1;Receive new message:

Consume message [OrderMsgDTO(orderId=517724620925698049, msgType=Finish)],TagName [TagF]

繼續閱讀