天天看點

kafka消費者組以及重平衡全流程解析

作者:一個即将退役的碼農

#頭條創作挑戰賽#

kafka消費者組

消費者組,即 Consumer Group,應該算是 Kafka 比較有亮點的設計了。那麼何謂 Consumer Group 呢?用一句話概括就是:Consumer Group 是 Kafka 提供的可擴充且具有容錯性的消費者機制。既然是一個組,那麼組内必然可以有多個消費者或消費者執行個體(Consumer Instance),它們共享一個公共的 ID,這個 ID 被稱為 Group ID。組内的所有消費者協調在一起來消費訂閱主題(Subscribed Topics)的所有分區(Partition)。當然,每個分區隻能由同一個消費者組内的一個 Consumer 執行個體來消費。個人認為,了解 Consumer Group 記住下面這三個特性就好了。

  1. Consumer Group 下可以有一個或多個 Consumer 執行個體。這裡的執行個體可以是一個單獨的程序,也可以是同一程序下的線程。在實際場景中,使用程序更為常見一些。
  2. Group ID 是一個字元串,在一個 Kafka 叢集中,它辨別唯一的一個 Consumer Group。
  3. Consumer Group 下所有執行個體訂閱的主題的單個分區,隻能配置設定給組内的某個 Consumer 執行個體消費。這個分區當然也可以被其他的 Group 消費。

大家都知道兩種消息引擎模型吧?它們分别是點對點模型和釋出 / 訂閱模型,前者也稱為消費隊列。當然,你要注意區分很多架構文章中涉及的消息隊列與這裡的消息隊列。國内很多文章都習慣把消息中間件這類架構統稱為消息隊列,我在這裡不評價這種提法是否準确,隻是想提醒你注意這裡所說的消息隊列,特指經典的消息引擎模型。

好了,傳統的消息引擎模型就是這兩大類,它們各有優劣。我們來簡單回顧一下。傳統的消息隊列模型的缺陷在于消息一旦被消費,就會從隊列中被删除,而且隻能被下遊的一個 Consumer 消費。嚴格來說,這一點不算是缺陷,隻能算是它的一個特性。但很顯然,這種模型的伸縮性(scalability)很差,因為下遊的多個 Consumer 都要“搶”這個共享消息隊列的消息。釋出 / 訂閱模型倒是允許消息被多個 Consumer 消費,但它的問題也是伸縮性不高,因為每個訂閱者都必須要訂閱主題的所有分區。這種全量訂閱的方式既不靈活,也會影響消息的真實投遞效果。

如果有這麼一種機制,既可以避開這兩種模型的缺陷,又兼具它們的優點,那就太好了。幸運的是,Kafka 的 Consumer Group 就是這樣的機制。當 Consumer Group 訂閱了多個主題後,組内的每個執行個體不要求一定要訂閱主題的所有分區,它隻會消費部分分區中的消息。

Consumer Group 之間彼此獨立,互不影響,它們能夠訂閱相同的一組主題而互不幹涉。再加上 Broker 端的消息留存機制,Kafka 的 Consumer Group 完美地規避了上面提到的伸縮性差的問題。可以這麼說,Kafka 僅僅使用 Consumer Group 這一種機制,卻同時實作了傳統消息引擎系統的兩大模型:如果所有執行個體都屬于同一個 Group,那麼它實作的就是消息隊列模型;如果所有執行個體分别屬于不同的 Group,那麼它實作的就是釋出 / 訂閱模型。

在了解了 Consumer Group 以及它的設計亮點之後,你可能會有這樣的疑問:在實際使用場景中,我怎麼知道一個 Group 下該有多少個 Consumer 執行個體呢?理想情況下,Consumer 執行個體的數量應該等于該 Group 訂閱主題的分區總數。

舉個簡單的例子,假設一個 Consumer Group 訂閱了 3 個主題,分别是 A、B、C,它們的分區數依次是 1、2、3,那麼通常情況下,為該 Group 設定 6 個 Consumer 執行個體是比較理想的情形,因為它能最大限度地實作高伸縮性。

