天天看點

RocketMQ-Push模式下并發消費和順序消費的差別

  1. 注冊的消息監聽不同

    并發消費:

    consumer.registerMessageListener(new MessageListenerConcurrently() {}

    順序消費:

    consumer.registerMessageListener(new MessageListenerOrderly() {}

  2. 傳回狀态碼不同

    并發消費:

    public enum ConsumeConcurrentlyStatus {
        // 消費成功
        CONSUME_SUCCESS,
        // 消費失敗,稍後再從Broker拉取消息重新消費(重新拉取的消息是由Broker複制原消息的新消息)
        RECONSUME_LATER;
    }
               
    順序消費:
    public enum ConsumeOrderlyStatus {
        // 消費成功
        SUCCESS,
        /**
         * Rollback consumption(only for binlog consumption)
         */
        @Deprecated
        ROLLBACK,
        /**
         * Commit offset(only for binlog consumption)
         */
        @Deprecated
        COMMIT,
        // 消費失敗,挂起目前隊列,挂起期間,目前消息重試消費,直到消息進入死信隊列
        SUSPEND_CURRENT_QUEUE_A_MOMENT;
    }
               
  3. 消息重新消費的邏輯不同

    并發消費(重新消費的消息由

    Broker

    複制原消息,并丢入重試隊列):

    消費者傳回

    ConsumeConcurrentlyStatus.RECONSUME_LATER

    時,

    Broker

    會建立一條與原先消息屬性相同的消息,并配置設定新的唯一的

    msgId

    ,另外存儲原消息的

    msgId

    ,新消息會存入到

    commitLog

    檔案中,并進入重試隊列,擁有一個全新的隊列偏移量,延遲

    5s

    後重新消費。如果消費者仍然傳回

    RECONSUME_LATER

    ,那麼會重複上面的操作,直到重新消費

    maxReconsumerTimes

    次,當重新消費次數超過最大次數時,進入死信隊列,消息消費成功。

    順序消費(重新消費不涉及

    Broker

    ):

    消費者傳回

    ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

    時,目前隊列會挂起(此消息後面的消息停止消費,直到此消息完成消息重新消費的整個過程),然後此消息會在消費者的線程池中重新消費,即不需要

    Broker

    重新建立新的消息(不涉及重試隊列),如果消息重新消費超過

    maxReconsumerTimes

    最大次數時,進入死信隊列。當消息放入死信隊列時,

    Broker

    伺服器認為消息時消費成功的,繼續消費該隊列後續消息。
  4. 順序消費設定自動送出
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
    	context.setAutoCommit(true);
    	for (MessageExt msg : msgs) {
    	// 省略...
    	}
    }
               
  5. 涉及的主題不同

    RocketMQ

    有三種主題:

    NORMAL、RETRY、DLQ

    并發消費:

    NORMAL、RETRY、DLQ

    順序消費:

    NORMAL、DLQ

  6. 順序消費在拉取任務時需要在Broker伺服器上鎖定該消息隊列