天天看點

Kafka消費組rebalance原理

作者:架構即人生

消費者組是 Kafka 分布式消息處理的一個重要特征,用于管理消費者并促進擴充應用程式的能力。它們将任何一個主題的消費者組合在一起,并且主題内的分區被配置設定給這些消費者。當組的參與者發生變化時,消費者組rebalance可能由許多因素觸發,這會導緻在消費者之間重新配置設定分區。在rebalance期間,消息處理暫停,影響吞吐量。

在本文中,将介紹消費者組的角色、消費者組rebalance以及導緻rebalance的觸發器。詳細說明了影響rebalance持續時間和觸發rebalance時間的配置。在下一篇文章中,将介紹rebalance期間對應用程式消息處理的影響以及可以應用的rebalance政策。探讨了減少不必要的rebalance和減輕rebalance影響的選項。

消費群體

當應用程式實作了一個 Kafka 消費者來消費來自某個主題的消息時,該消費者屬于一個消費者組。在消費者組中,消費者被配置設定主題分區以進行消費。組成員在代理端(broker)進行管理,分區配置設定在用戶端進行管理。代理不知道資源是什麼以及它們是如何在消費者之間配置設定的。這是 Kafka 用戶端被視為"胖"用戶端的一個很好的例子。

消費者配置了group.id ,是以具有相同group.id的任何其他消費者執行個體都将屬于同一個消費者組。這有助于擴充消費者的能力,并且這與增加主題中的分區數量相結合提供了一種增加消息吞吐量的機制。

Group Coordinator 管理消費者組和消費者。這是一個位于代理端的 Kafka 元件。它将讓一個消費者成為上司者,這将負責計算主題分區配置設定。這些将傳回給 Group Coordinator,然後由 Group Coordinator 将分區配置設定給消費者。

給定一個應用程式執行個體,其中group.id為 'foo' 的消費者正在監聽特定主題,并且該主題有六個分區,然後消費者将輪詢所有六個分區中的消息。

Kafka消費組rebalance原理

圖 1:單個消費者組和一個消費者

現在啟動應用程式的第二個執行個體。是以,這将啟動具有相同group.id的“foo”的第二個消費者執行個體。第二個消費者執行個體向 Group Coordinator 發送 JoinGroup 請求,并且在消費者組中重新配置設定分區以分散負載。消費者組中有兩個成員,每個消費者執行個體配置設定三個分區。

Kafka消費組rebalance原理

圖 2:具有兩個消費者的單個消費者組“foo”

啟動第三個應用程式,組協調器再次重新配置設定分區,每個消費者現在輪詢來自兩個分區的消息。

如果消費者執行個體多于分區,那麼這些額外的消費者将不會配置設定任何分區。一個主題分區将永遠隻有一個消費者從給定的消費者組中收聽它。是以一個由 5 個消費者組成的消費者組,監聽一個具有 3 個分區的主題,将有 2 個空閑消費者。

如果一個消費者以不同的group.id配置啟動(就像不同服務的情況一樣),并且它正在偵聽相同的主題,那麼這将是一個單獨的消費者組的一部分。它的分區配置設定獨立于任何其他消費者組的配置設定。

Kafka消費組rebalance原理

圖 3:兩個消費者組 'foo' 和 'bar'

Rebalance觸發器

發生消費者組rebalance的原因有多種。

  • 一個新的消費者加入一個消費者組
  • 一個現有的消費者離開一個消費者組
  • 代理認為一個消費者可能已經失敗了
  • Consumer Grouop訂閱的任意Topic出現分區數量的變化
  • 消費者調用unsubscrible()取消對某Topic的訂閱

除此之外,任何其他重新配置設定資源的需求都将觸發重新平衡。一個示例是建立一個主題,其中為消費者配置了與該主題名稱比對的模式訂閱。

當一個新的消費者加入一個消費者組時,它會向代理上的組協調器發送一個 JoinGroup 請求。然後在組中的所有一個或多個消費者之間重新配置設定主題分區。同樣,當消費者離開組時,它會通過 LeaveGroup 請求通知組協調器,該請求再次在剩餘的消費者之間重新配置設定主題分區(如果有的話)。

當 Group Coordinator 在預期的時間範圍内沒有收到消費者的消息時,無論是心跳還是下一次 poll() 調用,它都會将消費者從組中驅逐,認為它可能已經失敗。主題分區再次被重新配置設定給組中剩餘的任何其他消費者。

