天天看點

Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結

1 案例引入

  • 官方Consumer最簡代碼用例:
  • Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結
  • 簡短的代碼,背後牽涉很多問題,Consumer如何綁定特定分區?如何實作訂閱 topic 的?又如何實作拉消息?

2 訂閱流程

Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結

訂閱主流程主要更新如下關鍵屬性:

訂閱狀态(SubscriptionState) - subscriptions

主要維護所訂閱的topic和patition的消費位置等狀态資訊

中繼資料中的topic資訊

metadata中維護了Kafka叢集中繼資料的一個子集,包括叢集的Broker節點、Topic和Partition在節點上分布,以及我們聚焦的第二個問題:Coordinator給Consumer配置設定的Partition資訊。

注意acquireAndEnsureOpen()和try-finally release()保證該方法的線程安全。

跟進到更新中繼資料的方法metadata.requestUpdateForNewTopics()

Metadata.requestUpdateForNewTopics()

  • 請求更新中繼資料。
  • Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結
  • 這裡,并未真正發送更新中繼資料的請求,隻是将需要更新中繼資料的标志位needUpdate置true。Kafka必須確定在第一次拉消息前中繼資料可用,即必須更新一次中繼資料,否則Consumer不知道應該去哪個Broker拉哪個Partition的消息。

3 拉消息流程

那中繼資料何時才真正更新呢?

  • 拉消息時序圖
  • Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結
  • KafkaConsumer#poll()方法中主要調用如下方法:

updateAssignmentMetadataIfNeeded()

  • 更新中繼資料
  • Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結
  • 其内部調用

    coordinator.poll()

    poll()

    裡又調用

ConsumerNetworkClient#ensureFreshMetadata()

Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結

ConsumerNetworkClient#awaitMetadataUpdate

Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結

内部調用了client.poll()方法,實作與Cluster通信,在Coordinator注冊Consumer并拉取和更新中繼資料。

這些都是 client 類中方法,ConsumerNetworkClient封裝了Consumer和Cluster之間所有網絡通信的實作,是個完全的異步實作類。沒有維護任何線程

待發送Request都存在unsent域

Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結

Response存放在pendingCompletion域

Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結

每次調用poll()時,在目前線程中發送所有待發送Request,處理所有收到Response。

異步設計

  • 優勢:
  • 無需維護用于異步發送的和處理響應的線程,并且能充分發揮批量處理的優勢,這也是Kafka的性能非常好的原因之一。

很少的線程實作高吞吐量。劣勢:極大增加了代碼的複雜度。

好了,下面再看另一關鍵方法:

Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結

pollForFetches()

Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結

主要由fetcher.sendFetches()實作,由于代碼過長,簡述其流程如下:

根據中繼資料的資訊,構造所需Broker的拉消息的Request對象

然後調用ConsumerNetworkClient#send異步發送Request

并且注冊一個回調類處理傳回的Response

所有傳回的Response被暫時存放在Fetcher#completedFetches。注意,此時的Request并未被真正發給各Broker,而被暫存在client.unsend等待發送。

然後,在調用ConsumerNetworkClient#poll時,會真正将之前構造的所有Request發送出去,并處理收到的Response

最後,fetcher.fetchedRecords()方法中,将傳回的Response反序列化後轉換為消息清單,傳回給調用者

總結

綜上過程講解,給出整個拉消息流程涉及關鍵類的類圖

Kafka消費過程關鍵源碼解析1 案例引入2 訂閱流程3 拉消息流程總結

參考

繼續閱讀