天天看點

RockerMQ消息消費、重試

消息中間件—RocketMQ消息消費(一)

消息中間件—RocketMQ消息消費(二)(push模式實作)

消息中間件—RocketMQ消息消費(三)(消息消費重試)

MQ中Pull和Push的兩種消費方式

Push方式

由消息中間件(MQ消息伺服器代理)主動地将消息推送給消費者;采用Push方式,可以盡可能實時地将消息發送給消費者進行消費。但是,在消費者的處理消息的能力較弱的時候(比如,消費者端的業務系統處理一條消息的流程比較複雜,其中的調用鍊路比較多導緻消費時間比較久。概括起來地說就是“慢消費問題”),而MQ不斷地向消費者Push消息,消費者端的緩沖區可能會溢出,導緻異常;

Pull方式

由消費者用戶端主動向消息中間件(MQ消息伺服器代理)拉取消息;采用Pull方式,如何設定Pull消息的頻率需要重點去考慮,舉個例子來說,可能1分鐘内連續來了1000條消息,然後2小時内沒有新消息産生(概括起來說就是“消息延遲與忙等待”)。如果每次Pull的時間間隔比較久,會增加消息的延遲,即消息到達消費者的時間加長,MQ中消息的堆積量變大;若每次Pull的時間間隔較短,但是在一段時間内MQ中并沒有任何消息可以消費,那麼會産生很多無效的Pull請求的RPC開銷,影響MQ整體的網絡性能;

RocketMQ消息消費的長輪詢機制

RocketMQ的消費方式都是基于拉模式拉取消息的,而在這其中有一種長輪詢機制(對普通輪詢的一種優化),來平衡上面Push/Pull模型的各自缺點。基本設計思路是:消費者如果第一次嘗試Pull消息失敗(比如:Broker端沒有可以消費的消息),并不立即給消費者用戶端傳回Response的響應,而是先hold住并且挂起請求(将請求儲存至pullRequestTable本地緩存變量中),然後Broker端的背景獨立線程—PullRequestHoldService會從pullRequestTable本地緩存變量中不斷地去取,具體的做法是查詢待拉取消息的偏移量是否小于消費隊列最大偏移量,如果條件成立則說明有新消息達到Broker端(這裡,在RocketMQ的Broker端會有一個背景獨立線程—ReputMessageService不停地建構ConsumeQueue/IndexFile資料,同時取出hold住的請求并進行二次處理),則通過重新調用一次業務處理器—PullMessageProcessor的處理請求方法—processRequest()來重新嘗試拉取消息(此處,每隔5S重試一次,預設長輪詢整體的時間設定為30s)。

RocketMQ消息Pull的長輪詢機制的關鍵在于Broker端的PullRequestHoldService和ReputMessageService兩個背景線程。

從嚴格意義上說,RocketMQ并沒有實作真正的消息消費的Push模式,而是對Pull模式進行了一定的優化,一方面在Consumer端開啟背景獨立的線程—PullMessageService不斷地從阻塞隊列—pullRequestQueue中擷取PullRequest請求并通過網絡通信子產品發送Pull消息的RPC請求給Broker端。另外一方面,背景獨立線程—rebalanceService根據Topic中消息隊列個數和目前消費組内消費者個數進行負載均衡,将産生的對應PullRequest執行個體放入阻塞隊列—pullRequestQueue中。這裡算是比較典型的生産者-消費者模型,實作了準實時的自動消息拉取。然後,再根據業務回報是否成功消費來推動消費進度。

在Broker端,PullMessageProcessor業務處理器收到Pull消息的RPC請求後,通過MessageStore執行個體從commitLog擷取消息。如1.2節内容所述,如果第一次嘗試Pull消息失敗(比如Broker端沒有可以消費的消息),則通過長輪詢機制先hold住并且挂起該請求,然後通過Broker端的背景線程PullRequestHoldService重新嘗試和背景線程ReputMessageService的二次處理。

消費異常重試

簡談RabbitMQ的手動消息确認ACK機制

RabbitMQ提供了消息确認機制。消費者在訂閱隊列時,可以在代碼中手動設定autoAck參數為false,這時RabbitMQ會等待消費者顯式地回複确認信号(即為顯式地調用channel.basicAck(envelope.getDeliveryTag(), false)方法)後才從叢集中的記憶體(或磁盤)節點上移除消息,進而保證了這條消息不會因為消費失敗而導緻丢失。

