天天看點

RocketMQ msgId與offsetMsgId釋疑(實戰篇)

做積極的人,越努力越幸運!

RocketMQ msgId與offsetMsgId釋疑(實戰篇)
本文将詳細介紹消息發送、消息消費、RocketMQ queryMsgById 指令以及 rocketmq-console 等使用場景中究竟是用的哪一個ID。

1、抛出問題

1.1 從消息發送看消息ID

package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
    public static void main(String[] args)  {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
            Message msg = new Message("TestTopic" /* Topic */,null /* Tag */, ("Hello RocketMQ test1" ).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
            producer.shutdown();
        } catch (Throwable e) {
            e.printStackTrace();
        }

    }
}           

執行效果如圖所示:

RocketMQ msgId與offsetMsgId釋疑(實戰篇)

即消息發送會傳回 msgId 與 offsetMsgId。

1.2 從消息消費看消息ID

package org.apache.rocketmq.example.quickstart;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TestTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.println("MessageExt msg.getMsgId():" +  msgs.get(0).getMsgId());
                System.out.println("-------------------分割線-----------------");
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}           
RocketMQ msgId與offsetMsgId釋疑(實戰篇)

不知道大家是否有注意到,調用 msgs.get(0).getMsgId()傳回的msgId 與直接輸出msgs中的 msgId 不一樣,那這又是為什麼呢?答案在本文的第二部分有詳細分析。

2、消息ID釋疑

從消息發送的結果可以得知,RocketMQ 發送的傳回結果會傳回 msgId 與 offsetMsgId,那這兩個 msgId 分别是代表什麼呢?

  • msgId:該 ID 是消息發送者在消息發送時會首先在用戶端生成,全局唯一,在 RocketMQ 中該 ID 還有另外的一個叫法:uniqId,無不展現其全局唯一性。
  • offsetMsgId:消息偏移ID,該 ID 記錄了消息所在叢集的實體位址,主要包含所存儲 Broker 伺服器的位址( IP 與端口号)以及所在commitlog 檔案的實體偏移量。

2.1 msgId 即全局唯一 ID 建構規則

RocketMQ msgId與offsetMsgId釋疑(實戰篇)

從這張圖可以看出,msgId确實是用戶端生成的,接下來我們詳細分析一下其生成算法。

MessageClientIDSetter#createUniqID

public static String createUniqID() {
    StringBuilder sb = new StringBuilder(LEN * 2);
    sb.append(FIX_STRING);    // @1
    sb.append(UtilAll.bytes2string(createUniqIDBuffer()));  // @2
    return sb.toString();
}           

一個 uniqID 的建構主要分成兩個部分:FIX_STRING 與唯一 ID 生成算法,顧名思義,FIX_STRING 就是一個用戶端固定一個字首,那接下來先看一下固定字元串的生成規則。

2.1.1 FIX_STRING

MessageClientIDSetter靜态代碼塊

static {
    byte[] ip;
    try {
        ip = UtilAll.getIP();
    } catch (Exception e) {
        ip = createFakeIP();
    }
    LEN = ip.length + 2 + 4 + 4 + 2;
    ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
    tempBuffer.position(0);
    tempBuffer.put(ip);
    tempBuffer.position(ip.length);
    tempBuffer.putInt(UtilAll.getPid());
    tempBuffer.position(ip.length + 2);
    tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
    FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
    setStartTime(System.currentTimeMillis());
    COUNTER = new AtomicInteger(0);
}           

從這裡可以看出 FIX_STRING 的主要由:用戶端的IP、程序ID、加載 MessageClientIDSetter 的類加載器的 hashcode。

2.1.2 唯一性算法

msgId 的唯一性算法由 MessageClientIDSetter 的createUniqIDBuffer 方法實作。

private static byte[] createUniqIDBuffer() {
    ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
    long current = System.currentTimeMillis();
    if (current >= nextStartTime) {
        setStartTime(current);
    }
    buffer.position(0);
    buffer.putInt((int) (System.currentTimeMillis() - startTime));
    buffer.putShort((short) COUNTER.getAndIncrement());
    return buffer.array();
}           

