天天看點

從RocketMQ的Broker源碼層面驗證一下這兩個點

本篇部落格會從源碼層面,驗證在RocketMQ基礎概念剖析,并分析一下Producer的底層源碼中提到的結論,分别是:
  • Broker在啟動時,會将自己注冊到所有的NameServer上
  • Broker在啟動之後,會每隔30S向NameServer發送心跳

之前的文章中,我們知道了RocketMQ中的一些核心概念,例如Broker、NameServer、Topic和Tag等等。Producer從啟動到發送消息的整個過程,從源碼級别分析了Producer在發送消息到Broker的時候,是如何拿到Broker的資料的,如何從多個MessageQueue中選擇對應的Queue發送消息。

但是由于篇幅原因,文章開頭提到的兩個已知結論在上篇部落格裡并沒沒有對其進行驗證,這次就從源碼層面來驗證一下。

一開頭就看到Broker主從架構相關的源碼

在上篇部落格中提到過,Broker為了保證自身的高可用,會采取一主一從的架構。即使Master Broker因為意外原因挂了,Slave Broker上還有一份完整的資料,Broker可以繼續提供服務。

從RocketMQ的Broker源碼層面驗證一下這兩個點

isEnableDLegerCommitLog

中提到的DLeger可以先不管,我們目前隻需要知道其預設傳回的結果是

false

。是以Broker首次啟動的時候,就會執行被If包裹住的邏輯。

RocketMQ本身是有主從架構的,但是功能不夠完善,如果Master Broker出現了故障,需要人工的将Slave Broker切換成Master。

就有點類似于手動的将一台Redis設定成另一台Redis的Slave節點,如果此時Redis的Master挂了,還需要手動的進行切換一樣。為了解決這個問題,Redis搞出了Sentinel,可以在發生故障的時候自動的實作故障轉移。是以RocketMQ在4.5版本之後推出的Dleger差不多也是這麼個東西,除此之外,Dleger還可以實作多副本。

不使用Dleger時,主從資料如何進行同步

先給出結論,在RocketMQ的主從架構下,主從同步采取的是Slave主動拉取的方式。

如果目前執行注冊的Broker角色是

Slave

,那就會使用

ScheduledExecutorService

啟動一個周期性的定時任務,每隔10秒就會去Master同步一次,同步的資料包括Topic的相關配置、Consumer的消費偏移量、延遲消息的Offset、訂閱組的相關資料和配置。

從RocketMQ的Broker源碼層面驗證一下這兩個點

ScheduledExecutorService

的作用和原理下面會做簡單介紹。

首次啟動時強制進行Broker注冊

從RocketMQ的Broker源碼層面驗證一下這兩個點

因為是首次啟動,是以參數

forceRegister

被直接設定成了true。

使用ScheduledExecutorService啟動定時任務

通過入口進來之後,Broker會啟動一個定時任務,周期性的去注冊。

ScheduledExecutorService

底層就是一個

newSingleThreadScheduledExecutor

,隻有一個線程的線程池,其關鍵的參數

corePoolSize

值為

1

,然後按照指定的頻率周期性的執行某個任務。

從RocketMQ的Broker源碼層面驗證一下這兩個點

ScheduledExecutorService主要的功能有兩個,分别是:

  • ScheduledExecutorService

    以固定的頻率執行任務
  • ScheduledExecutorService

    執行完之後,間隔制定的時間後再執行下一個任務

使用scheduleAtFixedRate實作心跳機制

此處我們使用的是

scheduleAtFixedRate

,如下圖。

從RocketMQ的Broker源碼層面驗證一下這兩個點

至于執行的頻率,我們能夠配置的範圍最大不能超過一分鐘,也就是說這個範圍是在10-60秒之間,預設30秒執行一次,這也就驗證了每30秒,Broker會向NameServer發送一次心跳。

擷取執行頻率的這個判斷有點意思,甚至看起來有那麼一絲絲簡潔,但是了解其具體可配置的時間範圍可能需要花點時間。在實際業務性代碼中,個人建議還是不要這麼寫,業務中代碼的可讀性和可維護性我認為是需要放在首位的。

值得注意的是,此處啟動心跳,給了一個10秒的延遲,因為在不使用Dleger的情況下,在之前的邏輯中已經執行過一次注冊了。如果不做延遲,那麼幾乎是同一個時間就會有兩次注冊操作,而這明顯是不符合預期的;同時

forceRegister

也從

true

變成了通過函數

isForceRegister

來進行擷取。

調用registerBrokerAll注冊

定時任務注冊完成之後,之後的每次觸發都會執行

registerBrokerAll

方法來執行注冊,你可能會有疑問,我目前不就是一個Broker嗎,怎麼名字有個字尾

All

?那是因為NameServer會有多個,Broker啟動的時候會将自己注冊到所有的NameServer上去。當然,口說無憑,我們繼續看下去。

繼續往裡走,如果目前滿足注冊條件,則會實際的執行注冊操作。那具體滿足什麼條件呢?由變量

forceRegister

和一個

needRegister

方法來決定,

forceRegister

預設是

true

,是以當第一執行這個邏輯的時候是一定會執行注冊操作的。

從RocketMQ的Broker源碼層面驗證一下這兩個點