簡析Kafka消息消費的手動送出

在Kafka中,也可以采用上面那種的消費後的确認機制,通過在Consumer端設定“enable.auto.commit”屬性為false後,待業務工程正常處理完消費後,在代碼中手動調用KafkaConsumer執行個體的commitSync()方法送出(ps:這裡指的是同步阻塞commit消費的偏移量,等待Broker端的傳回響應,需要注意Broker端在對commit請求做出響應之前,消費端會處于阻塞狀态,進而限制消息的處理性能和整體吞吐量),以確定消息能夠正常被消費。如果在消費過程中,消費端突然Crash,這時候消費偏移量沒有commit,等正常恢複後依然還會處理剛剛未commit的消息。

RocketMQ消費失敗後的消費重試機制

重試隊列

如果Consumer端因為各種類型異常導緻本次消費失敗,為防止該消息丢失而需要将其重新回發給Broker端儲存,儲存這種因為異常無法正常消費而回發給MQ的消息隊列稱之為重試隊列。RocketMQ會為每個消費組都設定一個Topic名稱為“%RETRY%+consumerGroup”的重試隊列(這裡需要注意的是,這個Topic的重試隊列是針對消費組,而不是針對每個Topic設定的),用于暫時儲存因為各種異常而導緻Consumer端無法消費的消息。考慮到異常恢複起來需要一些時間,會為重試隊列設定多個重試級别,每個重試級别都有與之對應的重新投遞延時,重試次數越多投遞延時就越大。RocketMQ對于重試消息的處理是先儲存至Topic名稱為“SCHEDULE_TOPIC_XXXX”的延遲隊列中,背景定時任務按照對應的時間進行Delay後重新儲存至“%RETRY%+consumerGroup”的重試隊列中(具體細節後面會詳細闡述)。

死信隊列

由于有些原因導緻Consumer端長時間的無法正常消費從Broker端Pull過來的業務消息,為了確定消息不會被無故的丢棄,那麼超過配置的“最大重試消費次數”後就會移入到這個死信隊列中。在RocketMQ中,SubscriptionGroupConfig配置常量預設地設定了兩個參數,一個是retryQueueNums為1(重試隊列數量為1個),另外一個是retryMaxTimes為16(最大重試消費的次數為16次)。Broker端通過校驗判斷,如果超過了最大重試消費次數則會将消息移至這裡所說的死信隊列。這裡,RocketMQ會為每個消費組都設定一個Topic命名為“%DLQ%+consumerGroup"的死信隊列。一般在實際應用中,移入至死信隊列的消息,需要人工幹預處理。

Consumer端回發消息至Broker端

在業務工程中的Consumer端(Push消費模式下),如果消息能夠正常消費需要在注冊的消息監聽回調方法中傳回CONSUME_SUCCESS的消費狀态,否則因為各類異常消費失敗則傳回RECONSUME_LATER的消費狀态。

如果業務工程對消息消費失敗了,那麼則會抛出異常并且傳回這裡的RECONSUME_LATER狀态。這裡,在消費消息的服務線程—consumeMessageService中,将封裝好的消息消費任務ConsumeRequest送出至線程池—consumeExecutor異步執行。從消息消費任務ConsumeRequest的run()方法中會執行業務工程中注冊的消息監聽回調方法,并在processConsumeResult方法中根據業務工程傳回的狀态(CONSUME_SUCCESS或者RECONSUME_LATER)進行判斷和做對應的處理(下面講的都是在消費通信模式為叢集模型下的,廣播模型下的比較簡單就不再分析了)。

業務方正常消費(CONSUME_SUCCESS)

正常情況下,設定ackIndex的值為consumeRequest.getMsgs().size() - 1,是以後面的周遊consumeRequest.getMsgs()消息集合條件不成立,不會調用回發消費失敗消息至Broker端的方法—sendMessageBack(msg, context)。最後,更新消費的偏移量。

業務方消費失敗(RECONSUME_LATER)