你可能會問,我能設定小于或大于 6 的執行個體嗎?當然可以!如果你有 3 個執行個體,那麼平均下來每個執行個體大約消費 2 個分區(6 / 3 = 2);如果你設定了 8 個執行個體,那麼很遺憾,有 2 個執行個體(8 – 6 = 2)将不會被配置設定任何分區,它們永遠處于空閑狀态。是以,在實際使用過程中一般不推薦設定大于總分區數的 Consumer 執行個體。設定多餘的執行個體隻會浪費資源,而沒有任何好處。

好了,說完了 Consumer Group 的設計特性,我們來讨論一個問題:針對 Consumer Group,Kafka 是怎麼管理位移的呢?你還記得吧,消費者在消費的過程中需要記錄自己消費了多少資料,即消費位置資訊。在 Kafka 中,這個位置資訊有個專門的術語:位移(Offset)。

看上去該 Offset 就是一個數值而已,其實對于 Consumer Group 而言,它是一組 KV 對,Key 是分區,V 對應 Consumer 消費該分區的最新位移。如果用 Java 來表示的話,你大緻可以認為是這樣的資料結構,即 Map<TopicPartition, Long>,其中 TopicPartition 表示一個分區,而 Long 表示位移的類型。當然,我必須承認 Kafka 源碼中并不是這樣簡單的資料結構,而是要比這個複雜得多,不過這并不會妨礙我們對 Group 位移的了解。

老版本的 Consumer 也有消費者組的概念,它和我們目前讨論的 Consumer Group 在使用感上并沒有太多的不同,隻是它管理位移的方式和新版本是不一樣的。

老版本的 Consumer Group 把位移儲存在 ZooKeeper 中。Apache ZooKeeper 是一個分布式的協調服務架構,Kafka 重度依賴它實作各種各樣的協調管理。将位移儲存在 ZooKeeper 外部系統的做法,最顯而易見的好處就是減少了 Kafka Broker 端的狀态儲存開銷。現在比較流行的提法是将伺服器節點做成無狀态的,這樣可以自由地擴縮容,實作超強的伸縮性。Kafka 最開始也是基于這樣的考慮,才将 Consumer Group 位移儲存在獨立于 Kafka 叢集之外的架構中。

不過,慢慢地人們發現了一個問題,即 ZooKeeper 這類元架構其實并不适合進行頻繁的寫更新,而 Consumer Group 的位移更新卻是一個非常頻繁的操作。這種大吞吐量的寫操作會極大地拖慢 ZooKeeper 叢集的性能,是以 Kafka 社群漸漸有了這樣的共識:将 Consumer 位移儲存在 ZooKeeper 中是不合适的做法。

于是,在新版本的 Consumer Group 中,Kafka 社群重新設計了 Consumer Group 的位移管理方式,采用了将位移儲存在 Kafka 内部主題的方法。這個内部主題就是讓人既愛又恨的 __consumer_offsets。我會在專欄後面的内容中專門介紹這個神秘的主題。不過,現在你需要記住新版本的 Consumer Group 将位移儲存在 Broker 端的内部主題中。

何為Rebalance

我們來說說 Consumer Group 端大名鼎鼎的重平衡,也就是所謂的 Rebalance 過程。我形容其為“大名鼎鼎”,從某種程度上來說其實也是“臭名昭著”,因為有關它的 bug 真可謂是此起彼伏,從未間斷。這裡我先賣個關子,後面我會解釋它“遭人恨”的地方。我們先來了解一下什麼是 Rebalance。

Rebalance 本質上是一種協定,規定了一個 Consumer Group 下的所有 Consumer 如何達成一緻,來配置設定訂閱 Topic 的每個分區。比如某個 Group 下有 20 個 Consumer 執行個體,它訂閱了一個具有 100 個分區的 Topic。正常情況下,Kafka 平均會為每個 Consumer 配置設定 5 個分區。這個配置設定的過程就叫 Rebalance。

那麼 Consumer Group 何時進行 Rebalance 呢?Rebalance 的觸發條件有 3 個。

  1. 組成員數發生變更。比如有新的 Consumer 執行個體加入組或者離開組,抑或是有 Consumer 執行個體崩潰被“踢出”組。
  2. 訂閱主題數發生變更。Consumer Group 可以使用正規表達式的方式訂閱主題,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明該 Group 訂閱所有以字母 t 開頭、字母 c 結尾的主題。在 Consumer Group 的運作過程中,你新建立了一個滿足這樣條件的主題,那麼該 Group 就會發生 Rebalance。
  3. 訂閱主題的分區數發生變更。Kafka 目前隻能允許增加一個主題的分區數。當分區數增加時,就會觸發訂閱該主題的所有 Group 開啟 Rebalance。