通過對比資料版本判斷目前Broker是否需要進行注冊

感興趣的話,可以繼續跟随文章了解一下,

needRegister

是根據什麼來判斷是否需要注冊的。

首先,Broker一旦注冊到了NameServer之後,由于Producer不停的在寫入資料,Consumer也在不停的消費資料,Broker也可能因為故障導緻MessageQueue等關鍵路由資訊發生變動,NameServer中的資料和Broker中實際的資料就會不一緻,如果不及時更新,Producer拉取到的路由資料就可能有誤。

是以每次定時任務觸發的時候會去對比NameServer和Broker的資料,如果發現資料版本不一緻,Broker會重新進行注冊,将最新的資料更新到NameServer。說直白一點,就是做一個資料定時更新。以下紅框中的代碼就是資料對比的核心代碼。

從RocketMQ的Broker源碼層面驗證一下這兩個點

當Broker和所有的NameServer節點一一完成資料對比之後,就會進行結果判定,但凡有一個節點資料不一緻,都需要進行重新注冊,把最新的資料更新到NameServer,核心判斷邏輯同樣用紅框标出。

從RocketMQ的Broker源碼層面驗證一下這兩個點

至此,其實我們就已經完成了 Broker在啟動的時候會向所有NameServer進行注冊 的驗證。但是由于後續仍然有值得關注發光點,我們繼續後續的源碼閱讀。

使用CountDownLatch擷取所有注冊異步任務的傳回結果

除此之外,還值得注意的是在

needRegister

中,對于和多個NameServer的互動,RocketMQ是通過線程池異步實作的,同時使用了CountDownLatch來等待所有的請求結束,傳回結果給主線程。

從RocketMQ的Broker源碼層面驗證一下這兩個點

既然聊到了CountDownLatch,就順帶提一下。假設我們有5個互不依賴的計算任務,如果快速的計算出結果并傳回呢?那當然是5個任務并發執行,這就需要通過新開線程實作,結果就無法一起傳回了。

而CountDownLatch可以讓主線程等待,等待這5個計算任務全部結束之後,喚醒主線程再繼續後面的邏輯。這就是CountDownLatch的作用,如果平時隻是單純的CRUD功能的話,可能連CountDownLatch是什麼都做不知道,這也是為什麼大廠面試會問這些問題,因為在大廠的複雜業務背景下,你必須要會使用它們。

指定需要注冊之後,接下來就是核心的注冊方法了,核心邏輯由

registerBrokerAll

來實作。Broker同樣會去每一個NameServer節點上注冊自己,并且為了提前執行的效率,同樣開線程采用了異步的方式。在擷取所有結果時,同樣的使用了CountDownLatch。

從RocketMQ的Broker源碼層面驗證一下這兩個點

使用CopyOnWriteArrayList存儲注冊請求的傳回

除此之外,用于儲存注冊結果的清單,使用的是

CopyOnWriteArrayList

,被面試虐過的同學應該熟悉。我們知道此處開啟了多線程去不同的NameServer注冊,寫入注冊結果的時候,多線程對同一個清單進行寫入,會産生線程安全的問題。

從RocketMQ的Broker源碼層面驗證一下這兩個點

而我們知道

ArrayList

是非線程安全的,這也是為什麼此處要使用

CopyOnWriteArrayList

來儲存注冊結果。為什麼

CopyOnWriteArrayList

能夠保證線程安全?

這歸功于COW(Copy On Write),讀請求時共用同一個List,涉及到寫請求時,會複制出一個List,并在寫入資料的時候加入獨占鎖。比起直接對所有操作加鎖,讀寫鎖的形式分離了讀、寫請求,使其互不影響,隻對寫請求加鎖,降低了加鎖的消耗,提升了整體操作的并發。

上面并發執行的注冊操作,具體做了哪些事情呢?先看代碼。

從RocketMQ的Broker源碼層面驗證一下這兩個點

上面就是單個注冊的所有邏輯,可以看到在建構完請求之後,有一個

oneway

的判斷。

oneway

值為false,表示單向通信,Broker不關心NameServer的傳回,也不會觸發任何回調函數。接下來Broker就會把已經寫進request body的所有資料發送給NameServer。請求資料統一由一個叫

TopicConfigSerializeWrapper

的Wrapper給包裹住。

從RocketMQ的Broker源碼層面驗證一下這兩個點

其可以看為兩部分:

  • 存在該Broker節點上的所有Topic的資料
  • 資料版本

然後帶着這些資料,Broker會同步的調用

invokeSync

發送請求給NameServe,并且在執行之後觸發實作特定功能的回調函數。

EOF

至此,我們完成了對開篇所提結論的驗證,同時也發現了RocketMQ的主從架構、Master和Slave同步資料的方式、心跳機制的實作等等,也基本從源碼中看完了Broker啟動的所有流程。看這些老哥寫的源碼還是挺有意思的,之後有時間随緣再看看NameServer端相關的源碼吧。

好了以上就是本篇部落格的全部内容了,如果你覺得這篇文章對你有幫助,還麻煩點個贊,關個注,分個享,留個言。

歡迎微信搜尋關注【SH的全棧筆記】,檢視更多相關文章

從RocketMQ的Broker源碼層面驗證一下這兩個點

繼續閱讀