異常情況下,設定ackIndex的值為-1,這時就會進入到周遊consumeRequest.getMsgs()消息集合的for循環中,執行回發消息的方法—sendMessageBack(msg, context)。這裡,首先會根據brokerName得到Broker端的位址資訊,然後通過網絡通信的Remoting子產品發送RPC請求到指定的Broker上,如果上述過程失敗,則建立一條新的消息重新發送給Broker,此時新消息的Topic為“%RETRY%+ConsumeGroupName”—重試隊列的主題。其中,在MQClientAPIImpl執行個體的consumerSendMessageBack()方法中封裝了ConsumerSendMsgBackRequestHeader的請求體,随後完成回發消費失敗消息的RPC通信請求(業務請求碼為:CONSUMER_SEND_MSG_BACK)。倘若上面的回發消息流程失敗,則會延遲5S後重新在Consumer端進行重新消費。與正常消費的情況一樣,在最後更新消費的偏移量。

Broker端對于回發消息處理的主要流程

Broker端收到這條Consumer端回發過來的消息後,通過業務請求碼(CONSUMER_SEND_MSG_BACK)比對業務處理器—SendMessageProcessor來處理。在完成一系列的前置校驗(這裡主要是“消費分組是否存在”、“檢查Broker是否有寫入權限”、“檢查重試隊列數是否大于0”等)後,嘗試擷取重試隊列的TopicConfig對象(如果是第一次無法擷取到,則調用createTopicInSendMessageBackMethod()方法進行建立)。根據回發過來的消息偏移量嘗試從commitlog日志檔案中查詢消息内容,若不存在則傳回異常錯誤。

然後,設定重試隊列的Topic—“%RETRY%+consumerGroup”至MessageExt的擴充屬性“RETRY_TOPIC”中,并對根據延遲級别delayLevel和最大重試消費次數maxReconsumeTimes進行判斷,如果超過最大重試消費次數(預設16次),則會建立死信隊列的TopicConfig對象(用于後面将回發過來的消息移入死信隊列)。在建構完成需要落盤的MessageExtBrokerInner對象後,調用“commitLog.putMessage(msg)”方法做消息持久化。這裡,需要注意的是,在putMessage(msg)的方法裡會使用“SCHEDULE_TOPIC_XXXX”和對應的延遲級别隊列Id分别替換MessageExtBrokerInner對象的Topic和QueueId屬性值,并将原來設定的重試隊列主題(“%RETRY%+consumerGroup”)的Topic和QueueId屬性值做一個備份分别存入擴充屬性properties的“REAL_TOPIC”和“REAL_QID”屬性中。看到這裡也就大緻明白了,回發給Broker端的消費失敗的消息并非直接儲存至重試隊列中,而是會先存至Topic為“SCHEDULE_TOPIC_XXXX”的定時延遲隊列中。

RocketMQ的重試隊列的Topic是“%RETRY%+consumerGroup”,為啥這裡要儲存至Topic是“SCHEDULE_TOPIC_XXXX”的這個延遲隊列中呢?

在源碼中搜尋下關鍵字—“SCHEDULE_TOPIC_XXXX”,會發現Broker端還存在着一個背景服務線程—ScheduleMessageService(通過消息存儲服務—DefaultMessageStore啟動),通過檢視源碼可以知道其中有一個DeliverDelayedMessageTimerTask定時任務線程會根據Topic(“SCHEDULE_TOPIC_XXXX”)與QueueId,先查到邏輯消費隊列ConsumeQueue,然後根據偏移量,找到ConsumeQueue中的記憶體映射對象,從commitlog日志中找到消息對象MessageExt,并做一個消息體的轉換(messageTimeup()方法,由定時延遲隊列消息轉化為重試隊列的消息),再次做持久化落盤,這時候才會真正的儲存至重試隊列中。看到這裡就可以解釋上面的疑問了,定時延遲隊列隻是為了用于暫存的,然後延遲一段時間再将消息移入至重試隊列中。

Consumer端消費重試機制

每個Consumer執行個體在啟動的時候就預設訂閱了該消費組的重試隊列主題,DefaultMQPushConsumerImpl的copySubscription()方法中的相關代碼如下

private void copySubscription() throws MQClientException {
            //省略其他代碼...
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING://如果消息消費模式為叢集模式,還需要為該消費組對應一個重試主題
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
            //省略其他代碼...
      }      

繼續閱讀