天天看點

[Kafka設計解析]--(四)Kafka Consumer設計解析

摘要

  本文主要介紹了Kafka High Level Consumer,Consumer Group,Consumer Rebalance,Low Level Consumer實作的語義,以及适用場景。以及未來版本中對High Level Consumer的重新設計–使用Consumer Coordinator解決Split Brain和Herd等問題。

High Level Consumer

  很多時候,客戶程式隻是希望從Kafka讀取資料,不太關心消息offset的處理。同時也希望提供一些語義,例如同一條消息隻被某一個Consumer消費(單點傳播)或被所有Consumer消費(廣播)。是以,Kafka Hight Level Consumer提供了一個從Kafka消費資料的高層抽象,進而屏蔽掉其中的細節并提供豐富的語義。   

Consumer Group

  High Level Consumer将從某個Partition讀取的最後一條消息的offset存于Zookeeper中(​​Kafka從0.8.2版本​​​開始同時支援将offset存于Zookeeper中與​​将offset存于專用的Kafka Topic中​​​)。這個offset基于客戶程式提供給Kafka的名字來儲存,這個名字被稱為Consumer Group。Consumer Group是整個Kafka叢集全局的,而非某個Topic的。每一個High Level Consumer執行個體都屬于一個Consumer Group,若不指定則屬于預設的Group。

  Zookeeper中Consumer相關節點如下圖所示

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

  

  很多傳統的Message Queue都會在消息被消費完後将消息删除,一方面避免重複消費,另一方面可以保證Queue的長度比較短,提高效率。而如上文所述,Kafka并不删除已消費的消息,為了實作傳統Message Queue消息隻被消費一次的語義,Kafka保證每條消息在同一個Consumer Group裡隻會被某一個Consumer消費。與傳統Message Queue不同的是,Kafka還允許不同Consumer Group同時消費同一條消息,這一特性可以為消息的多元化處理提供支援。

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

  

  實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對消息進行實時線上處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時将資料實時備份到另一個資料中心,隻需要保證這三個操作所使用的Consumer在不同的Consumer Group即可。下圖展示了Kafka在LinkedIn的一種簡化部署模型。

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

  

  為了更清晰展示Kafka Consumer Group的特性,筆者進行了一項測試。建立一個Topic (名為topic1),再建立一個屬于group1的Consumer執行個體,并建立三個屬于group2的Consumer執行個體,然後通過Producer向topic1發送Key分别為1,2,3的消息。結果發現屬于group1的Consumer收到了所有的這三條消息,同時group2中的3個Consumer分别收到了Key為1,2,3的消息,如下圖所示。

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

  注:上圖中每個黑色區域代表一個Consumer執行個體,每個執行個體隻建立一個MessageStream。實際上,本實驗将Consumer應用程式打成jar包,并在4個不同的指令行終端中傳入不同的參數運作。

