天天看點

Rebalance&多線程執行個體消費(十二)

上篇文章說了,kafka位移送出通過enable.auto.commit控制手動送出還是自動送出,手動送出又分為異步送出和同步送出,還可以指定分區進行送出,預設是送出給所有分區。手動送出可以對應不同的業務場景,當需要業務全部處理完才送出位移,則可以選擇手動送出,但這時候需要做幂等性處理,因為當業務執行完畢,但系統當機,這時候consumer重新開機則因為位移沒送出會重複消費之前的資料。

一、Rebalance

Rebalance是什麼?

它本質是一組協定,規定了consumer group如何達成一緻性來配置設定訂閱所有分區的。假設有20個consumer,需要訂閱100個分區的topic,這時候就會每個consumer會平均訂閱5個分區,這個過程就是rebalace。

和舊版本依托于zookeeper不同,新版本consumer使用了kafka内置一個權限的協調協定(group coordination protocol)。Kafka的某個broker會被選舉為組協調者(group coordinator),他負責對組的狀态進行管理,他的主要職責是當新成員到達時促進組内所有的成員重新配置設定,即coordinator負責rebalance。

什麼時候他會觸發rebalance呢?

  1. 組成員發生變化,比如新的consumer加入組,或者有consumer離開組,或者consumer崩潰時候觸發。
  2. 消費組訂閱的topic發生變化。
  3. 組訂閱的topic分區發生變更。

真實應用場景中引用rebalance最常見原因違背了第一條件,特别是consumer崩潰情況,崩潰不一定是consumer程序當機或者挂掉,當consumer無法在指定時間内完成消息處理時候,那麼coordinator則會認為consumer已經崩潰,進而引發新一輪的rebalance。當group程式下業務處理邏輯過重,這時候就會導緻消費逾時,進而導緻coordinator認為consumer挂掉,引發rebalance,這時候就要注意這些參數的配置request.timeout.ms、max.poll.interval.ms、max.poll.records等。

Rebalance分區配置?

之前提到過rebalance時group下所有consumer會一起協調共同參與分區配置設定,kafka新版本consumer預設提供了三種分區政策,分别是range、round-robin、sticky。

Range政策主要是基于範圍思想,它将單個topic的所有分區按照順序排列,然後把這些分區劃分為固定大小的分區并且依次分給各個consumer。而round-robin政策則會把所有topic的所有分區順序擺開,然後輪詢式的配置設定給各個consumer。最新釋出的sticky政策有效避免上訴兩種政策完全無視曆史配置設定方案缺陷,采用“有粘性”對所有consumer執行個體進行配置設定,可以最大程度的避免配置設定傾斜。

新版本consumer預設的配置設定政策是range,使用者根據consumer參數partition.assignment.strategy來進行設定,另外也可以通過自定義來配置設定政策。

Rebalance協定:

前面說了rebalance本質就是一組協定,group與coordinator共同使用這組協定來完成group的rebalance,最新版本的kafka中提供下面五種協定來處理rebalance。

Joingroup請求:consumer請求加入組。

SyncGroup請求:group leader吧配置設定方案同步更新到組内所有成員中。

Heartbeat請求:consumer定期向coordinator彙報心跳表明依然存活。

LeaveGroup請求:consumer主動通知coordinator該consume即将離組。

DescribeGroup請求:檢視組的所有資訊,包括成員資訊,協定資訊,配置設定方案,訂閱資訊。該請求類型主要提供管理者使用。Coordinator不使用該請求執行rebalance。

在rebalance過程中,coordinator主要處理consumer發過來的joinGroup和syncGroup請求,當consumer主動離組時會發送leaveGroup請求給coordinator。

在成功rebalance後,組内所有consumer都需要定期向coordinator發送heartbeat請求,而每個consumer也是根據heartBeat請求的響應中是否包含rebalance_in_progress來判斷目前group是否開啟了新一輪的rebalance。

rebalance監聽器:

在位移送出章節中,consumer預設在新版本是把位移送出到_consumer_offsets中。其實kafka也支援把位移送出到外部存儲中,比如資料庫。若要實作這個功能,則必須使用rebalance監聽器,而使用監聽器的前提是使用者必須使用consumer group。如果使用獨立的consumer或者直接手動配置設定分區,那麼rebalance監聽是無效的。

多線程執行個體消費

如前所述,kafkaConsumer是非線程安全的,他和kafkaProducer不同,後者是線程安全的,是以可以在多個線程中使用同一個kafkaProducer執行個體,而且這樣的效率是比每個線程維護一個kafkaProducer更高。

Consumer group分為 每個線程單獨維護一個kafkaConsumer,和 單kafkaConsumer+多work線程。

兩者差別是,後者在全局維護一個或者多個kafkaConsumer執行個體執行消息擷取任務。使用全局的kafkaConsumer執行個體執行消息擷取,然後把擷取到的消息集合交給線程池中的work線程執行工作,之後work線程完成處理上報位移狀态,由全局的consumer送出位移。

那麼他們的優缺點呢?

每個線程維護專屬consumer:優點:實作簡單,速度快,因為無線程之間的互動管理,友善管理位移,易于維護分區間的消費順序。缺點:socker連接配接開銷大;consumer受限與topic分區,擴充性差。Broker端處理負載高(因為發往broker請求多);rebalance可能性大。

單consumer+多worker模式:優點:消息擷取處了解耦;擴充性強,獨立擴充consumer數量和worker。缺點:實作負載;難以維護分區内的順序消息;處理鍊路變長,導緻位移管理困難;worker線程異常導緻資料丢失。

獨立consumer

  1. 如果程序自己維護分區狀态,那麼它就可以固定消費某些分區而不用擔心狀态丢失問題。
  2. 如果程序本身已經是高可用且能夠自動重新開機恢複錯誤,那麼它就不需要讓kafka來幫它完成錯誤檢測和狀态恢複。

繼續閱讀