RocketMQ 發送普通消息有3種實作方式
1.可靠同步發送
2.可靠異步發送
3.單向發送

Producer 啟動流程
1.設定生産者組producerGroup
消息發送DefaulMQProducerImpl
一、start方法
對于start方法
1. 狀态是CREATE_JUST,代表剛剛建立,沒有運作
1.1 檢查配置config
1.2 建立mqClientFactory
1.3 注冊producer,也就是放入到本地緩存producerTable中
1.4 然後設定producer的主題topicKey,啟動mQClientFactory,把狀态設定為Running,正在運作
1.5 如果目前狀态是shutDown,就直接抛出異常
++++++++++++++++++++++++++++++++++++++++++++++++++++++
發送消息實作 send方法
1.使用線程池executor, 調用sendDefaultImpl實作
2. 發送消息到mq
2.1:先從緩存中擷取topic的路由資訊,如果有就直接傳回。調用tryToFindTopicPublishInfo
使用預設路由,或者使用參數topic擷取路由;擷取路由後,和本地緩存對比,發生變化需要更新路由
2.2:根據mq的brokerName擷取broker,根據topic路由算法選擇一個MessageQueue。如下圖
2.3:向 MessageQueue 發送消息,調用sendKernelImpl,分成單個發送,批量發送
建構不同的request;根據messageQueue 擷取broker的網絡位址,如果brokerAddrTable未緩存該broker的資訊,則NameServer需要主動更新一下topic的路由資訊。
發送模式分成ONEWAY,ASYNC, SYNC; 為消息配置設定全局唯一ID
執行鈎子函數hook
msgCheck: 檢查broker是否有寫權限,檢查隊列是否合法,是否超過最大重試次數。
2.4:如果是同步調用方式(SYNC),則執行失敗重試政策,預設重試兩次。
底層調用MQClientAPIImpl#sendMessage
2.5 如果是Async 異步,判斷mq是否壓縮
2.5.1 先校驗發送消息沒有timeout逾時
2.5.2 remotingClient.invokeAsync,異步調用并且使用回調函數callback
2.5.3 根據addr擷取channel,檢查channel不能為null,必須active狀态
2.5.4 檢查異步調用是否逾時,逾時就報出 “invokeAsync call timeout”
2.5.5 通過調用channel.writeAndFlush發送請求,并且使用監聽器ChannelFutureListener
回調函數為operationComplete,根據success和failure處理responseFuture
2.5.6 調用closeChannel 關閉channel
2.5.7 發送請求指令完畢後,會繼續調用回調函數operationComplete
2.5.8 處理sendResponse
2.6 如果是Sync同步發送消息,過程和上面類似,使用channel同步調用,最後擷取應答response
2.6.1 處理結果response
2.7 發送消息執行後,執行executeSendMessageHookAfter回調鈎子方法
批量發送,處理流程和單條發送大體一緻
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
根據本次消息發送耗時,更新失敗政策