天天看點

Kafka突發rebalance

Kafka 是我們最常用的消息隊列,它那幾萬、甚至幾十萬的處理速度讓我們為之欣喜若狂。但是随着使用場景的增加,我們遇到的問題也越來越多,其中一個經常遇到的問題就是:rebalance(重平衡)問題。

什麼是消費組

要想了解 rebalance,那就得先了解消費組(consumer group)。

消費組指的是多個消費者(consumer)組成起來的一個組,它們共同消費 topic 的所有消息,并且一個 topic 的一個 partition 隻能被一個 consumer 消費。

Kafka 為消費者組定義了 5 種狀态,它們分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。

Kafka突發rebalance

了解了這些狀态的含義之後,我們來看一張圖檔,它展示了狀态機的各個狀态流轉。

Kafka突發rebalance

一個消費者組最開始是 Empty 狀态,當重平衡過程開啟後,它會被置于 PreparingRebalance 狀态等待成員加入,之後變更到 CompletingRebalance 狀态等待配置設定方案,最後流轉到 Stable 狀态完成重平衡。

當有新成員加入或已有成員退出時,消費者組的狀态從 Stable 直接跳到 PreparingRebalance 狀态,此時,所有現存成員就必須重新申請加入組。當所有成員都退出組後,消費者組狀态變更為 Empty。Kafka 定期自動删除過期位移的條件就是,組要處于 Empty 狀态。是以,如果你的消費者組停掉了很長時間(超過 7 天),那麼 Kafka 很可能就把該組的位移資料删除了。我相信,你在 Kafka 的日志中一定經常看到下面這個輸出:

Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.      

這就是 Kafka 在嘗試定期删除過期位移。現在你知道了,隻有 Empty 狀态下的組,才會執行過期位移删除的操作。

什麼是rebalance?

我們都知道 kafka 主要可以分為三大塊:生産者、kafka broker、消費者。

Kafka突發rebalance

而 kafka 怎麼均勻地配置設定某個 topic 下的所有 partition 到各個消費者,進而使得消息的消費速度達到最快,這就是平衡(balance)。而 rebalance(重平衡)其實就是重新進行 partition 的配置設定,進而使得 partition 的配置設定重新達到平衡狀态。

rebalance的流程

重平衡的完整流程需要消費者端和協調者元件共同參與才能完成。我們先從消費者的視角來審視一下重平衡的流程。

在消費者端,重平衡分為兩個步驟:分别是加入組和等待上司消費者(Leader Consumer)配置設定方案。這兩個步驟分别對應兩類特定的請求:JoinGroup 請求和 SyncGroup 請求。

JoinGroup請求

當組内成員加入組時,它會向協調者發送 JoinGroup 請求。在該請求中,每個成員都要将自己訂閱的主題上報,這樣協調者就能收集到所有成員的訂閱資訊。一旦收集了全部成員的 JoinGroup 請求後,協調者會從這些成員中選擇一個擔任這個消費者組的上司者。

通常情況下,第一個發送 JoinGroup 請求的成員自動成為上司者。你一定要注意區分這裡的上司者和之前我們介紹的上司者副本,它們不是一個概念。這裡的上司者是具體的消費者執行個體,它既不是副本,也不是協調者。這裡的上司者指的是消費組(consumer group)的上司者,消費組上司者的任務是收集所有成員的訂閱資訊,然後根據這些資訊,制定具體的分區消費配置設定方案。

選出上司者之後,協調者會把消費者組訂閱資訊封裝進 JoinGroup 請求的響應體中,然後發給上司者,由上司者統一做出配置設定方案後,進入到下一步:發送 SyncGroup 請求。

SyncGroup請求

在這一步中,上司者向協調者發送 SyncGroup 請求,将剛剛做出的配置設定方案發給協調者。值得注意的是,其他成員也會向協調者發送 SyncGroup 請求,隻不過請求體中并沒有實際的内容。這一步的主要目的是讓協調者接收配置設定方案,然後統一以 SyncGroup 響應的方式分發給所有成員,這樣組内所有成員就都知道自己該消費哪些分區了。

接下來,我用一張圖來形象地說明一下 JoinGroup 請求的處理過程。

Kafka突發rebalance

就像前面說的,JoinGroup 請求的主要作用是将組成員訂閱資訊發送給上司者消費者,待上司者制定好配置設定方案後,重平衡流程進入到 SyncGroup 請求階段。

下面這張圖描述的是 SyncGroup 請求的處理流程。

Kafka突發rebalance

