1 案例引入
- 官方Consumer最簡代碼用例:
-
Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結 - 簡短的代碼,背後牽涉很多問題,Consumer如何綁定特定分區?如何實作訂閱 topic 的?又如何實作拉消息?
2 訂閱流程
訂閱主流程主要更新如下關鍵屬性:
訂閱狀态(SubscriptionState) - subscriptions
主要維護所訂閱的topic和patition的消費位置等狀态資訊
中繼資料中的topic資訊
metadata中維護了Kafka叢集中繼資料的一個子集,包括叢集的Broker節點、Topic和Partition在節點上分布,以及我們聚焦的第二個問題:Coordinator給Consumer配置設定的Partition資訊。
注意acquireAndEnsureOpen()和try-finally release()保證該方法的線程安全。
跟進到更新中繼資料的方法metadata.requestUpdateForNewTopics()
Metadata.requestUpdateForNewTopics()
- 請求更新中繼資料。
-
Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結 - 這裡,并未真正發送更新中繼資料的請求,隻是将需要更新中繼資料的标志位needUpdate置true。Kafka必須確定在第一次拉消息前中繼資料可用,即必須更新一次中繼資料,否則Consumer不知道應該去哪個Broker拉哪個Partition的消息。
3 拉消息流程
那中繼資料何時才真正更新呢?
- 拉消息時序圖
-
Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結 - KafkaConsumer#poll()方法中主要調用如下方法:
updateAssignmentMetadataIfNeeded()
- 更新中繼資料
-
Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結 - 其内部調用
,coordinator.poll()
裡又調用poll()
ConsumerNetworkClient#ensureFreshMetadata()
ConsumerNetworkClient#ensureFreshMetadata()
ConsumerNetworkClient#awaitMetadataUpdate
ConsumerNetworkClient#awaitMetadataUpdate
内部調用了client.poll()方法,實作與Cluster通信,在Coordinator注冊Consumer并拉取和更新中繼資料。
這些都是 client 類中方法,ConsumerNetworkClient封裝了Consumer和Cluster之間所有網絡通信的實作,是個完全的異步實作類。沒有維護任何線程
待發送Request都存在unsent域
Response存放在pendingCompletion域
每次調用poll()時,在目前線程中發送所有待發送Request,處理所有收到Response。
異步設計
- 優勢:
- 無需維護用于異步發送的和處理響應的線程,并且能充分發揮批量處理的優勢,這也是Kafka的性能非常好的原因之一。
很少的線程實作高吞吐量。劣勢:極大增加了代碼的複雜度。
好了,下面再看另一關鍵方法:
pollForFetches()
主要由fetcher.sendFetches()實作,由于代碼過長,簡述其流程如下:
根據中繼資料的資訊,構造所需Broker的拉消息的Request對象
然後調用ConsumerNetworkClient#send異步發送Request
并且注冊一個回調類處理傳回的Response
所有傳回的Response被暫時存放在Fetcher#completedFetches。注意,此時的Request并未被真正發給各Broker,而被暫存在client.unsend等待發送。
然後,在調用ConsumerNetworkClient#poll時,會真正将之前構造的所有Request發送出去,并處理收到的Response
最後,fetcher.fetchedRecords()方法中,将傳回的Response反序列化後轉換為消息清單,傳回給調用者
總結
綜上過程講解,給出整個拉消息流程涉及關鍵類的類圖
參考