本文轉發自技術世界,
摘要
本文在上篇文章基礎上,更加深入講解了Kafka的HA機制,主要闡述了HA相關各種場景,如Broker failover,Controller failover,Topic建立/删除,Broker啟動,Follower從Leader fetch資料等詳細處理過程。同時介紹了Kafka提供的與Replication相關的工具,如重新配置設定Partition等。
Broker Failover過程
Controller對Broker failure的處理過程
- Controller在Zookeeper的
/brokers/ids
- 節點上注冊Watch。一旦有Broker當機(本文用當機代表任何讓Kafka認為其Broker die的情景,包括但不限于機器斷電,網絡不可用,GC導緻的Stop The World,程序crash等),其在Zookeeper對應的Znode會自動被删除,Zookeeper會fire Controller注冊的Watch,Controller即可擷取最新的幸存的Broker清單。
- Controller決定set_p,該集合包含了當機的所有Broker上的所有Partition。
-
對set_p中的每一個Partition:
3.1 從
/brokers/topics/[topic]/partitions/[partition]/state
-
讀取該Partition目前的ISR。
3.2 決定該Partition的新Leader。如果目前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader,新的ISR則包含目前ISR中所有幸存的Replica。否則選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的資料丢失)。如果該Partition的所有Replica都當機了,則将新的Leader設定為-1。
3.3 将新的Leader,ISR和新的
leader_epoch
- 及
controller_epoch
- 寫入
/brokers/topics/[topic]/partitions/[partition]/state
- 。注意,該操作隻有Controller版本在3.1至3.3的過程中無變化時才會執行,否則跳轉到3.1。
-
直接通過RPC向set_p相關的Broker發送LeaderAndISRRequest指令。Controller可以在一個RPC操作中發送多個指令進而提高效率。
Broker failover順序圖如下所示。
LeaderAndIsrRequest結構如下
LeaderAndIsrResponse結構如下
建立/删除Topic
- Controller在Zookeeper的
/brokers/topics
- 節點上注冊Watch,一旦某個Topic被建立或删除,則Controller會通過Watch得到新建立/删除的Topic的Partition/Replica配置設定。
- 對于删除Topic操作,Topic工具會将該Topic名字存于
/admin/delete_topics
- 。若
delete.topic.enable
- 為true,則Controller注冊在
/admin/delete_topics
- 上的Watch被fire,Controller通過回調向對應的Broker發送StopReplicaRequest;若為false則Controller不會在
/admin/delete_topics
- 上注冊Watch,也就不會對該事件作出反應,此時Topic操作隻被記錄而不會被執行。
- 對于建立Topic操作,Controller從
/brokers/ids
-
讀取目前所有可用的Broker清單,對于set_p中的每一個Partition:
3.1 從配置設定給該Partition的所有Replica(稱為AR)中任選一個可用的Broker作為新的Leader,并将AR設定為新的ISR(因為該Topic是新建立的,是以AR中所有的Replica都沒有資料,可認為它們都是同步的,也即都在ISR中,任意一個Replica都可作為Leader)
3.2 将新的Leader和ISR寫入
/brokers/topics/[topic]/partitions/[partition]
-
直接通過RPC向相關的Broker發送LeaderAndISRRequest。
建立Topic順序圖如下所示。
Broker響應請求流程
kafka.network.SocketServer
及相關子產品接受各種請求并作出響應。整個網絡通信子產品基于Java NIO開發,并采用Reactor模式,其中包含1個Acceptor負責接受客戶請求,N個Processor負責讀寫資料,M個Handler處理業務邏輯。
Acceptor的主要職責是監聽并接受用戶端(請求發起方,包括但不限于Producer,Consumer,Controller,Admin Tool)的連接配接請求,并建立和用戶端的資料傳輸通道,然後為該用戶端指定一個Processor,至此它對該用戶端該次請求的任務就結束了,它可以去響應下一個用戶端的連接配接請求了。其核心代碼如下。
Processor主要負責從用戶端讀取資料并将響應傳回給用戶端,它本身并不處理具體的業務邏輯,并且其内部維護了一個隊列來儲存配置設定給它的所有SocketChannel。Processor的run方法會循環從隊列中取出新的SocketChannel并将其
SelectionKey.OP_READ
注冊到selector上,然後循環處理已就緒的讀(請求)和寫(響應)。Processor讀取完資料後,将其封裝成Request對象并将其交給RequestChannel。
RequestChannel是Processor和KafkaRequestHandler交換資料的地方,它包含一個隊列requestQueue用來存放Processor加入的Request,KafkaRequestHandler會從裡面取出Request來處理;同時它還包含一個respondQueue,用來存放KafkaRequestHandler處理完Request後返還給用戶端的Response。
Processor會通過processNewResponses方法依次将requestChannel中responseQueue儲存的Response取出,并将對應的
SelectionKey.OP_WRITE
事件注冊到selector上。當selector的select方法傳回時,對檢測到的可寫通道,調用write方法将Response傳回給用戶端。
KafkaRequestHandler循環從RequestChannel中取Request并交給
kafka.server.KafkaApis
處理具體的業務邏輯。
LeaderAndIsrRequest響應過程
對于收到的LeaderAndIsrRequest,Broker主要通過ReplicaManager的becomeLeaderOrFollower處理,流程如下:
- 若請求中controllerEpoch小于目前最新的controllerEpoch,則直接傳回ErrorMapping.StaleControllerEpochCode。
-
對于請求中partitionStateInfos中的每一個元素,即((topic, partitionId), partitionStateInfo):
2.1 若partitionStateInfo中的leader epoch大于目前ReplicManager中存儲的(topic, partitionId)對應的partition的leader epoch,則:
2.1.1 若目前brokerid(或者說replica id)在partitionStateInfo中,則将該partition及partitionStateInfo存入一個名為partitionState的HashMap中
2.1.2否則說明該Broker不在該Partition配置設定的Replica list中,将該資訊記錄于log中
2.2否則将相應的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中
- 篩選出partitionState中Leader與目前Broker ID相等的所有記錄存入partitionsTobeLeader中,其它記錄存入partitionsToBeFollower中。
- 若partitionsTobeLeader不為空,則對其執行makeLeaders方。
- 若partitionsToBeFollower不為空,則對其執行makeFollowers方法。
- 若highwatermak線程還未啟動,則将其啟動,并将hwThreadInitialized設為true。
- 關閉所有Idle狀态的Fetcher。
LeaderAndIsrRequest處理過程如下圖所示
Broker啟動過程
/brokers/ids
zonde下建立臨時子節點(Ephemeral node),建立成功後Controller的ReplicaStateMachine注冊其上的Broker Change Watch會被fire,進而通過回調KafkaController.onBrokerStartup方法完成以下步驟:
-
向所有新啟動的Broker發送UpdateMetadataRequest,其定義如下。
- 将新啟動的Broker上的所有Replica設定為OnlineReplica狀态,同時這些Broker會為這些Partition啟動high watermark線程。
- 通過partitionStateMachine觸發OnlinePartitionStateChange。
Controller Failover
Controller也需要Failover。每個Broker都會在Controller Path (
/controller
)上注冊一個Watch。目前Controller失敗時,對應的Controller Path會自動消失(因為它是Ephemeral Node),此時該Watch被fire,所有“活”着的Broker都會去競選成為新的Controller(建立新的Controller Path),但是隻會有一個競選成功(這點由Zookeeper保證)。競選成功者即為新的Leader,競選失敗者則重新在新的Controller Path上注冊Watch。因為Zookeeper的Watch是一次性的,被fire一次之後即失效,是以需要重新注冊。
Broker成功競選為新Controller後會觸發KafkaController.onControllerFailover方法,并在該方法中完成如下操作:
- 讀取并增加Controller Epoch。
- 在ReassignedPartitions Path(
/admin/reassign_partitions
- )上注冊Watch。
- 在PreferredReplicaElection Path(
/admin/preferred_replica_election
- )上注冊Watch。
- 通過partitionStateMachine在Broker Topics Patch(
/brokers/topics
- )上注冊Watch。
- 若
delete.topic.enable
- 設定為true(預設值是false),則partitionStateMachine在Delete Topic Patch(
/admin/delete_topics
- )上注冊Watch。
- 通過replicaStateMachine在Broker Ids Patch(
/brokers/ids
- )上注冊Watch。
- 初始化ControllerContext對象,設定目前所有Topic,“活”着的Broker清單,所有Partition的Leader及ISR等。
- 啟動replicaStateMachine和partitionStateMachine。
- 将brokerState狀态設定為RunningAsController。
- 将每個Partition的Leadership資訊發送給所有“活”着的Broker。
- 若
auto.leader.rebalance.enable
- 配置為true(預設值是true),則啟動partition-rebalance線程。
- 若
delete.topic.enable
- 設定為true且Delete Topic Patch(
/admin/delete_topics
- )中有值,則删除相應的Topic。
Partition重新配置設定
/admin/reassign_partitions
上,而該操作會觸發ReassignedPartitionsIsrChangeListener,進而通過執行回調函數KafkaController.onPartitionReassignment來完成以下操作:
- 将Zookeeper中的AR(Current Assigned Replicas)更新為OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
- 強制更新Zookeeper中的leader epoch,向AR中的每個Replica發送LeaderAndIsrRequest。
- 将RAR - OAR中的Replica設定為NewReplica狀态。
- 等待直到RAR中所有的Replica都與其Leader同步。
- 将RAR中所有的Replica都設定為OnlineReplica狀态。
- 将Cache中的AR設定為RAR。
- 若Leader不在RAR中,則從RAR中重新選舉出一個新的Leader并發送LeaderAndIsrRequest。若新的Leader不是從RAR中選舉而出,則還要增加Zookeeper中的leader epoch。
- 将OAR - RAR中的所有Replica設定為OfflineReplica狀态,該過程包含兩部分。第一,将Zookeeper上ISR中的OAR - RAR移除并向Leader發送LeaderAndIsrRequest進而通知這些Replica已經從ISR中移除;第二,向OAR - RAR中的Replica發送StopReplicaRequest進而停止不再配置設定給該Partition的Replica。
- 将OAR - RAR中的所有Replica設定為NonExistentReplica狀态進而将其從磁盤上删除。
- 将Zookeeper中的AR設定為RAR。
- 删除
/admin/reassign_partition
-
。
注意:最後一步才将Zookeeper中的AR更新,因為這是唯一一個持久存儲AR的地方,如果Controller在這一步之前crash,新的Controller仍然能夠繼續完成該過程。
以下是Partition重新配置設定的案例,OAR = {1,2,3},RAR = {4,5,6},Partition重新配置設定過程中Zookeeper中的AR和Leader/ISR路徑如下
Follower通過向Leader發送FetchRequest擷取消息,FetchRequest結構如下Follower從Leader Fetch資料
從FetchRequest的結構可以看出,每個Fetch請求都要指定最大等待時間和最小擷取位元組數,以及由TopicAndPartition和PartitionFetchInfo構成的Map。實際上,Follower從Leader資料和Consumer從Broker Fetch資料,都是通過FetchRequest請求完成,是以在FetchRequest結構中,其中一個字段是clientID,并且其預設值是ConsumerConfig.DefaultClientId。
Leader收到Fetch請求後,Kafka通過KafkaApis.handleFetchRequest響應該請求,響應過程如下:
- replicaManager根據請求讀出資料存入dataRead中。
- 如果該請求來自Follower則更新其相應的LEO(log end offset)以及相應Partition的High Watermark
- 根據dataRead算出可讀消息長度(機關為位元組)并存入bytesReadable中。
- 滿足下面4個條件中的1個,則立即将相應的資料傳回
- Fetch請求不希望等待,即fetchRequest.macWait <= 0
- Fetch請求不要求一定能取到消息,即fetchRequest.numPartitions <= 0,也即requestInfo為空
- 有足夠的資料可供傳回,即bytesReadable >= fetchRequest.minBytes
- 讀取資料時發生異常
- 若不滿足以上4個條件,FetchRequest将不會立即傳回,并将該請求封裝成DelayedFetch。檢查該DeplayedFetch是否滿足,若滿足則傳回請求,否則将該請求加入Watch清單
Leader通過以FetchResponse的形式将消息傳回給Follower,FetchResponse結構如下
#Replication工具
Topic Tool
$KAFKA_HOME/bin/kafka-topics.sh
,該工具可用于建立、删除、修改、檢視某個Topic,也可用于列出所有Topic。另外,該工具還可修改以下配置。
unclean.leader.election.enable
delete.retention.ms
segment.jitter.ms
retention.ms
flush.ms
segment.bytes
flush.messages
segment.ms
retention.bytes
cleanup.policy
segment.index.bytes
min.cleanable.dirty.ratio
max.message.bytes
file.delete.delay.ms
min.insync.replicas
index.interval.bytes
Replica Verification Tool
$KAFKA_HOME/bin/kafka-replica-verification.sh
,該工具用來驗證所指定的一個或多個Topic下每個Partition對應的所有Replica是否都同步。可通過
topic-white-list
這一參數指定所需要驗證的所有Topic,支援正規表達式。
Preferred Replica Leader Election Tool
用途
有了Replication機制後,每個Partition可能有多個備份。某個Partition的Replica清單叫作AR(Assigned Replicas),AR中的第一個Replica即為“Preferred Replica”。建立一個新的Topic或者給已有Topic增加Partition時,Kafka保證Preferred Replica被均勻分布到叢集中的所有Broker上。理想情況下,Preferred Replica會被選為Leader。以上兩點保證了所有Partition的Leader被均勻分布到了叢集當中,這一點非常重要,因為所有的讀寫操作都由Leader完成,若Leader分布過于集中,會造成叢集負載不均衡。但是,随着叢集的運作,該平衡可能會因為Broker的當機而被打破,該工具就是用來幫助恢複Leader配置設定的平衡。
事實上,每個Topic從失敗中恢複過來後,它預設會被設定為Follower角色,除非某個Partition的Replica全部當機,而目前Broker是該Partition的AR中第一個恢複回來的Replica。是以,某個Partition的Leader(Preferred Replica)當機并恢複後,它很可能不再是該Partition的Leader,但仍然是Preferred Replica。
原理
- 在Zookeeper上建立
/admin/preferred_replica_election
- 節點,并存入需要調整Preferred Replica的Partition資訊。
- Controller一直Watch該節點,一旦該節點被建立,Controller會收到通知,并擷取該内容。
- Controller讀取Preferred Replica,如果發現該Replica目前并非是Leader并且它在該Partition的ISR中,Controller向該Replica發送LeaderAndIsrRequest,使該Replica成為Leader。如果該Replica目前并非是Leader,且不在ISR中,Controller為了保證沒有資料丢失,并不會将其設定為Leader。
用法
$KAFKA_HOME/bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181
$KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181
指令檢視其Partition/Replica分布。
查詢結果如下圖所示,從圖中可以看到,Kafka将所有Replica均勻分布到了整個叢集,并且Leader也均勻分布。
手動停止部分Broker,topic1的Partition/Replica分布如下圖所示。從圖中可以看到,由于Broker 1/2/4都被停止,Partition 0的Leader由原來的1變為3,Partition 1的Leader由原來的2變為5,Partition 2的Leader由原來的3變為6,Partition 3的Leader由原來的4變為7。
再重新啟動ID為1的Broker,topic1的Partition/Replica分布如下。可以看到,雖然Broker 1已經啟動(Partition 0和Partition5的ISR中有1),但是1并不是任何一個Parititon的Leader,而Broker 5/6/7都是2個Partition的Leader,即Leader的分布不均衡——一個Broker最多是2個Partition的Leader,而最少是0個Partition的Leader。
運作該工具後,topic1的Partition/Replica分布如下圖所示。由圖可見,除了Partition 1和Partition 3由于Broker 2和Broker 4還未啟動,是以其Leader不是其Preferred Repliac外,其它所有Partition的Leader都是其Preferred Replica。同時,與運作該工具前相比,Leader的配置設定更均勻——一個Broker最多是2個Parittion的Leader,最少是1個Partition的Leader。
啟動Broker 2和Broker 4,Leader分布與上一步相比并未變化,如下圖所示。
再次運作該工具,所有Partition的Leader都由其Preferred Replica承擔,Leader分布更均勻——每個Broker承擔1個Partition的Leader角色。
除了手動運作該工具使Leader配置設定均勻外,Kafka還提供了自動平衡Leader配置設定的功能,該功能可通過将
auto.leader.rebalance.enable
設定為true開啟,它将周期性檢查Leader配置設定是否平衡,若不平衡度超過一定門檻值則自動由Controller嘗試将各Partition的Leader設定為其Preferred Replica。檢查周期由
leader.imbalance.check.interval.seconds
指定,不平衡度門檻值由
leader.imbalance.per.broker.percentage
指定。
Kafka Reassign Partitions Tool
用途
該工具的設計目标與Preferred Replica Leader Election Tool有些類似,都旨在促進Kafka叢集的負載均衡。不同的是,Preferred Replica Leader Election隻能在Partition的AR範圍内調整其Leader,使Leader分布均勻,而該工具還可以調整Partition的AR。
Follower需要從Leader Fetch資料以保持與Leader同步,是以僅僅保持Leader分布的平衡對整個叢集的負載均衡來說是不夠的。另外,生産環境下,随着負載的增大,可能需要給Kafka叢集擴容。向Kafka叢集中增加Broker非常簡單友善,但是對于已有的Topic,并不會自動将其Partition遷移到新加入的Broker上,此時可用該工具達到此目的。某些場景下,實際負載可能遠小于最初預期負載,此時可用該工具将分布在整個叢集上的Partition重裝配置設定到某些機器上,然後可以停止不需要的Broker進而實作節約資源的目的。
需要說明的是,該工具不僅可以調整Partition的AR位置,還可調整其AR數量,即改變該Topic的replication factor。
原理
該工具隻負責将所需資訊存入Zookeeper中相應節點,然後退出,不負責相關的具體操作,所有調整都由Controller完成。
- 在Zookeeper上建立
/admin/reassign_partitions
- 節點,并存入目标Partition清單及其對應的目标AR清單。
- Controller注冊在
/admin/reassign_partitions
- 上的Watch被fire,Controller擷取該清單。
- 對清單中的所有Partition,Controller會做如下操作:
- 啟動
RAR - AR
- 中的Replica,即新配置設定的Replica。(RAR = Reassigned Replicas, AR = Assigned Replicas)
- 等待新的Replica與Leader同步
- 如果Leader不在RAR中,從RAR中選出新的Leader
- 停止并删除
AR - RAR
- 中的Replica,即不再需要的Replica
- 删除
/admin/reassign_partitions
- 節點
用法
該工具有三種使用模式
- generate模式,給定需要重新配置設定的Topic,自動生成reassign plan(并不執行)
- execute模式,根據指定的reassign plan重新配置設定Partition
- verify模式,驗證重新配置設定Partition是否成功
下面這個例子将使用該工具将Topic的所有Partition重新配置設定到Broker 4/5/6/7上,步驟如下:
- 使用generate模式,生成reassign plan。指定需要重新配置設定的Topic ({“topics”:[{“topic”:”topic1”}],”version”:1}),并存入
/tmp/topics-to-move.json
- 檔案中,然後執行
$KAFKA_HOME/bin/kafka-reassign-partitions.sh
--zookeeper localhost:2181
--topics-to-move-json-file /tmp/topics-to-move.json
--broker-list "4,5,6,7" --generate
結果如下圖所示
2. 使用execute模式,執行reassign plan
/tmp/reassign-plan.json
檔案中,并執行
$KAFKA_HOME/bin/kafka-reassign-partitions.sh
--zookeeper localhost:2181
--reassignment-json-file /tmp/reassign-plan.json --execute
/admin/reassign_partitions
節點被建立,且其值與
/tmp/reassign-plan.json
檔案的内容一緻。
3. 使用verify模式,驗證reassign是否完成。執行verify指令
$KAFKA_HOME/bin/kafka-reassign-partitions.sh
--zookeeper localhost:2181 --verify
--reassignment-json-file /tmp/reassign-plan.json
結果如下所示,從圖中可看出topic1的所有Partititon都重新配置設定成功。
接下來用Topic Tool再次驗證。
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic1
結果如下圖所示,從圖中可看出topic1的所有Partition都被重新配置設定到Broker 4/5/6/7,且每個Partition的AR與reassign plan一緻。
需要說明的是,在使用execute之前,并不一定要使用generate模式自動生成reassign plan,使用generate模式隻是為了友善。事實上,某些場景下,generate模式生成的reassign plan并不一定能滿足需求,此時使用者可以自己設定reassign plan。
State Change Log Merge Tool
用途
該工具旨在從整個叢集的Broker上收集狀态改變日志,并生成一個集中的格式化的日志以幫助診斷狀态改變相關的故障。每個Broker都會将其收到的狀态改變相關的的指令存于名為
state-change.log
的日志檔案中。某些情況下,Partition的Leader Election可能會出現問題,此時我們需要對整個叢集的狀态改變有個全局的了解進而診斷故障并解決問題。該工具将叢集中相關的
state-change.log
日志按時間順序合并,同時支援使用者輸入時間範圍和目标Topic及Partition作為過濾條件,最終将格式化的結果輸出。
用法
bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger
--logs /opt/kafka_2.11-0.8.2.1/logs/state-change.log
--topic topic1 --partitions 0,1,2,3,4,5,6,7