SyncGroup 請求的主要目的,就是讓協調者把上司者制定的配置設定方案下發給各個組内成員。當所有成員都成功接收到配置設定方案後,消費者組進入到 Stable 狀态,即開始正常的消費工作。

什麼時候會發生rebalance?

前面我們已經說到,rebalance 其實就是對 partition 進行重新配置設定。那麼什麼時候會發生 rebalance 呢?其實在以下三種情況下,會觸發 rebalance:

  • 訂閱 Topic 的分區數發生變化。
  • 訂閱的 Topic 個數發生變化。
  • 消費組内成員個數發生變化。例如有新的 consumer 執行個體加入該消費組或者離開組。

訂閱Topic的分區數發生變化

簡單地說,就是之前 topic 有 10 個分區,現在變成了 20 個,那麼多出來的 10 個分區的資料就沒人消費了。那麼此時就需要進行重平衡,将新增的 10 個分區分給消費組内的消費者進行消費。是以在這個情況下,會發生重平衡。

訂閱的Topic個數發生變化

簡單地說,一個 consumer group 如果之前隻訂閱了 A topic,那麼其組内的 consumer 知會消費 A topic 的消息。而如果現在新增訂閱了 B topic,那麼 kafka 就需要把 B topic 的 partition 配置設定給組内的 consumer 進行消費。這個配置設定的過程,其實也是一個 rebalance 的過程。

消費組内成員個數發生變化

我們都知道 kafka 中是以消費組(consumer group)的方式進行消費的,消費組内的消費者共同消費一個 topic 下的消息。而當消費組内成員個數發生變化,例如某個 consumer 離開,或者新 consumer 加入,都會導緻消費組内成員個數發生變化,進而導緻重平衡。

相比起之前的兩個情況,這種情況在實際情況中更加常見。因為訂閱分區數、以及訂閱 topic 數都是我們主動改變才會發生,而組内消費組成員個數發生變化,則是更加随機的。

下面我們一起分析一下「消費組内成員個數發生變化」的幾種情況:

  • 新成員加入
  • 組成員主動離開
  • 組成員崩潰

新成員加入

新成員入組是指組處于 Stable 狀态後,有新成員加入。如果是全新啟動一個消費者組,Kafka 是有一些自己的小優化的,流程上會有些許的不同。我們這裡讨論的是,組穩定了之後有新成員加入的情形。

當協調者收到新的 JoinGroup 請求後,它會通過心跳請求響應的方式通知組内現有的所有成員,強制它們開啟新一輪的重平衡。具體的過程和之前的用戶端重平衡流程是一樣的。現在,我用一張時序圖來說明協調者一端是如何處理新成員入組的。

Kafka突發rebalance

組成員主動離開

何謂主動離組?就是指消費者執行個體所線上程或程序調用 close() 方法主動通知協調者它要退出。這個場景就涉及到了第三類請求:LeaveGroup 請求。協調者收到 LeaveGroup 請求後,依然會以心跳響應的方式通知其他成員,是以我就不再贅述了,還是直接用一張圖來說明。

Kafka突發rebalance

組成員崩潰

崩潰離組是指消費者執行個體出現嚴重故障,突然當機導緻的離組。它和主動離組是有差別的,因為後者是主動發起的離組,協調者能馬上感覺并處理。但崩潰離組是被動的,協調者通常需要等待一段時間才能感覺到,這段時間一般是由消費者端參數 session.timeout.ms 控制的。

也就是說,Kafka 一般不會超過 session.timeout.ms 就能感覺到這個崩潰。當然,後面處理崩潰離組的流程與之前是一樣的,我們來看看下面這張圖。

Kafka突發rebalance

疑惑

在許多文章中,它們會加多了一個 rebalance 場景,即:「重平衡時協調者對組内成員送出位移的處理」。其實這個要說是 rebalance 場景,有點牽強。我們先來了解下這個場景究竟是什麼情況。

正常情況下,每個組内成員都會定期彙報位移給協調者。當重平衡開啟時,協調者會給予成員一段緩沖時間,要求每個成員必須在這段時間内快速地上報自己的位移資訊,然後再開啟正常的 JoinGroup/SyncGroup 請求發送。還是老辦法,我們使用一張圖來說明。

Kafka突發rebalance

是以這種場景是指 rebalance 發生之時,留有時間給消費者送出 offset,并不是引起 rebalance 的觸發原因(并不是因為送出 offset 引發 rebalance)。是以在我這篇文章裡,我并沒有将其作為 rebalance 的一種場景。

rebalance問題處理思路

