本文轉發自技術世界,
Kafka在0.8以前的版本中,并不提供High Availablity機制,一旦一個或多個Broker當機,則當機期間其上所有Partition都無法繼續提供服務。若該Broker永遠不能再恢複,亦或磁盤故障,則其上資料将丢失。而Kafka的設計目标之一即是提供資料持久化,同時對于分布式系統來說,尤其當叢集規模上升到一定程度後,一台或者多台機器當機的可能性大大提高,對于Failover機制的需求非常高。是以,Kafka從0.8開始提供High Availability機制。本文從Data Replication和Leader Election兩方面介紹了Kafka的HA機制。
Kafka為何需要High Available
為何需要Replication
在Kafka在0.8以前的版本中,是沒有Replication的,一旦某一個Broker當機,則其上所有的Partition資料都不可被消費,這與Kafka資料持久性及Delivery Guarantee的設計目标相悖。同時Producer都不能再将資料存于這些Partition中。
- 如果Producer使用同步模式則Producer會在嘗試重新發送
message.send.max.retries
- (預設值為3)次後抛出Exception,使用者可以選擇停止發送後續資料也可選擇繼續選擇發送。而前者會造成資料的阻塞,後者會造成本應發往該Broker的資料的丢失。
- 如果Producer使用異步模式,則Producer會嘗試重新發送
message.send.max.retries
- (預設值為3)次後記錄該異常并繼續發送後續資料,這會造成資料丢失并且使用者隻能通過日志發現該問題。
由此可見,在沒有Replication的情況下,一旦某機器當機或者某個Broker停止工作則會造成整個系統的可用性降低。随着叢集規模的增加,整個叢集中出現該類異常的幾率大大增加,是以對于生産系統而言Replication機制的引入非常重要。
為何需要Leader Election
(本文所述Leader Election主要指Replica之間的Leader Election)
引入Replication之後,同一個Partition可能會有多個Replica,而這時需要在這些Replica中選出一個Leader,Producer和Consumer隻與這個Leader互動,其它Replica作為Follower從Leader中複制資料。
因為需要保證同一個Partition的多個Replica之間的資料一緻性(其中一個當機後其它Replica必須要能繼續服務并且即不能造成資料重複也不能造成資料丢失)。如果沒有一個Leader,所有Replica都可同時讀/寫資料,那就需要保證多個Replica之間互相(N×N條通路)同步資料,資料的一緻性和有序性非常難保證,大大增加了Replication實作的複雜性,同時也增加了出現異常的幾率。而引入Leader後,隻有Leader負責資料讀寫,Follower隻向Leader順序Fetch資料(N條通路),系統更加簡單且高效。
Kafka HA設計解析
如何将所有Replica均勻分布到整個叢集
為了更好的做負載均衡,Kafka盡量将所有的Partition均勻配置設定到整個叢集上。一個典型的部署方式是一個Topic的Partition數量大于Broker的數量。同時為了提高Kafka的容錯能力,也需要将同一個Partition的Replica盡量分散到不同的機器。實際上,如果所有的Replica都在同一個Broker上,那一旦該Broker當機,該Partition的所有Replica都無法工作,也就達不到HA的效果。同時,如果某個Broker當機了,需要保證它上面的負載可以被均勻的配置設定到其它幸存的所有Broker上。
Kafka配置設定Replica的算法如下:
- 将所有Broker(假設共n個Broker)和待配置設定的Partition排序
- 将第i個Partition配置設定到第(i mod n)個Broker上
- 将第i個Partition的第j個Replica配置設定到第((i + j) mod n)個Broker上
Data Replication
Kafka的Data Replication需要解決如下問題:
- 怎樣Propagate消息
- 在向Producer發送ACK前需要保證有多少個Replica已經收到該消息
- 怎樣處理某個Replica不工作的情況
- 怎樣處理Failed Replica恢複回來的情況
Propagate消息
Producer在釋出消息到某個Partition時,先通過Zookeeper找到該Partition的Leader,然後無論該Topic的Replication Factor為多少(也即該Partition有多少個Replica),Producer隻将該消息發送到該Partition的Leader。Leader會将該消息寫入其本地Log。每個Follower都從Leader pull資料。這種方式上,Follower存儲的資料順序與Leader保持一緻。Follower在收到該消息并寫入其Log後,向Leader發送ACK。一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認為已經commit了,Leader将增加HW并且向Producer發送ACK。
為了提高性能,每個Follower在接收到資料後就立馬向Leader發送ACK,而非等到資料寫入Log中。是以,對于已經commit的消息,Kafka隻能保證它被存于多個Replica的記憶體中,而不能保證它們被持久化到磁盤中,也就不能完全保證異常發生後該條消息一定能被Consumer消費。但考慮到這種場景非常少見,可以認為這種方式在性能和資料持久化上做了一個比較好的平衡。在将來的版本中,Kafka會考慮提供更高的持久性。
Consumer讀消息也是從Leader讀取,隻有被commit過的消息(offset低于HW的消息)才會暴露給Consumer。
Kafka Replication的資料流如下圖所示
ACK前需要保證有多少個備份
和大部分分布式系統一樣,Kafka處理失敗需要明确定義一個Broker是否“活着”。對于Kafka而言,Kafka存活包含兩個條件,一是它必須維護與Zookeeper的session(這個通過Zookeeper的Heartbeat機制來實作)。二是Follower必須能夠及時将Leader的消息複制過來,不能“落後太多”。
Leader會跟蹤與其保持同步的Replica清單,該清單稱為ISR(即in-sync Replica)。如果一個Follower當機,或者落後太多,Leader将把它從ISR中移除。這裡所描述的“落後太多”指Follower複制的消息落後于Leader後的條數超過預定值(該值可在$KAFKA_HOME/config/server.properties中通過
replica.lag.max.messages
配置,其預設值是4000)或者Follower超過一定時間(該值可在$KAFKA_HOME/config/server.properties中通過
replica.lag.time.max.ms
來配置,其預設值是10000)未向Leader發送fetch請求。。
Kafka的複制機制既不是完全的同步複制,也不是單純的異步複制。事實上,同步複制要求所有能工作的Follower都複制完,這條消息才會被認為commit,這種複制方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個特性)。而異步複制方式下,Follower異步的從Leader複制資料,資料隻要被Leader寫入log就被認為已經commit,這種情況下如果Follower都複制完都落後于Leader,而如果Leader突然當機,則會丢失資料。而Kafka的這種使用ISR的方式則很好的均衡了確定資料不丢失以及吞吐率。Follower可以批量的從Leader複制資料,這樣極大的提高複制性能(批量寫磁盤),極大減少了Follower與Leader的差距。
需要說明的是,Kafka隻解決fail/recover,不處理“Byzantine”(“拜占庭”)問題。一條消息隻有被ISR裡的所有Follower都從Leader複制過去才會被認為已送出。這樣就避免了部分資料被寫進了Leader,還沒來得及被任何Follower複制就當機了,而造成資料丢失(Consumer無法消費這些資料)。而對于Producer而言,它可以選擇是否等待消息commit,這可以通過
request.required.acks
來設定。這種機制確定了隻要ISR有一個或以上的Follower,一條被commit的消息就不會丢失。
Leader Election算法
上文說明了Kafka是如何做Replication的,另外一個很重要的問題是當Leader當機了,怎樣在Follower中選舉出新的Leader。因為Follower可能落後許多或者crash了,是以必須確定選擇“最新”的Follower作為新的Leader。一個基本的原則就是,如果Leader不在了,新的Leader必須擁有原來的Leader commit過的所有消息。這就需要作一個折衷,如果Leader在标明一條消息被commit前等待更多的Follower确認,那在它當機之後就有更多的Follower可以作為新的Leader,但這也會造成吞吐率的下降。
一種非常常用的Leader Election的方式是“Majority Vote”(“少數服從多數”),但Kafka并未采用這種方式。這種模式下,如果我們有2f+1個Replica(包含Leader和Follower),那在commit之前必須保證有f+1個Replica複制完消息,為了保證正确選出新的Leader,fail的Replica不能超過f個。因為在剩下的任意f+1個Replica裡,至少有一個Replica包含有最新的所有消息。這種方式有個很大的優勢,系統的latency隻取決于最快的幾個Broker,而非最慢那個。Majority Vote也有一些劣勢,為了保證Leader Election的正常進行,它所能容忍的fail的follower個數比較少。如果要容忍1個follower挂掉,必須要有3個以上的Replica,如果要容忍2個Follower挂掉,必須要有5個以上的Replica。也就是說,在生産環境下為了保證較高的容錯程度,必須要有大量的Replica,而大量的Replica又會在大資料量下導緻性能的急劇下降。這就是這種算法更多用在Zookeeper這種共享叢集配置的系統中而很少在需要存儲大量資料的系統中使用的原因。例如HDFS的HA Feature是基于majority-vote-based journal,但是它的資料存儲并沒有使用這種方式。
實際上,Leader Election算法非常多,比如Zookeeper的Zab, Raft和Viewstamped Replication。而Kafka所使用的Leader Election算法更像微軟的PacificA算法。
Kafka在Zookeeper中動态維護了一個ISR(in-sync replicas),這個ISR裡的所有Replica都跟上了leader,隻有ISR裡的成員才有被選為Leader的可能。在這種模式下,對于f+1個Replica,一個Partition能在保證不丢失已經commit的消息的前提下容忍f個Replica的失敗。在大多數使用場景中,這種模式是非常有利的。事實上,為了容忍f個Replica的失敗,Majority Vote和ISR在commit前需要等待的Replica數量是一樣的,但是ISR需要的總的Replica的個數幾乎是Majority Vote的一半。
雖然Majority Vote與ISR相比有不需等待最慢的Broker這一優勢,但是Kafka作者認為Kafka可以通過Producer選擇是否被commit阻塞來改善這一問題,并且節省下來的Replica和磁盤使得ISR模式仍然值得。
如何處理所有Replica都不工作
上文提到,在ISR中至少有一個follower時,Kafka可以確定已經commit的資料不丢失,但如果某個Partition的所有Replica都當機了,就無法保證資料不丢失了。這種情況下有兩種可行的方案:
- 等待ISR中的任一個Replica“活”過來,并且選它作為Leader
- 選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader
這就需要在可用性和一緻性當中作出一個簡單的折衷。如果一定要等待ISR中的Replica“活”過來,那不可用的時間就可能會相對較長。而且如果ISR中的所有Replica都無法“活”過來了,或者資料都丢失了,這個Partition将永遠不可用。選擇第一個“活”過來的Replica作為Leader,而這個Replica不是ISR中的Replica,那即使它并不保證已經包含了所有已commit的消息,它也會成為Leader而作為consumer的資料源(前文有說明,所有讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文檔,在以後的版本中,Kafka支援使用者通過配置選擇這兩種方式中的一種,進而根據不同的使用場景選擇高可用性還是強一緻性。
如何選舉Leader
最簡單最直覺的方案是,所有Follower都在Zookeeper上設定一個Watch,一旦Leader當機,其對應的ephemeral znode會自動删除,此時所有Follower都嘗試建立該節點,而建立成功者(Zookeeper保證隻有一個能建立成功)即是新的Leader,其它Replica即為Follower。
但是該方法會有3個問題:
- split-brain 這是由Zookeeper的特性引起的,雖然Zookeeper能保證所有Watch按順序觸發,但并不能保證同一時刻所有Replica“看”到的狀态是一樣的,這就可能造成不同Replica的響應不一緻
- herd effect 如果當機的那個Broker上的Partition比較多,會造成多個Watch被觸發,造成叢集内大量的調整
- Zookeeper負載過重 每個Replica都要為此在Zookeeper上注冊一個Watch,當叢集規模增加到幾千個Partition時Zookeeper負載會過重。
Kafka 0.8.*的Leader Election方案解決了上述問題,它在所有broker中選出一個controller,所有Partition的Leader選舉都由controller決定。controller會将Leader的改變直接通過RPC的方式(比Zookeeper Queue的方式更高效)通知需為此作出響應的Broker。同時controller也負責增删Topic以及Replica的重新配置設定。
HA相關Zookeeper結構
(本節所示Zookeeper結構中,實線框代表路徑名是固定的,而虛線框代表路徑名與業務相關)
admin (該目錄下znode隻有在有相關操作時才會存在,操作結束時會将其删除)
/admin/preferred_replica_election
資料結構
Schema:
{
"fields":[
{
"name":"version",
"type":"int",
"doc":"version id"
},
{
"name":"partitions",
"type":{
"type":"array",
"items":{
"fields":[
{
"name":"topic",
"type":"string",
"doc":"topic of the partition for which preferred replica election should be triggered"
},
{
"name":"partition",
"type":"int",
"doc":"the partition for which preferred replica election should be triggered"
}
],
}
"doc":"an array of partitions for which preferred replica election should be triggered"
}
}
]
}
Example:
{
"version": 1,
"partitions":
[
{
"topic": "topic1",
"partition": 8
},
{
"topic": "topic2",
"partition": 16
}
]
}
/admin/reassign_partitions
用于将一些Partition配置設定到不同的broker集合上。對于每個待重新配置設定的Partition,Kafka會在該znode上存儲其所有的Replica和相應的Broker id。該znode由管理程序建立并且一旦重新配置設定成功它将會被自動移除。其資料結構如下
Schema:
{
"fields":[
{
"name":"version",
"type":"int",
"doc":"version id"
},
{
"name":"partitions",
"type":{
"type":"array",
"items":{
"fields":[
{
"name":"topic",
"type":"string",
"doc":"topic of the partition to be reassigned"
},
{
"name":"partition",
"type":"int",
"doc":"the partition to be reassigned"
},
{
"name":"replicas",
"type":"array",
"items":"int",
"doc":"a list of replica ids"
}
],
}
"doc":"an array of partitions to be reassigned to new replicas"
}
}
]
}
Example:
{
"version": 1,
"partitions":
[
{
"topic": "topic3",
"partition": 1,
"replicas": [1, 2, 3]
}
]
}
/admin/delete_topics
資料結構
Schema:
{ "fields":
[ {"name": "version", "type": "int", "doc": "version id"},
{"name": "topics",
"type": { "type": "array", "items": "string", "doc": "an array of topics to be deleted"}
} ]
}
Example:
{
"version": 1,
"topics": ["topic4", "topic5"]
}
brokers
/brokers/ids/[brokerId]
)存儲“活着”的Broker資訊。資料結構如下
Schema:
{ "fields":
[ {"name": "version", "type": "int", "doc": "version id"},
{"name": "host", "type": "string", "doc": "ip address or host name of the broker"},
{"name": "port", "type": "int", "doc": "port of the broker"},
{"name": "jmx_port", "type": "int", "doc": "port for jmx"}
]
}
Example:
{
"jmx_port":-1,
"host":"node1",
"version":1,
"port":9092
}
/brokers/topics/[topic]
),存儲該Topic的所有Partition的所有Replica所在的Broker id,第一個Replica即為Preferred Replica,對一個給定的Partition,它在同一個Broker上最多隻有一個Replica,是以Broker id可作為Replica id。資料結構如下
Schema:
{ "fields" :
[ {"name": "version", "type": "int", "doc": "version id"},
{"name": "partitions",
"type": {"type": "map",
"values": {"type": "array", "items": "int", "doc": "a list of replica ids"},
"doc": "a map from partition id to replica list"},
}
]
}
Example:
{
"version":1,
"partitions":
{"12":[6],
"8":[2],
"4":[6],
"11":[5],
"9":[3],
"5":[7],
"10":[4],
"6":[8],
"1":[3],
"0":[2],
"2":[4],
"7":[1],
"3":[5]}
}
/brokers/topics/[topic]/partitions/[partitionId]/state
) 結構如下
Schema:
{ "fields":
[ {"name": "version", "type": "int", "doc": "version id"},
{"name": "isr",
"type": {"type": "array",
"items": "int",
"doc": "an array of the id of replicas in isr"}
},
{"name": "leader", "type": "int", "doc": "id of the leader replica"},
{"name": "controller_epoch", "type": "int", "doc": "epoch of the controller that last updated the leader and isr info"},
{"name": "leader_epoch", "type": "int", "doc": "epoch of the leader"}
]
}
Example:
{
"controller_epoch":29,
"leader":2,
"version":1,
"leader_epoch":48,
"isr":[2]
}
controller
/controller -> int (broker id of the controller)
存儲目前controller的資訊
Schema:
{ "fields":
[ {"name": "version", "type": "int", "doc": "version id"},
{"name": "brokerid", "type": "int", "doc": "broker id of the controller"}
]
}
Example:
{
"version":1,
"brokerid":8
}
/controller_epoch -> int (epoch)
直接以整數形式存儲controller epoch,而非像其它znode一樣以JSON字元串形式存儲。
broker failover過程簡介
- Controller在Zookeeper注冊Watch,一旦有Broker當機(這是用當機代表任何讓系統認為其die的情景,包括但不限于機器斷電,網絡不可用,GC導緻的Stop The World,程序crash等),其在Zookeeper對應的znode會自動被删除,Zookeeper會fire Controller注冊的watch,Controller讀取最新的幸存的Broker
- Controller決定set_p,該集合包含了當機的所有Broker上的所有Partition
-
對set_p中的每一個Partition
3.1 從
/brokers/topics/[topic]/partitions/[partition]/state
-
讀取該Partition目前的ISR
3.2 決定該Partition的新Leader。如果目前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader,新的ISR則包含目前ISR中所有幸存的Replica。否則選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的資料丢失)。如果該Partition的所有Replica都當機了,則将新的Leader設定為-1。
3.3 将新的Leader,ISR和新的
leader_epoch
- 及
controller_epoch
- 寫入
/brokers/topics/[topic]/partitions/[partition]/state
- 。注意,該操作隻有其version在3.1至3.3的過程中無變化時才會執行,否則跳轉到3.1
-
直接通過RPC向set_p相關的Broker發送LeaderAndISRRequest指令。Controller可以在一個RPC操作中發送多個指令進而提高效率。
Broker failover順序圖如下所示。