Rebalance 發生時,Group 下所有的 Consumer 執行個體都會協調在一起共同參與。你可能會問,每個 Consumer 執行個體怎麼知道應該消費訂閱主題的哪些分區呢?這就需要配置設定政策的協助了。

目前 Kafka 預設提供了 3 種配置設定政策,每種政策都有一定的優勢和劣勢,我們今天就不展開讨論了,你隻需要記住社群會不斷地完善這些政策,保證提供最公平的配置設定政策,即每個 Consumer 執行個體都能夠得到較為平均的分區數。比如一個 Group 内有 10 個 Consumer 執行個體,要消費 100 個分區,理想的配置設定政策自然是每個執行個體平均得到 10 個分區。這就叫公平的配置設定政策。如果出現了嚴重的配置設定傾斜,勢必會出現這種情況:有的執行個體會“閑死”,而有的執行個體則會“忙死”。

我們舉個簡單的例子來說明一下 Consumer Group 發生 Rebalance 的過程。假設目前某個 Consumer Group 下有兩個 Consumer,比如 A 和 B,當第三個成員 C 加入時,Kafka 會觸發 Rebalance,并根據預設的配置設定政策重新為 A、B 和 C 配置設定分區,如下圖所示:

kafka消費者組以及重平衡全流程解析

顯然,Rebalance 之後的配置設定依然是公平的,即每個 Consumer 執行個體都獲得了 3 個分區的消費權。這是我們希望出現的情形。

講完了 Rebalance,現在我來說說它“遭人恨”的地方。

首先,Rebalance 過程對 Consumer Group 消費過程有極大的影響。如果你了解 JVM 的垃圾回收機制,你一定聽過萬物靜止的收集方式,即著名的 stop the world,簡稱 STW。在 STW 期間,所有應用線程都會停止工作,表現為整個應用程式僵在那邊一動不動。Rebalance 過程也和這個類似,在 Rebalance 過程中,所有 Consumer 執行個體都會停止消費,等待 Rebalance 完成。這是 Rebalance 為人诟病的一個方面。

其次,目前 Rebalance 的設計是所有 Consumer 執行個體共同參與,全部重新配置設定所有分區。其實更高效的做法是盡量減少配置設定方案的變動。例如執行個體 A 之前負責消費分區 1、2、3,那麼 Rebalance 之後,如果可能的話,最好還是讓執行個體 A 繼續消費分區 1、2、3,而不是被重新配置設定其他的分區。這樣的話,執行個體 A 連接配接這些分區所在 Broker 的 TCP 連接配接就可以繼續用,不用重新建立連接配接其他 Broker 的 Socket 資源。

最後,Rebalance 實在是太慢了。曾經,有個國外使用者的 Group 内有幾百個 Consumer 執行個體,成功 Rebalance 一次要幾個小時!這完全是不能忍受的。最悲劇的是,目前社群對此無能為力,至少現在還沒有特别好的解決方案。所謂“本事大不如不攤上”,也許最好的解決方案就是避免 Rebalance 的發生吧。

消費者組的重平衡流程,它的作用是讓組内所有的消費者執行個體就消費哪些主題分區達成一緻。重平衡需要借助 Kafka Broker 端的 Coordinator 元件,在 Coordinator 的幫助下完成整個消費者組的分區重配置設定。今天我們就來詳細說說這個流程。

Rebalance觸發與通知

我們先來說說重平衡的 3 個觸發條件:

  1. 組成員數量發生變化。
  2. 訂閱主題數量發生變化。
  3. 訂閱主題的分區數發生變化。

就我個人的經驗來看,在實際生産環境中,因命中第 1 個條件而引發的重平衡是最常見的。另外,消費者組中的消費者執行個體依次啟動也屬于第 1 種情況,也就是說,每次消費者組啟動時,必然會觸發重平衡過程。

今天,我真正想引出的是另一個話題:重平衡過程是如何通知到其他消費者執行個體的?答案就是,靠消費者端的心跳線程(Heartbeat Thread)。