High Level Consumer Rebalance

  (本節所講述Rebalance相關内容均基于Kafka High Level Consumer)

  Kafka保證同一Consumer Group中隻有一個Consumer會消費某條消息,實際上,Kafka保證的是穩定狀态下每一個Consumer執行個體隻會消費某一個或多個特定Partition的資料,而某個Partition的資料隻會被某一個特定的Consumer執行個體所消費。也就是說Kafka對消息的配置設定是以Partition為機關配置設定的,而非以每一條消息作為配置設定單元。這樣設計的劣勢是無法保證同一個Consumer Group裡的Consumer均勻消費資料,優勢是每個Consumer不用都跟大量的Broker通信,減少通信開銷,同時也降低了配置設定難度,實作也更簡單。另外,因為同一個Partition裡的資料是有序的,這種設計可以保證每個Partition裡的資料可以被有序消費。

  如果某Consumer Group中Consumer(每個Consumer隻建立1個MessageStream)數量少于Partition數量,則至少有一個Consumer會消費多個Partition的資料,如果Consumer的數量與Partition數量相同,則正好一個Consumer消費一個Partition的資料。而如果Consumer的數量多于Partition的數量時,會有部分Consumer無法消費該Topic下任何一條消息。

  如下例所示,如果topic1有0,1,2共三個Partition,當group1隻有一個Consumer(名為consumer1)時,該 Consumer可消費這3個Partition的所有資料。

  ​​

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

  

  增加一個Consumer(consumer2)後,其中一個Consumer(consumer1)可消費2個Partition的資料(Partition 0和Partition 1),另外一個Consumer(consumer2)可消費另外一個Partition(Partition 2)的資料。

  

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

  

  再增加一個Consumer(consumer3)後,每個Consumer可消費一個Partition的資料。consumer1消費partition0,consumer2消費partition1,consumer3消費partition2。

  

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

  

  再增加一個Consumer(consumer4)後,其中3個Consumer可分别消費一個Partition的資料,另外一個Consumer(consumer4)不能消費topic1的任何資料。

  

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

  

  此時關閉consumer1,其餘3個Consumer可分别消費一個Partition的資料。

  

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

  

  接着關閉consumer2,consumer3可消費2個Partition,consumer4可消費1個Partition。

  

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

  

  再關閉consumer3,僅存的consumer4可同時消費topic1的3個Partition。

  

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

  Consumer Rebalance的算法如下:

  • 将目标Topic下的所有Partirtion排序,存于PTPT
  • 對某Consumer Group下所有Consumer排序,存于CG于CG,第ii個Consumer記為CiCi
  • N=size(PT)/size(CG)N=size(PT)/size(CG),向上取整
  • 解除CiCi對原來配置設定的Partition的消費權(i從0開始)
  • 将第i∗Ni∗N到(i+1)∗N−1(i+1)∗N−1個Partition配置設定給CiCi

  

  目前,最新版(0.8.2.1)Kafka的Consumer Rebalance的控制政策是由每一個Consumer通過在Zookeeper上注冊Watch完成的。每個Consumer被建立時會觸發Consumer Group的Rebalance,具體啟動流程如下:

  • High Level Consumer啟動時将其ID注冊到其Consumer Group下,在Zookeeper上的路徑為

​/consumers/[consumer group]/ids/[consumer id]​

​/consumers/[consumer group]/ids​

  • 上注冊Watch

​/brokers/ids​

  • 上注冊Watch
  • 如果Consumer通過Topic Filter建立消息流,則它會同時在

​/brokers/topics​

  • 上也建立Watch
  • 強制自己在其Consumer Group内啟動Rebalance流程

  在這種政策下,每一個Consumer或者Broker的增加或者減少都會觸發Consumer Rebalance。因為每個Consumer隻負責調整自己所消費的Partition,為了保證整個Consumer Group的一緻性,當一個Consumer觸發了Rebalance時,該Consumer Group内的其它所有其它Consumer也應該同時觸發Rebalance。

  該方式有如下缺陷:

  • Herd effect

      任何Broker或者Consumer的增減都會觸發所有的Consumer的Rebalance

  • Split Brain

      每個Consumer分别單獨通過Zookeeper判斷哪些Broker和Consumer 當機了,那麼不同Consumer​​​在同一時刻從Zookeeper“看”到的View就可能不一樣,這是由Zookeeper的特性決定的​​,這就會造成不正确的Reblance嘗試。

  • 調整結果不可控

      所有的Consumer都并不知道其它Consumer的Rebalance是否成功,這可能會導緻Kafka​​​工作在一個不正确的狀态​​。

  根據Kafka社群wiki,Kafka作者正在考慮在還未釋出的​​0.9.x版本中使用中心協調器(Coordinator)​​。大體思想是為所有Consumer Group的子集選舉出一個Broker作為Coordinator,由它Watch Zookeeper,進而判斷是否有Partition或者Consumer的增減,然後生成Rebalance指令,并檢查是否這些Rebalance在所有相關的Consumer中被執行成功,如果不成功則重試,若成功則認為此次Rebalance成功(這個過程跟Replication Controller非常類似)。具體方案将在後文中詳細闡述。   

