-
注冊的消息監聽不同
并發消費:
順序消費:consumer.registerMessageListener(new MessageListenerConcurrently() {}
consumer.registerMessageListener(new MessageListenerOrderly() {}
-
傳回狀态碼不同
并發消費:
順序消費:public enum ConsumeConcurrentlyStatus { // 消費成功 CONSUME_SUCCESS, // 消費失敗,稍後再從Broker拉取消息重新消費(重新拉取的消息是由Broker複制原消息的新消息) RECONSUME_LATER; }
public enum ConsumeOrderlyStatus { // 消費成功 SUCCESS, /** * Rollback consumption(only for binlog consumption) */ @Deprecated ROLLBACK, /** * Commit offset(only for binlog consumption) */ @Deprecated COMMIT, // 消費失敗,挂起目前隊列,挂起期間,目前消息重試消費,直到消息進入死信隊列 SUSPEND_CURRENT_QUEUE_A_MOMENT; }
-
消息重新消費的邏輯不同
并發消費(重新消費的消息由
Broker
複制原消息,并丢入重試隊列):
消費者傳回
時,ConsumeConcurrentlyStatus.RECONSUME_LATER
會建立一條與原先消息屬性相同的消息,并配置設定新的唯一的Broker
,另外存儲原消息的msgId
,新消息會存入到msgId
檔案中,并進入重試隊列,擁有一個全新的隊列偏移量,延遲commitLog
後重新消費。如果消費者仍然傳回5s
,那麼會重複上面的操作,直到重新消費RECONSUME_LATER
maxReconsumerTimes
次,當重新消費次數超過最大次數時,進入死信隊列,消息消費成功。
順序消費(重新消費不涉及
Broker
):
消費者傳回
時,目前隊列會挂起(此消息後面的消息停止消費,直到此消息完成消息重新消費的整個過程),然後此消息會在消費者的線程池中重新消費,即不需要ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
重新建立新的消息(不涉及重試隊列),如果消息重新消費超過Broker
最大次數時,進入死信隊列。當消息放入死信隊列時,maxReconsumerTimes
伺服器認為消息時消費成功的,繼續消費該隊列後續消息。Broker
- 順序消費設定自動送出
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for (MessageExt msg : msgs) { // 省略... } }
- 涉及的主題不同
有三種主題:RocketMQ
并發消費:NORMAL、RETRY、DLQ
順序消費:NORMAL、RETRY、DLQ
NORMAL、DLQ
- 順序消費在拉取任務時需要在Broker伺服器上鎖定該消息隊列