在本文的第二部分中,探讨了可用的rebalance政策、使用靜态組成員身份減少不必要的rebalance次數的能力,以及rebalance要考慮的風險。
rebalance政策
一般有兩種政策:
- eager rebalance: 這種政策執行的時候所有消費者都要停止,然後重新進行join group過程
- incremental rebalance: 這種政策隻會把有影響的partition停止,不受影響的消費者仍然可以繼續消費,但是有可能會增加rebalance的次數
Eager rebalance
使用eager rebalance(預設),當消費者組rebalance時,消費者的所有處理都會停止,同時重新配置設定主題分區。這意味着rebalance的次數及其對應用程式的影響對于了解至關重要,因為它會對吞吐量産生重大影響。
下圖說明了當新消費者加入組時對包含單個消費者的現有消費者組的影響。然而,它适用于所有三種rebalance觸發場景,無論 Group Coordinator 是否已收到來自消費者的 JoinGroup 或 LeaveGroup 請求,或者它認為消費者可能已經失敗。
圖 1:Eager Rebalance
- 消費者 A 正在輪詢主題分區以擷取消息,并向 Group Coordinator 發出心跳通知它是健康的。Group Coordinator 對此心跳的響應是确認“Ok”,并且消費者繼續其處理。
- Consumer B 啟動并向 Group Coordinator 發送 JoinGroup 請求以觸發rebalance。(為了清楚起見,圖中僅顯示了消費者 B 的心跳線程)。
- Group Coordinator 響應它從現有消費者 Consumer A 收到的下一個心跳,通知它rebalance已經開始。
- 消費者 A 必須在其max.poll.interval.ms之前完成對來自其目前 poll() 的消息的處理,并以其自己的 JoinGroup 請求進行響應。
- 消費者 A 的輪詢完成,現在所有消息處理都已暫停。它對 poll() 的下一次調用會觸發發送一個 JoinGroup 請求,其中包含有關它對 Group Coordinator 感興趣的主題訂閱的資訊。
- 組協調器了解組中的所有消費者,知道所有現有主題分區已被釋放并可以重新配置設定的時間點。随着消費組穩定為兩個成員,已經達到“同步障礙”。
- Group Coordinator 将 JoinResponses 發送給兩個消費者。一個消費者被選為上司者并計算分區配置設定。消費者響應 SyncGroup 請求。組長的 SyncGroup 請求包含計算的分區配置設定。
- Group Coordinator 以 SyncResponse 響應每個消費者,通知他們他們的配置設定。消費者 A 現在恢複輪詢,消費者 B 開始輪詢。
在所有消費者都接受了他們的分區配置設定之前,消費者組的rebalance不會完成。該圖突出顯示了rebalance期間處理的暫停,并且随着消費者組的大小增加且成員增多,此暫停的持續時間變得更加重要。
Incremental rebalance
消費者群體越大,rebalance所需的時間就越長。如果急切的消費者組rebalance在它們發生時停止消息處理的影響被認為太大,那麼可以采用incremental rebalance政策。這一次,已被組協調器通知rebalance正在進行的現有消費者不會停止處理。相反,rebalance發生在兩個階段。當消費者從組協調器收到rebalance已開始的通知時,現在會發生以下情況:
- 現有消費者向組協調器發送 JoinGroup 請求,但繼續處理來自其配置設定的主題分區的消息。
- JoinGroup 請求包含他們的主題訂閱資訊和他們目前配置設定的資訊。
- 一旦 Group Coordinator 收到來自所有現有消費者的 JoinGroup Requests(或者他們已經逾時),它就會向消費者發送 JoinResponses,并配置設定一個新的 group leader。
- 新組長以 SyncGroup 請求響應,目标消費者對分區配置設定進行響應。
- Group Coordinator 通知消費者必須在 SyncResponse 中釋放分區。
- 隻有那些需要撤銷分區的消費者才真正停止處理,以便将這些分區重新配置設定給另一個消費者。
- 新的 JoinGroup 請求在第二輪rebalance協定中從所有消費者發送到組協調器,其中包含有關他們仍然擁有的分區和已撤銷分區的資訊。
- 在這一點上,該組已經穩定下來,rebalance已達到“同步障礙”。現在可以完成分區配置設定。
隻有那些需要rebalance的分區才會被撤銷。其他分區始終由其消費者擁有,不會中斷其消息的使用。
下圖說明了實際的incremental rebalance。
圖 2:Incremental Rebalance
- 當第二個消費者時,消費者 A 正在從兩個分區進行輪詢。
- 消費者 B,啟動并加入該組。這會觸發incremental rebalance。
- 消費者 A 放棄了對其中一個分區的配置設定。
- 分區被重新配置設定給消費者 B,消費者 B 開始使用來自它的消息。同時,消費者 A 不會停止從兩個分區中的另一個進行處理。
該圖不包括圍繞心跳的複雜性。
incremental rebalance 需要兩輪rebalance才能完成,是以會導緻更長的整體延遲。然而,rebalance對整體消息處理的影響不太嚴重。
incremental rebalance是通過将CooperativeStickyAssignor應用于消費者的partition.assignment.strategy設定來配置的。
靜态組成員
通過使用靜态組成員身份,可以減少不必要的rebalance次數,進而減少rebalance對吞吐量的影響。使用預設的rebalance協定,當消費者啟動時,它會被配置設定一個新的member.id(這是組協調器中的内部 Id),而不管它是否是剛剛重新啟動的現有消費者。任何消費者啟動都會觸發rebalance,并配置設定一個新的member.id。使用此協定,無法将消費者重新識别為相同的消費者。
靜态組成員身份協定引入了在消費者上配置唯一的group.instance.id的能力,将其标記為靜态成員。Group Coordinator 将此 group.instance.id 映射到内部member.id。如果消費者死亡并重新啟動,它将向 Group Coordinator 發送一個帶有此 id 的 JoinGroup 請求。在消費者關閉的場景中,直到其會話基于session.timeout.ms逾時,它才會從消費者組中删除。當消費者重新開機并重新加入組時,Group Coordinator檢查并找到group.instance.id比對它在消費者組中注冊的靜态成員。是以它知道它是同一個消費者執行個體并且不會觸發rebalance。配置設定給該消費者的分區被重新配置設定給它,現在恢複對來自這些分區的消息的處理。同時,配置設定給其他消費者的分區上的消息處理沒有中斷。
下圖示範了靜态組成員身份。兩個消費者屬于同一個消費者組,并配置設定了不同的group.instance.id值。他們正在輪詢來自同一主題的每個分區。消費者 B 停止并離開該組,但是不會立即觸發rebalance。消費者在session.timeout.ms逾時之前重新加入組并重新配置設定其分區,確定不需要rebalance。
圖 3:靜态組成員
為清楚起見,心跳線程僅針對消費者 B 顯示。
例如,可以通過将運作應用程式的 Kubernetes pod 的 Id 綁定到應用程式消費者的 group.instance.id 來利用此功能。如果 pod 當機并重新啟動,那麼 Group Coordinator 将識别消費者,因為group.instance.id将是相同的,并且可以避免潛在的代價高昂的rebalance。
靜态組成員身份在維護消費者中的狀态時特别令人感興趣,否則該狀态會在rebalance後丢失或必須重新加載。例如,有狀态重試允許消費者跨輪詢跟蹤消息批次的重試。如果被輪詢的分區配置設定給其他消費者,則重試計數将在消費者rebalance時丢失。此處的文章介紹了無狀态與有狀态重試。
使用靜态組成員身份時需要小心,因為當消費者死亡時,配置設定給它的那些分區将不會rebalance,直到消費者逾時。是以,配置更長的session.timeout.ms以允許重新啟動的消費者有時間重新加入并避免觸發rebalance會帶來風險,即真正失敗的消費者沒有重新加入将使分區沒有配置設定更長的消費者。但是,配置太短的session.timeout.ms可能無法讓消費者在從消費者組中删除之前有足夠的時間重新加入。當消費者重新加入時不再在消費者組中,就會觸發rebalance。
對于具有靜态組成員資格的消費者,它在離開組(或确實失敗)時不會發送 LeaveGroup 請求。相反,它會停止心跳并保留在組中,直到超過session.timeout.ms并被組協調器從組中删除。然後應将此逾時配置為足夠長,以便消費者有時間重新啟動并重新配置設定其分區,而無需rebalance。
rebalance風險
重複消息
逾時并被視為失敗的消費者可能仍在處理它已輪詢的消息,并且該處理可以成功完成。然而,它的消費者偏移寫入将被拒絕,因為消費者組rebalance會增加生成 ID,并且任何具有上一代 ID 的寫入都将被拒絕。同時,一個新的消費者執行個體被配置設定了rebalance中的主題分區,它消費和處理相同的消息。了解應用程式可能會收到重複的消息始終很重要,如果需要,它必須滿足這些要求。
rebalance風暴
在所有現有消費者重新加入或超過max.poll.interval.ms之前,rebalance不會完成。如果消費者在再次輪詢之前确實超過了max.poll.interval.ms,因為它處理最後一批消息的時間比預期的要長,那麼當它完成時,它将請求重新加入該組,進而觸發另一次rebalance。例如,如果rebalance的原因是由于影響所有消費者的下遊服務響應緩慢,則結果可以在觸發rebalance後rebalance,因為消費者不斷被驅逐然後重新加入,這是一場“rebalance風暴”。靜态組成員資格和incremental rebalance當然可以對此有所幫助,但無論采用何種政策,都必須謹慎對待rebalance配置。
結論
Consumer Group rebalance 是 Kafka 如何管理 consumer groups 的關鍵部分,這本身就是一個重要的特性,有助于使 Kafka 成為一個高度可擴充的分布式消息傳遞。了解rebalance的工作原理以及各種消費者配置如何影響它對于確定吞吐量最大化以及系統不會經常出現消息未被處理的情況至關重要。