前面我們講過 rebalance 一般會有 3 種情況,分别是:

  • 新成員加入
  • 組成員主動離開
  • 組成員崩潰

對于「新成員加入」、「組成員主動離開」都是我們主動觸發的,能比較好地控制。但是「組成員崩潰」則是我們預料不到的,遇到問題的時候也比較不好排查。但對于「組成員崩潰」也是有一些通用的排查思路的,下面我們就來聊聊「rebalance問題的處理思路」。

要學會處理 rebalance 問題,我們需要先搞清楚 kafaka 消費者配置的四個參數:

  • session.timeout.ms 設定了逾時時間
  • heartbeat.interval.ms 心跳時間間隔
  • max.poll.interval.ms 每次消費的處理時間
  • max.poll.records 每次消費的消息數

session.timeout.ms 表示 consumer 向 broker 發送心跳的逾時時間。例如 session.timeout.ms = 180000 表示在最長 180 秒内 broker 沒收到 consumer 的心跳,那麼 broker 就認為該 consumer 死亡了,會啟動 rebalance。

heartbeat.interval.ms 表示 consumer 每次向 broker 發送心跳的時間間隔。heartbeat.interval.ms = 60000 表示 consumer 每 60 秒向 broker 發送一次心跳。一般來說,session.timeout.ms 的值是 heartbeat.interval.ms 值的 3 倍以上。

max.poll.interval.ms 表示 consumer 每兩次 poll 消息的時間間隔。簡單地說,其實就是 consumer 每次消費消息的時長。如果消息處理的邏輯很重,那麼市場就要相應延長。否則如果時間到了 consumer 還麼消費完,broker 會預設認為 consumer 死了,發起 rebalance。

max.poll.records 表示每次消費的時候,擷取多少條消息。擷取的消息條數越多,需要處理的時間越長。是以每次拉取的消息數不能太多,需要保證在 max.poll.interval.ms 設定的時間内能消費完,否則會發生 rebalance。

簡單來說,會導緻崩潰的幾個點是:

  • 消費者心跳逾時,導緻 rebalance。
  • 消費者處理時間過長,導緻 rebalance。

消費者心跳逾時

我們知道消費者是通過心跳和協調者保持通訊的,如果協調者收不到心跳,那麼協調者會認為這個消費者死亡了,進而發起 rebalance。

而 kafka 的消費者參數設定中,跟心跳相關的兩個參數為:

  • session.timeout.ms 設定了逾時時間
  • heartbeat.interval.ms 心跳時間間隔

這時候需要調整 session.timeout.ms 和 heartbeat.interval.ms 參數,使得消費者與協調者能保持心跳。一般來說,逾時時間應該是心跳間隔的 3 倍時間。即 session.timeout.ms 如果設定為 180 秒,那麼 heartbeat.interval.ms 最多設定為 60 秒。

為什麼要這麼設定逾時時間應該是心跳間隔的 3 倍時間?因為這樣的話,在一個逾時周期内就可以有多次心跳,避免網絡問題導緻偶發失敗。

消費者處理時間過長

如果消費者處理時間過長,那麼同樣會導緻協調者認為該 consumer 死亡了,進而發起重平衡。

而 kafka 的消費者參數設定中,跟消費處理的兩個參數為:

  • max.poll.interval.ms 每次消費的處理時間
  • max.poll.records 每次消費的消息數
  • 對于心跳逾時問題。一般是調高心跳逾時時間(session.timeout.ms),調整逾時時間(session.timeout.ms)和心跳間隔時間(heartbeat.interval.ms)的比例。阿裡雲官方文檔建議逾時時間(session.timeout.ms)設定成 25s,最長不超過 30s。那麼心跳間隔時間(heartbeat.interval.ms)就不超過 10s。
  • 對于消費處理逾時問題。一般是增加消費者處理的時間(max.poll.interval.ms),減少每次處理的消息數(max.poll.records)。阿裡雲官方文檔建議 max.poll.records 參數要遠小于目前消費組的消費能力(records < 單個線程每秒消費的條數 x 消費線程的個數 x session.timeout的秒數)。

參考資料

  • ​​重平衡場景,寫得更好,更詳細!推薦!!Kafka | 消費者組重平衡全流程解析_大資料_sinat_27143551的部落格-CSDN部落格​​
  • ​​Kafka 重平衡機制 - 後端進階 - SegmentFault 思否​​
  • ​​為什麼消費用戶端頻繁出現Rebalance?_用戶端消費問題_常見問題_消息隊列Kafka版-阿裡雲​​