天天看點

RocketMQ 源碼分析04消息發送

RocketMQ 發送普通消息有3種實作方式

1.可靠同步發送

2.可靠異步發送

3.單向發送

RocketMQ 源碼分析04消息發送

Producer 啟動流程

RocketMQ 源碼分析04消息發送

1.設定生産者組producerGroup

消息發送DefaulMQProducerImpl

一、start方法
           
RocketMQ 源碼分析04消息發送

對于start方法

1. 狀态是CREATE_JUST,代表剛剛建立,沒有運作

   1.1 檢查配置config

   1.2 建立mqClientFactory

   1.3 注冊producer,也就是放入到本地緩存producerTable中

   1.4 然後設定producer的主題topicKey,啟動mQClientFactory,把狀态設定為Running,正在運作

   1.5 如果目前狀态是shutDown,就直接抛出異常

++++++++++++++++++++++++++++++++++++++++++++++++++++++

發送消息實作 send方法

RocketMQ 源碼分析04消息發送

1.使用線程池executor,  調用sendDefaultImpl實作

2. 發送消息到mq

   2.1:先從緩存中擷取topic的路由資訊,如果有就直接傳回。調用tryToFindTopicPublishInfo

            使用預設路由,或者使用參數topic擷取路由;擷取路由後,和本地緩存對比,發生變化需要更新路由

RocketMQ 源碼分析04消息發送

   2.2:根據mq的brokerName擷取broker,根據topic路由算法選擇一個MessageQueue。如下圖

RocketMQ 源碼分析04消息發送

   2.3:向 MessageQueue 發送消息,調用sendKernelImpl,分成單個發送,批量發送

            建構不同的request;根據messageQueue 擷取broker的網絡位址,如果brokerAddrTable未緩存該broker的資訊,則NameServer需要主動更新一下topic的路由資訊。

RocketMQ 源碼分析04消息發送

發送模式分成ONEWAY,ASYNC, SYNC; 為消息配置設定全局唯一ID

RocketMQ 源碼分析04消息發送

   執行鈎子函數hook

  msgCheck: 檢查broker是否有寫權限,檢查隊列是否合法,是否超過最大重試次數。

   2.4:如果是同步調用方式(SYNC),則執行失敗重試政策,預設重試兩次。

   底層調用MQClientAPIImpl#sendMessage

   2.5 如果是Async 異步,判斷mq是否壓縮

         2.5.1 先校驗發送消息沒有timeout逾時

         2.5.2 remotingClient.invokeAsync,異步調用并且使用回調函數callback

RocketMQ 源碼分析04消息發送

    2.5.3 根據addr擷取channel,檢查channel不能為null,必須active狀态

    2.5.4 檢查異步調用是否逾時,逾時就報出 “invokeAsync call timeout”

    2.5.5 通過調用channel.writeAndFlush發送請求,并且使用監聽器ChannelFutureListener 

           回調函數為operationComplete,根據success和failure處理responseFuture

   2.5.6 調用closeChannel 關閉channel

   2.5.7 發送請求指令完畢後,會繼續調用回調函數operationComplete

   2.5.8  處理sendResponse

2.6 如果是Sync同步發送消息,過程和上面類似,使用channel同步調用,最後擷取應答response

RocketMQ 源碼分析04消息發送

     2.6.1 處理結果response

2.7 發送消息執行後,執行executeSendMessageHookAfter回調鈎子方法

批量發送,處理流程和單條發送大體一緻     

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

RocketMQ 源碼分析04消息發送

   根據本次消息發送耗時,更新失敗政策

RocketMQ 源碼分析04消息發送
mq