Low Level Consumer

  使用Low Level Consumer (Simple Consumer)的主要原因是,使用者希望比Consumer Group更好的控制資料的消費。比如:

  • 同一條消息讀多次
  • 隻讀取某個Topic的部分Partition
  • 管理事務,進而確定每條消息被處理一次,且僅被處理一次

  與Consumer Group相比,Low Level Consumer要求使用者做大量的額外工作。

  • 必須在應用程式中跟蹤offset,進而确定下一條應該消費哪條消息
  • 應用程式需要通過程式獲知每個Partition的Leader是誰
  • 必須處理Leader的變化

  使用Low Level Consumer的一般流程如下

  • 查找到一個“活着”的Broker,并且找出每個Partition的Leader
  • 找出每個Partition的Follower
  • 定義好請求,該請求應該能描述應用程式需要哪些資料
  • Fetch資料
  • 識别Leader的變化,并對之作出必要的響應

Consumer重新設計

  根據社群社群wiki,Kafka在0.9.*版本中,重新設計Consumer可能是最重要的Feature之一。本節會根據社群wiki介紹Kafka 0.9.*中對Consumer可能的設計方向及思路。   

設計方向

簡化消費者用戶端

  部分使用者希望開發和使用non-java的用戶端。現階段使用non-java發SimpleConsumer比較友善,但想開發High Level Consumer并不容易。因為High Level Consumer需要實作一些複雜但必不可少的失敗探測和Rebalance。如果能将消費者用戶端更精簡,使依賴最小化,将會極大的友善non-java使用者實作自己的Consumer。

  

中心Coordinator

  如上文所述,目前版本的High Level Consumer存在Herd Effect和Split Brain的問題。如果将失敗探測和Rebalance的邏輯放到一個高可用的中心Coordinator,那麼這兩個問題即可解決。同時還可大大減少Zookeeper的負載,有利于Kafka Broker的Scale Out。

  

允許手工管理offset

  一些系統希望以特定的時間間隔在自定義的資料庫中管理Offset。這就要求Consumer能擷取到每條消息的metadata,例如Topic,Partition,Offset,同時還需要在Consumer啟動時得到每個Partition的Offset。實作這些,需要提供新的Consumer API。同時有個問題不得不考慮,即是否允許Consumer手工管理部分Topic的Offset,而讓Kafka自動通過Zookeeper管理其它Topic的Offset。一個可能的選項是讓每個Consumer隻能選取1種Offset管理機制,這可極大的簡化Consumer API的設計和實作。

  

Rebalance後觸發使用者指定的回調

  一些應用可能會在記憶體中為每個Partition維護一些狀态,Rebalance時,它們可能需要将該狀态持久化。是以該需求希望支援使用者實作并指定一些可插拔的并在Rebalance時觸發的回調。如果使用者使用手動的Offset管理,那該需求可友善得由使用者實作,而如果使用者希望使用Kafka提供的自動Offset管理,則需要Kafka提供該回調機制。

非阻塞式Consumer API

  該需求源于那些實作高層流處理操作,如filter by, group by, join等,的系統。現階段的阻塞式Consumer幾乎不可能實作Join操作。

##如何通過中心Coordinator實作Rebalance

  成功Rebalance的結果是,被訂閱的所有Topic的每一個Partition将會被Consumer Group内的一個(有且僅有一個)Consumer擁有。每一個Broker将被選舉為某些Consumer Group的Coordinator。某個Cosnumer Group的Coordinator負責在該Consumer Group的成員變化或者所訂閱的Topic的Partititon變化時協調Rebalance操作。

Consumer

  1) Consumer啟動時,先向Broker清單中的任意一個Broker發送ConsumerMetadataRequest,并通過ConsumerMetadataResponse擷取它所在Group的Coordinator資訊。ConsumerMetadataRequest和ConsumerMetadataResponse的結構如下

ConsumerMetadataRequest
{
  GroupId                => String
}
ConsumerMetadataResponse
{
  ErrorCode              => int16
  Coordinator            => Broker
}      

  2)Consumer連接配接到Coordinator并發送HeartbeatRequest,如果傳回的HeartbeatResponse沒有任何錯誤碼,Consumer繼續fetch資料。若其中包含IllegalGeneration錯誤碼,即說明Coordinator已經發起了Rebalance操作,此時Consumer停止fetch資料,commit offset,并發送JoinGroupRequest給它的Coordinator,并在JoinGroupResponse中獲得它應該擁有的所有Partition清單和它所屬的Group的新的Generation ID。此時Rebalance完成,Consumer開始fetch資料。相應Request和Response結構如下