可以得出 msgId 的後半段主要由:目前時間與系統啟動時間的內插補點,以及自增序号。

2.2 offsetMsgId建構規則

RocketMQ msgId與offsetMsgId釋疑(實戰篇)

在消息 Broker 服務端将消息追加到記憶體後會傳回其實體偏移量,即在 commitlog 檔案中的檔案,然後會再次生成一個id,代碼中雖然也叫 msgId,其實這裡就是我們常說的 offsetMsgId,即記錄了消息的實體偏移量,故我們重點來看一下其具體生成規則:

MessageDecoder#createMessageId

public static String createMessageId(final ByteBuffer input ,
            final ByteBuffer addr, final long offset) {
    input.flip();
    int msgIDLength = addr.limit() == 8 ? 16 : 28;
    input.limit(msgIDLength);
    input.put(addr);
    input.putLong(offset);
    return UtilAll.bytes2string(input.array());
}           

首先結合該方法的調用上下文,先解釋一下該方法三個入參的含義:

  • ByteBuffer input

    用來存放 offsetMsgId 的位元組緩存區( NIO 相關的基礎知識)

  • ByteBuffer addr

    目前 Broker 伺服器的 IP 位址與端口号,即通過解析 offsetMsgId 進而得到消息伺服器的位址資訊。

  • long offset

    消息的實體偏移量。

    即構成 offsetMsgId 的組成部分:Broker 伺服器的 IP 與端口号、消息的實體偏移量。

溫馨提示:即在 RocketMQ 中,隻需要提供 offsetMsgId,可以不必知道該消息所屬的 topic 資訊即可查詢該條消息的内容。

2.3 消息發送與消息消費傳回的消息ID資訊

消息發送時會在 SendSesult 中傳回 msgId、offsetMsgId,在了解了這個兩個 ID 的含義時則問題不大,接下來重點介紹一下消息消費時傳回的 msgId 到底是哪一個。

在消息消費時,我們更加希望因為 msgId (即用戶端生成的全局唯一性ID),因為該全局性 ID 非常友善實作消費端的幂等。

在本文的1.2節我們也提到一個現象,為什麼如下圖代碼中輸出的 msgId 會不一樣呢?

RocketMQ msgId與offsetMsgId釋疑(實戰篇)

在用戶端傳回的 msg 資訊,其最終傳回的對象是 MessageClientExt ,繼承自 MessageExt。

那我們接下來分别看一下其 getMsgId() 方法與 toString 方法即可。

@Override
public String getMsgId() {
    String uniqID = MessageClientIDSetter.getUniqID(this);
    if (uniqID == null) {
        return this.getOffsetMsgId();
    } else {
        return uniqID;
    }
}           

原來在調用 MessageClientExt 中的 getMsgId 方法時,如果消息的屬性中存在其唯一ID,則傳回消息的全局唯一ID,否則傳回消息的 offsetMsgId。

而 MessageClientExt 方法并沒有重寫 MessageExt 的 toString 方法,其實作如圖所示:

RocketMQ msgId與offsetMsgId釋疑(實戰篇)

故傳回的是 MessageExt中 的 msgId,該 msgId 存放的是 offsetMsgId,是以才造成了困擾。

溫馨提示:如果消息消費失敗需要重試,RocketMQ 的做法是将消息重新發送到 Broker 伺服器,此時全局 msgId 是不會發送變化的,但該消息的 offsetMsgId 會發送變化,因為其存儲在伺服器中的位置發生了變化。

3、實踐經驗

在解答了消息發送與消息消費關于msgId與offsetMsgId的困擾後,再來介紹一下如果根據 msgId 去查詢消息。

想必大家對 rocketmq-console ,那在消息查找界面,展示的消息清單中傳回的 msgId 又是哪一個呢?