Kafka Java 消費者需要定期地發送心跳請求(Heartbeat Request)到 Broker 端的協調者,以表明它還存活着。在 Kafka 0.10.1.0 版本之前,發送心跳請求是在消費者主線程完成的,也就是你寫代碼調用 KafkaConsumer.poll 方法的那個線程。

這樣做有諸多弊病,最大的問題在于,消息處理邏輯也是在這個線程中完成的。是以,一旦消息處理消耗了過長的時間,心跳請求将無法及時發到協調者那裡,導緻協調者“錯誤地”認為該消費者已“死”。自 0.10.1.0 版本開始,社群引入了一個單獨的心跳線程來專門執行心跳請求發送,避免了這個問題。

但這和重平衡又有什麼關系呢?其實,重平衡的通知機制正是通過心跳線程來完成的。當協調者決定開啟新一輪重平衡後,它會将“REBALANCE_IN_PROGRESS”封裝進心跳請求的響應中,發還給消費者執行個體。當消費者執行個體發現心跳響應中包含了“REBALANCE_IN_PROGRESS”,就能立馬知道重平衡又開始了,這就是重平衡的通知機制。

對了,很多人還搞不清楚消費者端參數 heartbeat.interval.ms 的真實用途,我來解釋一下。從字面上看,它就是設定了心跳的間隔時間,但這個參數的真正作用是控制重平衡通知的頻率。如果你想要消費者執行個體更迅速地得到通知,那麼就可以給這個參數設定一個非常小的值,這樣消費者就能更快地感覺到重平衡已經開啟了。

消費者組狀态機

重平衡一旦開啟,Broker 端的協調者元件就要開始忙了,主要涉及到控制消費者組的狀态流轉。目前,Kafka 設計了一套消費者組狀态機(State Machine),來幫助協調者完成整個重平衡流程。嚴格來說,這套狀态機屬于非常底層的設計,Kafka 官網上壓根就沒有提到過,但你最好還是了解一下,因為它能夠幫助你搞懂消費者組的設計原理,比如消費者組的過期位移(Expired Offsets)删除等。

目前,Kafka 為消費者組定義了 5 種狀态,它們分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。那麼,這 5 種狀态的含義是什麼呢?我們一起來看看下面這張表格。

kafka消費者組以及重平衡全流程解析

了解了這些狀态的含義之後,我們來看一張圖檔,它展示了狀态機的各個狀态流轉。

kafka消費者組以及重平衡全流程解析

我來解釋一下消費者組啟動時的狀态流轉過程。一個消費者組最開始是 Empty 狀态,當重平衡過程開啟後,它會被置于 PreparingRebalance 狀态等待成員加入,之後變更到 CompletingRebalance 狀态等待配置設定方案,最後流轉到 Stable 狀态完成重平衡。

當有新成員加入或已有成員退出時,消費者組的狀态從 Stable 直接跳到 PreparingRebalance 狀态,此時,所有現存成員就必須重新申請加入組。當所有成員都退出組後,消費者組狀态變更為 Empty。Kafka 定期自動删除過期位移的條件就是,組要處于 Empty 狀态。是以,如果你的消費者組停掉了很長時間(超過 7 天),那麼 Kafka 很可能就把該組的位移資料删除了。我相信,你在 Kafka 的日志中一定經常看到下面這個輸出:

Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.

這就是 Kafka 在嘗試定期删除過期位移。現在你知道了,隻有 Empty 狀态下的組,才會執行過期位移删除的操作。

消費者端重平衡流程

有了上面的内容作鋪墊,我們就可以開始介紹重平衡流程了。重平衡的完整流程需要消費者端和協調者元件共同參與才能完成。我們先從消費者的視角來審視一下重平衡的流程。

在消費者端,重平衡分為兩個步驟:分别是加入組和等待上司者消費者(Leader Consumer)配置設定方案。這兩個步驟分别對應兩類特定的請求:JoinGroup 請求和 SyncGroup 請求。

當組内成員加入組時,它會向協調者發送 JoinGroup 請求。在該請求中,每個成員都要将自己訂閱的主題上報,這樣協調者就能收集到所有成員的訂閱資訊。一旦收集了全部成員的 JoinGroup 請求後,協調者會從這些成員中選擇一個擔任這個消費者組的上司者。