HeartbeatRequest
{
  GroupId                => String
  GroupGenerationId      => int32
  ConsumerId             => String
}
HeartbeatResponse
{
  ErrorCode              => int16
}
JoinGroupRequest
{
  GroupId                     => String
  SessionTimeout              => int32
  Topics                      => [String]
  ConsumerId                  => String
  PartitionAssignmentStrategy => String
}
JoinGroupResponse
{
  ErrorCode              => int16
  GroupGenerationId      => int32
  ConsumerId             => String
  PartitionsToOwn        => [TopicName [Partition]]
}
TopicName => String
Partition => int32      

Consumer狀态機

  ​​

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

  Down:Consumer停止工作

  Start up & discover coordinator:Consumer檢測其所在Group的Coordinator。一旦它檢測到Coordinator,即向其發送JoinGroupRequest。

  Part of a group:該狀态下,Consumer已經是該Group的成員,并周期性發送HeartbeatRequest。如HeartbeatResponse包含IllegalGeneration錯誤碼,則轉換到Stopped Consumption狀态。若連接配接丢失,HeartbeatResponse包含NotCoordinatorForGroup錯誤碼,則轉換到Rediscover coordinator狀态。

  Rediscover coordinator:該狀态下,Consumer不停止消費而是嘗試通過發送ConsumerMetadataRequest來探測新的Coordinator,并且等待直到獲得無錯誤碼的響應。

  Stopped consumption:該狀态下,Consumer停止消費并送出offset,直到它再次加入Group。

  

  

  

故障檢測機制

  Consumer成功加入Group後,Consumer和相應的Coordinator同時開始故障探測程式。Consumer向Coordinator發起周期性的Heartbeat(HeartbeatRequest)并等待響應,該周期為 ​​​session.timeout.ms/heartbeat.frequency。若Consumer在session.timeout.ms内未收到HeartbeatResponse,或者發現相應的Socket​​​ channel斷開,它即認為Coordinator已當機并啟動Coordinator探測程式。若Coordinator在session.timeout.ms内沒有收到一次HeartbeatRequest,則它将該Consumer标記為當機狀态并為其所在Group觸發一次Rebalance操作。

  Coordinator Failover過程中,Consumer可能會在新的Coordinator完成Failover過程之前或之後發現新的Coordinator并向其發送HeatbeatRequest。對于後者,新的Cooodinator可能拒絕該請求,緻使該Consumer重新探測Coordinator并發起新的連接配接請求。如果該Consumer向新的Coordinator發送連接配接請求太晚,新的Coordinator可能已經在此之前将其标記為當機狀态而将之視為新加入的Consumer并觸發一次Rebalance操作。

Coordinator

  1)穩定狀态下,Coordinator通過上述故障探測機制跟蹤其所管理的每個Group下的每個Consumer的健康狀态。

  2)剛啟動時或選舉完成後,Coordinator從Zookeeper讀取它所管理的Group清單及這些Group的成員清單。如果沒有擷取到Group成員資訊,它不會做任何事情直到某個Group中有成員注冊進來。

  3)在Coordinator完成加載其管理的Group清單及其相應的成員資訊之前,它将為HeartbeatRequest,OffsetCommitRequest和JoinGroupRequests傳回CoordinatorStartupNotComplete錯誤碼。此時,Consumer會重新發送請求。

  4)Coordinator會跟蹤被其所管理的任何Consumer Group注冊的Topic的Partition的變化,并為該變化觸發Rebalance操作。建立新的Topic也可能觸發Rebalance,因為Consumer可以在Topic被建立之前就已經訂閱它了。

  Coordinator發起Rebalance操作流程如下所示。

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

Coordinator狀态機

  ​​

​​

[Kafka設計解析]--(四)Kafka Consumer設計解析

​​

  Down:Coordinator不再擔任之前負責的Consumer Group的Coordinator

  Catch up:該狀态下,Coordinator競選成功,但還未能做好服務相應請求的準備。

  Ready:該狀态下,新競選出來的Coordinator已經完成從Zookeeper中加載它所負責管理的所有Group的metadata,并可開始接收相應的請求。

繼續閱讀