RocketMQ-NameServer:
一、作用
作為整個MQ的核心,主要負責服務發現,提供Broker節點的增删改查,提供給生産者和消費者;
多台NameServer之間不會互相通信,是以有可能在某一時刻,資料并不完全相同,但是不影響消息的發送;
二、啟動
主要是兩個配置
NameServerConfig(有用參數):
rocketmqhome:rocketmq主目錄,可以通過-Drocketmq.home.dir=path或通過設定環境變量ROCKETMQ_HOME來配置RocketMQ 的主目錄。
kvConfigPath: NameServer存儲KV配置屬性的持久化路徑。
configStorePath:nameServer預設配置檔案路徑,不生效。nameServer啟動時如果要通過配置檔案配置NameServer 啟動屬性的話,請使用-c選項。
orderMessageEnable:是否支援順序消息,預設是不支援。
NettyServerConfig(有用參數):
listenPort: NameServer監昕端口
serverWorkerThreads: Netty業務線程池線程個數。
serverCallbackExecutorThreads : Netty的public任務線程池線程個數,Netty網絡設計,根據業務類型會建立不同的線程池,比如處理消息發送、消息消費、心跳檢測等。如果該業務類型(RequestCode)未注冊線程池,則由public 線程池執行。
serverSelectorThreads: IO 線程池線程個數,主要是NameServer、Broker端解析請求、傳回相應的線程個數,這類線程主要是處理網絡請求的,解析請求包,然後轉發到各個業務線程池完成具體的業務操作,然後将結果再傳回調用方。
serverOnewaySemaphoreValue:send oneway消息請求井發度(Broker端參數)。
serverAsyncSemaphoreValue:異步消息發送最大并發度(Broker端參數)。
serverChannelMaxldleTimeSeconds:網絡連接配接最大空閑時間,預設120s 。如果連接配接
空閑時間超過該參數設定的值,連接配接将被關閉。
serverSocketSndBufSize:網絡socket發送緩存區大小,預設64k 。
serverSocketRcvBufSize:網絡socket接收緩存區大小,預設64k 。
serverPooledByteBufAllocatorEnable: ByteBuffer是否開啟緩存,建議開啟。
useEpollNativeSelector:是否啟用EpollIO模型,Linux環境建議開啟。
在啟動的時候會建立NettyServer網絡處理對象,開啟檢測停用Broker的定時任務和列印KV配置的任務,然後向JVM注冊一個鈎子函數用來優雅關閉線程池,停機;
三、路由管理
路由存儲
RoutelnfoManager儲存路由資訊:
1.Topic消息隊列路由資訊,消息發送時根據路由表進行負載均衡。
2.Broker 基礎資訊,包含brokerName、所屬叢集名稱、主備Broker位址。
3.Broker 叢集資訊,存儲叢集中所有Broker 名稱。
4.Broker 狀态資訊。NameServer 每次收到心跳包時會替換該資訊。
5.Broker 上的FilterServer 清單,用于類模式消息過濾
PS:一個Topic有多個消息隊列,一個Broker為每一主題預設建立4個讀隊列和4個寫隊列;多個Name相同的Broker組成叢集,BrokerID為0代表Master,BrokerID為1代表Salve;
路由注冊
路由注冊是通過Broker與NameServer的心跳功能實作的,Broker啟動的時候會向所有的NameServer發送心跳語句并每隔30S發一次心跳;
NameSever每10秒去管理一次存儲活躍Broker的,如果連續120S沒有收到心跳,則移除該Broker
網絡傳輸基于Netty,每個請求都有專門的RequestCode;
Broker中Topic預設存儲在${RocketHome}/store/confg/topic.json中。
PS:NameServer與Broker保持長連接配接,Broker的狀态存儲在brokerLiveTable中,每收到一個心跳包,就更新table中關于Broker的狀态資訊以及路由表(topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable),在更新的時候使用了所力度較少的讀寫鎖,允許多個消息發送者并發讀,保證了說個消息發送者并發讀,保證了消息發送時的高并發,但是處理心跳包的時候是串行執行的
路由删除
NameServer定時掃描brokerliveTable檢測上次心跳包與目前系統時間的時間差,如果大于120S,則移除Broker資訊
Broker正常關閉的時候,執行UnregisterBroker指令;
路由發現
RoketMQ路由發現是非實時的,黨Topic路由出現變化之後,NameServer不主動推送給用戶端,而是有用戶端定時根據主題名稱拉取主題最新的路由;
調用RouterlnfoManager的方法,從路由表topicQueueTable、brokerAddrTable、filterServerTable中分别填充TopicRouteData中的List<Queu巳Data>、List<BrokerData>和filterServer 位址表。
如果找到主題對應的路由資訊并且該主題為順序消息,則從NameServerKVconfig中擷取關于順序消息相關的配置填充路由資訊。
如果找不到路由資訊CODE,則使用TOPIC NOT_EXISTS ,表示沒有找到對應的路由
四、消息發送
發送方式:
同步(sync):發送消息執行API時,同步等待直到消息伺服器傳回發送結果;
異步(async):發送消息執行API時,指定回調函數,立即傳回,目前線程不阻塞,直到運作結束,消息發送成功的回調任務在一個新的線程中執行;
單向(oneway):發送消息執行API時,直接傳回,不在乎消息是否成功存儲到消息伺服器上
消息體:
1.消息主題:topic
2.消息Flag(RocketMQ不做處理)
3.擴充屬性存儲在Message的properties中:
tag:消息TAG,用于消息過濾。
keys:Message索引鍵,多個用空格隔開,RocketMQ 可以根據這些key快速檢索到消息。
waitStoreMsgOK:消息發送時是否等消息存儲完成後再傳回。
delayTimeLevel:消息延遲級别,用于定時消息或消息重試。
4.消息體:byte數組
五、生産者:
DefaultMQProducer是預設的消息生産者實作類,它實作MQAdmin的接口;
核心屬性:
producerGroup:生産者所屬組,消息伺服器在回查事務狀态時會随機選擇該組中任何一個生産者發起事務回查請求。
createTopicKey:預設topicKey。
defaultTopicQu巳ueNums:預設主題在每一個Broker隊列數量。
sendMsgTimeout:發送消息預設逾時時間,預設3s。
compressMsgBodyOverHowmuch:消息體超過該值則啟用壓縮,預設4K。
retryTimesWhenSendFailed:同步方式發送消息重試次數,預設為2,總共執行3次。
retryTimesWhens巳ndAsyncFailed:異步方式發送消息重試次數,預設為2。
retryAnotherBrokerWhenNotStoreOK:消息重試時選擇另外一個Broker時是否不等待存儲結果就傳回,預設為false。
maxMessageSize:允許發送的最大消息長度,預設為4M,眩值最大值為2"32-1。
六、消息發送流程:
1.驗證消息
2.查找主題路由資訊
TopicPublishinfo:
orderTopic:是否是順序消息。
List<MessageQueue>messageQueueList:該主題隊列的消息隊列。
sendWhichQueue:每選擇一次消息隊列,該值會自增l,如果Integer.MAX_VALUE,則重置為0,用于選擇消息隊列。
List<QueueData>queueData:topic隊列中繼資料。
List<BrokerData>brokerDatas:topic分布的broker中繼資料。
HashMap<String>:broker上過濾伺服器
位址清單。
待續........