通常情況下,第一個發送 JoinGroup 請求的成員自動成為上司者。你一定要注意區分這裡的上司者和之前我們介紹的上司者副本,它們不是一個概念。這裡的上司者是具體的消費者執行個體,它既不是副本,也不是協調者。上司者消費者的任務是收集所有成員的訂閱資訊,然後根據這些資訊,制定具體的分區消費配置設定方案。

選出上司者之後,協調者會把消費者組訂閱資訊封裝進 JoinGroup 請求的響應體中,然後發給上司者,由上司者統一做出配置設定方案後,進入到下一步:發送 SyncGroup 請求。

在這一步中,上司者向協調者發送 SyncGroup 請求,将剛剛做出的配置設定方案發給協調者。值得注意的是,其他成員也會向協調者發送 SyncGroup 請求,隻不過請求體中并沒有實際的内容。這一步的主要目的是讓協調者接收配置設定方案,然後統一以 SyncGroup 響應的方式分發給所有成員,這樣組内所有成員就都知道自己該消費哪些分區了。

接下來,我用一張圖來形象地說明一下 JoinGroup 請求的處理過程。

kafka消費者組以及重平衡全流程解析

就像前面說的,JoinGroup 請求的主要作用是将組成員訂閱資訊發送給上司者消費者,待上司者制定好配置設定方案後,重平衡流程進入到 SyncGroup 請求階段。

下面這張圖描述的是 SyncGroup 請求的處理流程。

kafka消費者組以及重平衡全流程解析

SyncGroup 請求的主要目的,就是讓協調者把上司者制定的配置設定方案下發給各個組内成員。當所有成員都成功接收到配置設定方案後,消費者組進入到 Stable 狀态,即開始正常的消費工作。

講完這裡,消費者端的重平衡流程我已經介紹完了。接下來,我們從協調者端來看一下重平衡是怎麼執行的。

Broker 端重平衡場景剖析

要剖析協調者端處理重平衡的全流程,我們必須要分幾個場景來讨論。這幾個場景分别是新成員加入組、組成員主動離組、組成員崩潰離組、組成員送出位移。接下來,我們一個一個來讨論。

場景一:新成員入組。

新成員入組是指組處于 Stable 狀态後,有新成員加入。如果是全新啟動一個消費者組,Kafka 是有一些自己的小優化的,流程上會有些許的不同。我們這裡讨論的是,組穩定了之後有新成員加入的情形。

當協調者收到新的 JoinGroup 請求後,它會通過心跳請求響應的方式通知組内現有的所有成員,強制它們開啟新一輪的重平衡。具體的過程和之前的用戶端重平衡流程是一樣的。現在,我用一張時序圖來說明協調者一端是如何處理新成員入組的。

kafka消費者組以及重平衡全流程解析

場景二:組成員主動離組。

何謂主動離組?就是指消費者執行個體所線上程或程序調用 close() 方法主動通知協調者它要退出。這個場景就涉及到了第三類請求:LeaveGroup 請求。協調者收到 LeaveGroup 請求後,依然會以心跳響應的方式通知其他成員,是以我就不再贅述了,還是直接用一張圖來說明。

kafka消費者組以及重平衡全流程解析

場景三:組成員崩潰離組。

崩潰離組是指消費者執行個體出現嚴重故障,突然當機導緻的離組。它和主動離組是有差別的,因為後者是主動發起的離組,協調者能馬上感覺并處理。但崩潰離組是被動的,協調者通常需要等待一段時間才能感覺到,這段時間一般是由消費者端參數 session.timeout.ms 控制的。也就是說,Kafka 一般不會超過 session.timeout.ms 就能感覺到這個崩潰。當然,後面處理崩潰離組的流程與之前是一樣的,我們來看看下面這張圖。

kafka消費者組以及重平衡全流程解析

場景四:重平衡時協調者對組内成員送出位移的處理。

正常情況下,每個組内成員都會定期彙報位移給協調者。當重平衡開啟時,協調者會給予成員一段緩沖時間,要求每個成員必須在這段時間内快速地上報自己的位移資訊,然後再開啟正常的 JoinGroup/SyncGroup 請求發送。還是老辦法,我們使用一張圖來說明。

小結

好了,消費者組重平衡流程我已經全部講完了。雖然全程我都是拿兩個成員來舉例子,但你可以很容易地擴充到多個成員的消費者組,畢竟它們的原理是相同的。我希望你能多看幾遍今天的内容,徹底掌握 Kafka 的消費者重平衡流程。