如果一個服務有多個訂閱互斥主題但共享同一個group.id的消費者,那麼任何一個消費者觸發的rebalance仍然會影響組中的其他消費者。在以下場景中,消費者 A訂閱了主題abc,而消費者 B訂閱了主題def。他們在同一個消費者組foo中。如果消費者 A處理一個批次的時間過長并且逾時,那麼它将從消費者組中删除,進而觸發rebalance。組中的所有分區配置設定都被撤銷和重新配置設定,包括Consumer B的配置設定。

Kafka消費組rebalance原理

圖 4:跨越主題的消費者組

當消費者 A最終完成其輪詢并重新加入消費者組時,将觸發進一步的rebalance,并且随着分區被撤銷和重新配置設定,所有處理再次停止。是以,為收聽不同主題的消費者定義單獨的消費者組可能是謹慎的。例如[service]-[topic]-consumer-group。

Rebalance配置

概述

對于 Apache Java Kafka 用戶端,以下是消費者的關鍵配置,這些配置會影響rebalance需要多長時間才能完成,以及何時消費者可能被代理視為失敗,進而觸發rebalance。

Kafka消費組rebalance原理

以下部分檢查這些配置參數的影響。

心跳和會話逾時

消費者定期向 Group Coordinator(位于 broker 上)發送心跳。這允許 Group Coordinator 監控組中消費者的健康狀況。必須在session.timeout.ms内收到心跳,并根據 heartbeat.interval.ms 發送心跳。當 Group Coordinator 收到心跳時session.timeout.ms會重置,它會響應消費者,并且必須在此重置逾時内接收下一個消費者心跳。

Kafka消費組rebalance原理

圖 5:消費者的心跳

建議将heartbeat.interval.ms配置為不超過session.timeout.ms的三分之一。這確定了如果由于例如瞬态網絡問題而丢失一兩個心跳,則不會認為消費者失敗。在此圖中,有兩個心跳丢失,但第三個在會話逾時之前到達,是以 Group Coordinator 知道消費者仍然健康。

Kafka消費組rebalance原理

圖 6:失敗的心跳

如果消費者确實失敗并停止心跳,那麼一旦會話逾時到期,它就會從消費者組中被逐出,進而導緻消費者組rebalance。

Kafka消費組rebalance原理

圖 7:消費者失敗

輪詢間隔

心跳在與主處理線程不同的線程上執行。消費者在主處理線程上輪詢其主題分區,每次調用 poll() 都必須在配置的max.poll.interval.ms内發生。下圖添加了消費者處理線程,顯示了該線程的職責以及心跳線程的職責。

Kafka消費組rebalance原理

圖 8:消費者心跳和輪詢

對 poll() 的第一次調用,以及對 poll() 的任何調用,包括分區配置設定等更改,都會導緻啟動心跳線程。每個後續的 poll() 調用都會重新開始輪詢時間,這樣它就有這個完整的 max.poll.interval.ms可以在其中完成。

心跳線程檢查消費者處理的狀态,如果在輪詢之間超過了max.poll.interval.ms,那麼它會發送一個 LeaveGroup 請求而不是心跳。Group Coordinator 将消費者從消費者組中移除,進而觸發rebalance。

Kafka消費組rebalance原理

圖 9:消費者超過輪詢間隔

當觸發rebalance時,現有消費者将收到對其下一個“rebalance”心跳的響應。每個消費者在max.poll.interval.ms逾時之前通過調用 poll() 重新加入組,因為這會觸發對組協調器的 JoinGroup 請求。請注意,對于 Kafka Connect,為此提供了單獨的逾時,即rebalance.timeout.ms。

是以,配置max.poll.interval.ms需要仔細考慮。将其設定得太低,風險在于單個輪詢中消耗的一批消息未及時處理,導緻rebalance和重複消息傳遞。将間隔設定得太高,這意味着當消費者确實失敗時,代理需要更長的時間才能意識到并重新配置設定消費者的分區。在此處理期間,配置設定給失敗消費者的主題分區上的消息被卡住。

消費者健康

是以,有兩個逾時需要考慮,這與消費者何時被認為是健康的或失敗并被逐出消費者組有關。如果主處理線程死亡,而心跳線程仍在運作,則通過超出max.poll.interval.ms來檢測故障。如果整個應用程式死了,那麼這将通過session.timeout.ms内沒有收到心跳來檢測。

max.poll.interval.ms本質上是消費者處理的主要健康檢查。但是,通過在單獨的線程上使用心跳檢查,這意味着可以更快地檢測到整個應用程式發生故障。