一 Kafka HA設計解析
1.1 為何需要Replication
在Kafka在0.8以前的版本中,是沒有Replication的,一旦某一個Broker當機,則其上所有的Partition資料都不可被消費,這與Kafka資料持久性及Delivery Guarantee的設計目标相悖。同時Producer都不能再将資料存于這些Partition中。
如果Producer使用同步模式則Producer會在嘗試重新發送message.send.max.retries(預設值為3)次後抛出Exception,使用者可以選擇停止發送後續資料也可選擇繼續選擇發送。而前者會造成資料的阻塞,後者會造成本應發往該Broker的資料的丢失。
如果Producer使用異步模式,則Producer會嘗試重新發送message.send.max.retries(預設值為3)次後記錄該異常并繼續發送後續資料,這會造成資料丢失并且使用者隻能通過日志發現該問題。同時,Kafka的Producer并未對異步模式提供callback接口。
由此可見,在沒有Replication的情況下,一旦某機器當機或者某個Broker停止工作則會造成整個系統的可用性降低。随着叢集規模的增加,整個叢集中出現該類異常的幾率大大增加,是以對于生産系統而言Replication機制的引入非常重要。
1.2 Leader Election
引入Replication之後,同一個Partition可能會有多個Replica,而這時需要在這些Replication之間選出一個Leader,Producer和Consumer隻與這個Leader互動,其它Replica作為Follower從Leader中複制資料。
因為需要保證同一個Partition的多個Replica之間的資料一緻性(其中一個當機後其它Replica必須要能繼續服務并且即不能造成資料重複也不能造成資料丢失)。如果沒有一個Leader,所有Replica都可同時讀/寫資料,那就需要保證多個Replica之間互相(N×N條通路)同步資料,資料的一緻性和有序性非常難保證,大大增加了Replication實作的複雜性,同時也增加了出現異常的幾率。而引入Leader後,隻有Leader負責資料讀寫,Follower隻向Leader順序Fetch資料(N條通路),系統更加簡單且高效。
1.3 如何将所有Replica均勻分布到整個叢集
為了更好的做負載均衡,Kafka盡量将所有的Partition均勻配置設定到整個叢集上。一個典型的部署方式是一個Topic的Partition數量大于Broker的數量。同時為了提高Kafka的容錯能力,也需要将同一個Partition的Replica盡量分散到不同的機器。實際上,如果所有的Replica都在同一個Broker上,那一旦該Broker當機,該Partition的所有Replica都無法工作,也就達不到HA的效果。同時,如果某個Broker當機了,需要保證它上面的負載可以被均勻的配置設定到其它幸存的所有Broker上。
Kafka配置設定Replica的算法如下:
1.将所有Broker(假設共n個Broker)和待配置設定的Partition排序
2.将第i個Partition配置設定到第(i mod n)個Broker上
3.将第i個Partition的第j個Replica配置設定到第((i + j) mode n)個Broker上
1.4 Data Replication(副本政策)
Kafka的高可靠性的保障來源于其健壯的副本(replication)政策。
1.4.1 消息傳遞同步政策
Producer在釋出消息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,然後無論該Topic的Replication Factor為多少,Producer隻将該消息發送到該Partition的Leader。Leader會将該消息寫入其本地Log。每個Follower都從Leader pull資料。這種方式上,Follower存儲的資料順序與Leader保持一緻。Follower在收到該消息并寫入其Log後,向Leader發送ACK。一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認為已經commit了,Leader将增加HW并且向Producer發送ACK。
為了提高性能,每個Follower在接收到資料後就立馬向Leader發送ACK,而非等到資料寫入Log中。是以,對于已經commit的消息,Kafka隻能保證它被存于多個Replica的記憶體中,而不能保證它們被持久化到磁盤中,也就不能完全保證異常發生後該條消息一定能被Consumer消費。
Consumer讀消息也是從Leader讀取,隻有被commit過的消息才會暴露給Consumer。
Kafka Replication的資料流如下圖所示:

1.4.2 ACK前需要保證有多少個備份
對于Kafka而言,定義一個Broker是否“活着”包含兩個條件:
- 一是它必須維護與ZooKeeper的session(這個通過ZooKeeper的Heartbeat機制來實作)。
- 二是Follower必須能夠及時将Leader的消息複制過來,不能“落後太多”。
Leader會跟蹤與其保持同步的Replica清單,該清單稱為ISR(即in-sync Replica)。如果一個Follower當機,或者落後太多,Leader将把它從ISR中移除。這裡所描述的“落後太多”指Follower複制的消息落後于Leader後的條數超過預定值(該值可在$KAFKA_HOME/config/server.properties中通過replica.lag.max.messages配置,其預設值是4000)或者Follower超過一定時間(該值可在$KAFKA_HOME/config/server.properties中通過replica.lag.time.max.ms來配置,其預設值是10000)未向Leader發送fetch請求。
Kafka的複制機制既不是完全的同步複制,也不是單純的異步複制。事實上,完全同步複制要求所有能工作的Follower都複制完,這條消息才會被認為commit,這種複制方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個特性)。而異步複制方式下,Follower異步的從Leader複制資料,資料隻要被Leader寫入log就被認為已經commit,這種情況下如果Follower都複制完都落後于Leader,而如果Leader突然當機,則會丢失資料。而Kafka的這種使用ISR的方式則很好的均衡了確定資料不丢失以及吞吐率。Follower可以批量的從Leader複制資料,這樣極大的提高複制性能(批量寫磁盤),極大減少了Follower與Leader的差距。
需要說明的是,Kafka隻解決fail/recover,不處理“Byzantine”(“拜占庭”)問題。一條消息隻有被ISR裡的所有Follower都從Leader複制過去才會被認為已送出。這樣就避免了部分資料被寫進了Leader,還沒來得及被任何Follower複制就當機了,而造成資料丢失(Consumer無法消費這些資料)。而對于Producer而言,它可以選擇是否等待消息commit,這可以通過request.required.acks來設定。這種機制確定了隻要ISR有一個或以上的Follower,一條被commit的消息就不會丢失。
1.4.3 Leader Election算法
Leader選舉本質上是一個分布式鎖,有兩種方式實作基于ZooKeeper的分布式鎖:
- 節點名稱唯一性:多個用戶端建立一個節點,隻有成功建立節點的用戶端才能獲得鎖
- 臨時順序節點:所有用戶端在某個目錄下建立自己的臨時順序節點,隻有序号最小的才獲得鎖
一種非常常用的選舉leader的方式是“Majority Vote”(“少數服從多數”),但Kafka并未采用這種方式。這種模式下,如果我們有2f+1個Replica(包含Leader和Follower),那在commit之前必須保證有f+1個Replica複制完消息,為了保證正确選出新的Leader,fail的Replica不能超過f個。因為在剩下的任意f+1個Replica裡,至少有一個Replica包含有最新的所有消息。這種方式有個很大的優勢,系統的latency隻取決于最快的幾個Broker,而非最慢那個。Majority Vote也有一些劣勢,為了保證Leader Election的正常進行,它所能容忍的fail的follower個數比較少。如果要容忍1個follower挂掉,必須要有3個以上的Replica,如果要容忍2個Follower挂掉,必須要有5個以上的Replica。也就是說,在生産環境下為了保證較高的容錯程度,必須要有大量的Replica,而大量的Replica又會在大資料量下導緻性能的急劇下降。這就是這種算法更多用在ZooKeeper這種共享叢集配置的系統中而很少在需要存儲大量資料的系統中使用的原因。例如HDFS的HA Feature是基于majority-vote-based journal,但是它的資料存儲并沒有使用這種方式。
Kafka在ZooKeeper中動态維護了一個ISR(in-sync replicas),這個ISR裡的所有Replica都跟上了leader,隻有ISR裡的成員才有被選為Leader的可能。在這種模式下,對于f+1個Replica,一個Partition能在保證不丢失已經commit的消息的前提下容忍f個Replica的失敗。在大多數使用場景中,這種模式是非常有利的。事實上,為了容忍f個Replica的失敗,Majority Vote和ISR在commit前需要等待的Replica數量是一樣的,但是ISR需要的總的Replica的個數幾乎是Majority Vote的一半。
雖然Majority Vote與ISR相比有不需等待最慢的Broker這一優勢,但是Kafka作者認為Kafka可以通過Producer選擇是否被commit阻塞來改善這一問題,并且節省下來的Replica和磁盤使得ISR模式仍然值得。
1.4.4 如何處理所有Replica都不工作
在ISR中至少有一個follower時,Kafka可以確定已經commit的資料不丢失,但如果某個Partition的所有Replica都當機了,就無法保證資料不丢失了。這種情況下有兩種可行的方案:
1.等待ISR中的任一個Replica“活”過來,并且選它作為Leader
2.選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader
這就需要在可用性和一緻性當中作出一個簡單的折衷。如果一定要等待ISR中的Replica“活”過來,那不可用的時間就可能會相對較長。而且如果ISR中的所有Replica都無法“活”過來了,或者資料都丢失了,這個Partition将永遠不可用。選擇第一個“活”過來的Replica作為Leader,而這個Replica不是ISR中的Replica,那即使它并不保證已經包含了所有已commit的消息,它也會成為Leader而作為consumer的資料源(前文有說明,所有讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文檔,在以後的版本中,Kafka支援使用者通過配置選擇這兩種方式中的一種,進而根據不同的使用場景選擇高可用性還是強一緻性。
1.4.5 選舉Leader
最簡單最直覺的方案是,所有Follower都在ZooKeeper上設定一個Watch,一旦Leader當機,其對應的ephemeral znode會自動删除,此時所有Follower都嘗試建立該節點,而建立成功者(ZooKeeper保證隻有一個能建立成功)即是新的Leader,其它Replica即為Follower。
但是該方法會有3個問題:
1.split-brain 這是由ZooKeeper的特性引起的,雖然ZooKeeper能保證所有Watch按順序觸發,但并不能保證同一時刻所有Replica“看”到的狀态是一樣的,這就可能造成不同Replica的響應不一緻
2.herd effect 如果當機的那個Broker上的Partition比較多,會造成多個Watch被觸發,造成叢集内大量的調整
3.ZooKeeper負載過重 每個Replica都要為此在ZooKeeper上注冊一個Watch,當叢集規模增加到幾千個Partition時ZooKeeper負載會過重。
Kafka 0.8.*的Leader Election方案解決了上述問題,它在所有broker中選出一個controller,所有Partition的Leader選舉都由controller決定。controller會将Leader的改變直接通過RPC的方式(比ZooKeeper Queue的方式更高效)通知需為為此作為響應的Broker。同時controller也負責增删Topic以及Replica的重新配置設定。
二 Kafka生産過程分析
2.1 寫入方式
producer采用推(push)模式将消息釋出到broker,每條消息都被追加(append)到分區(patition)中,屬于順序寫磁盤(順序寫磁盤效率比随機寫記憶體要高,保障kafka吞吐率)。
2.2 分區(Partition)
Kafka叢集有多個消息代理伺服器(broker-server)組成,釋出到Kafka叢集的每條消息都有一個類别,用主題(topic)來表示。通常,不同應用産生不同類型的資料,可以設定不同的主題。一個主題一般會有多個消息的訂閱者,當生産者釋出消息到某個主題時,訂閱了這個主題的消費者都可以接收到生成者寫入的新消息。
Kafka叢集為每個主題維護了分布式的分區(partition)日志檔案,實體意義上可以把主題(topic)看作進行了分區的日志檔案(partition log)。主題的每個分區都是一個有序的、不可變的記錄序列,新的消息會不斷追加到日志中。分區中的每條消息都會按照時間順序配置設定到一個單調遞增的順序編号,叫做偏移量(offset),這個偏移量能夠唯一地定位目前分區中的每一條消息。
消息發送時都被發送到一個topic,其本質就是一個目錄,而topic是由一些Partition Logs(分區日志)組成,其組織結構如下圖所示:
下圖中的topic有3個分區,每個分區的偏移量都從0開始,不同分區之間的偏移量都是獨立的,不會互相影響。
我們可以看到,每個Partition中的消息都是有序的,生産的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值。
釋出到Kafka主題的每條消息包括鍵值和時間戳。消息到達伺服器端的指定分區後,都會配置設定到一個自增的偏移量。原始的消息内容和配置設定的偏移量以及其他一些中繼資料資訊最後都會存儲到分區日志檔案中。消息的鍵也可以不用設定,這種情況下消息會均衡地分布到不同的分區。
1) 分區的原因
(1)友善在叢集中擴充,每個Partition可以通過調整以适應它所在的機器,而一個topic又可以有多個Partition組成,是以整個叢集就可以适應任意大小的資料了;
(2)可以提高并發,因為可以以Partition為機關讀寫了。
傳統消息系統在服務端保持消息的順序,如果有多個消費者消費同一個消息隊列,服務端會以消費存儲的順序依次發送給消費者。但由于消息是異步發送給消費者的,消息到達消費者的順序可能是無序的,這就意味着在并行消費時,傳統消息系統無法很好地保證消息被順序處理。雖然我們可以設定一個專用的消費者隻消費一個隊列,以此來解決消息順序的問題,但是這就使得消費處理無法真正執行。
Kafka比傳統消息系統有更強的順序性保證,它使用主題的分區作為消息處理的并行單元。Kafka以分區作為最小的粒度,将每個分區配置設定給消費者組中不同的而且是唯一的消費者,并確定一個分區隻屬于一個消費者,即這個消費者就是這個分區的唯一讀取線程。那麼,隻要分區的消息是有序的,消費者處理的消息順序就有保證。每個主題有多個分區,不同的消費者處理不同的分區,是以Kafka不僅保證了消息的有序性,也做到了消費者的負載均衡。
2)分區的原則
(1)指定了patition,則直接使用;
(2)未指定patition但指定key,通過對key的value進行hash出一個patition
(3)patition和key都未指定,使用輪詢選出一個patition。
DefaultPartitioner類
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
2.3 副本(Replication)
同一個partition可能會有多個replication(對應 server.properties 配置中的 default.replication.factor=N)。沒有replication的情況下,一旦broker 當機,其上所有 patition 的資料都不可被消費,同時producer也不能再将資料存于其上的patition。引入replication之後,同一個partition可能會有多個replication,而這時需要在這些replication之間選出一個leader,producer和consumer隻與這個leader互動,其它replication作為follower從leader 中複制資料。
2.4 寫入流程
producer寫入消息流程如下:
1)producer先從zookeeper的 "/brokers/.../state"節點找到該partition的leader
2)producer将消息發送給該leader
3)leader将消息寫入本地log
4)followers從leader pull消息,寫入本地log後向leader發送ACK
5)leader收到所有ISR中的replication的ACK後,增加HW(high watermark,最後commit 的offset)并向producer發送ACK
三 broker儲存消息
3.1 存儲方式
實體上把 topic 分成一個或多個 patition(對應 server.properties 中的 num.partitions=3 配置),每個 patition 實體上對應一個檔案夾(該檔案夾存儲該 patition 的所有消息和索引檔案),如下:
3.2 存儲政策
無論消息是否被消費,kafka 都會保留所有消息。有兩種政策可以删除舊資料:
1、 基于時間:log.retention.hours=168
2、 基于大小:log.retention.bytes=1073741824
需要注意的是,因為Kafka讀取特定消息的時間複雜度為O(1),即與檔案大小無關,是以這裡删除過期檔案與提高 Kafka 性能無關。
3.3Zookeeper存儲結構
admin
該目錄下znode隻有在有相關操作時才會存在,操作結束時會将其删除
/admin/reassign_partitions用于将一些Partition配置設定到不同的broker集合上。對于每個待重新配置設定的Partition,Kafka會在該znode上存儲其所有的Replica和相應的Broker id。該znode由管理程序建立并且一旦重新配置設定成功它将會被自動移除。
broker
即/brokers/ids/[brokerId])存儲“活着”的broker資訊。
topic注冊資訊(/brokers/topics/[topic]),存儲該topic的所有partition的所有replica所在的broker id,第一個replica即為preferred replica,對一個給定的partition,它在同一個broker上最多隻有一個replica,是以broker id可作為replica id。
controller
/controller -> int (broker id of the controller)存儲目前controller的資訊
/controller_epoch -> int (epoch)直接以整數形式存儲controller epoch,而非像其它znode一樣以JSON字元串形式存儲。
四 Kafka消費過程分析
kafka提供了兩套consumer API:進階Consumer API和低級API。
4.1消費模型
消息由生産者釋出到Kafka叢集後,會被消費者消費。消息的消費模型有兩種:推送模型(push)和拉取模型(pull)。
基于推送模型(push)的消息系統,由消息代理記錄消費者的消費狀态。消息代理在将消息推送到消費者後,标記這條消息為已消費,但這種方式無法很好地保證消息被處理。比如,消息代理把消息發送出去後,當消費程序挂掉或者由于網絡原因沒有收到這條消息時,就有可能造成消息丢失(因為消息代理已經把這條消息标記為已消費了,但實際上這條消息并沒有被實際處理)。如果要保證消息被處理,消息代理發送完消息後,要設定狀态為“已發送”,隻有收到消費者的确認請求後才更新為“已消費”,這就需要消息代理中記錄所有的消費狀态,這種做法顯然是不可取的。
Kafka采用拉取模型,由消費者自己記錄消費狀态,每個消費者互相獨立地順序讀取每個分區的消息。如下圖所示,有兩個消費者(不同消費者組)拉取同一個主題的消息,消費者A的消費進度是3,消費者B的消費進度是6。消費者拉取的最大上限通過最高水位(watermark)控制,生産者最新寫入的消息如果還沒有達到備份數量,對消費者是不可見的。這種由消費者控制偏移量的優點是:消費者可以按照任意的順序消費消息。比如,消費者可以重置到舊的偏移量,重新處理之前已經消費過的消息;或者直接跳到最近的位置,從目前的時刻開始消費。
在一些消息系統中,消息代理會在消息被消費之後立即删除消息。如果有不同類型的消費者訂閱同一個主題,消息代理可能需要備援地存儲同一消息;或者等所有消費者都消費完才删除,這就需要消息代理跟蹤每個消費者的消費狀态,這種設計很大程度上限制了消息系統的整體吞吐量和處理延遲。Kafka的做法是生産者釋出的所有消息會一緻儲存在Kafka叢集中,不管消息有沒有被消費。使用者可以通過設定保留時間來清理過期的資料,比如,設定保留政策為兩天。那麼,在消息釋出之後,它可以被不同的消費者消費,在兩天之後,過期的消息就會自動清理掉。
4.2進階API
1)進階API優點
進階API 寫起來簡單
不需要自行去管理offset,系統通過zookeeper自行管理。
不需要管理分區,副本等情況,.系統自動管理。
消費者斷線會自動根據上一次記錄在zookeeper中的offset去接着擷取資料(預設設定1分鐘更新一下zookeeper中存的offset)
可以使用group來區分對同一個topic 的不同程式通路分離開來(不同的group記錄不同的offset,這樣不同程式讀取同一個topic才不會因為offset互相影響)
2)進階API缺點
不能自行控制offset(對于某些特殊需求來說)
不能細化控制如分區、副本、zk等
4.3低級API
1)低級 API 優點
能夠讓開發者自己控制offset,想從哪裡讀取就從哪裡讀取。
自行控制連接配接分區,對分區自定義進行負載均衡
對zookeeper的依賴性降低(如:offset不一定非要靠zk存儲,自行存儲offset即可,比如存在檔案或者記憶體中)
2)低級API缺點
太過複雜,需要自行控制offset,連接配接哪個分區,找到分區leader 等。
4.4消費者組
Kafka(三)Kafka的高可用與生産消費過程解析
消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic。每個分區在同一時間隻能由group中的一個消費者讀取,但是多個group可以同時消費這個partition。在圖中,有一個由三個消費者組成的group,有一個消費者讀取主題中的兩個分區,另外兩個分别讀取一個分區。某個消費者讀取某個分區,也可以叫做某個消費者是某個分區的擁有者。
在這種情況下,消費者可以通過水準擴充的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那麼其他的group成員會自動負載均衡讀取之前失敗的消費者讀取的分區。
4.5 消費方式
consumer采用pull(拉)模式從broker中讀取資料。
push(推)模式很難适應消費速率不同的消費者,因為消息發送速率是由broker決定的。它的目标是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據consumer的消費能力以适當的速率消費消息。
對于Kafka而言,pull模式更合适,它可簡化broker的設計,consumer可自主要制消費消息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的送出方式進而實作不同的傳輸語義。
pull模式不足之處是,如果kafka沒有資料,消費者可能會陷入循環中,一直等待資料到達。為了避免這種情況,我們在我們的拉請求中有參數,允許消費者請求在等待資料到達的“長輪詢”中進行阻塞(并且可選地等待到給定的位元組數,以確定大的傳輸大小)。
4.6消費者組案例
1)需求:測試同一個消費者組中的消費者,同一時刻隻能有一個消費者消費。
2)案例實操
(1)在node21、node22上修改/opt/module/kafka/config/consumer.properties配置檔案中的group.id屬性為任意組名。
[root@node22 config]$ vi consumer.properties
group.id=admin
(2)在node21、node22上分别啟動消費者
[root@node21 kafka]$ bin/kafka-console-consumer.sh --zookeeper node21:2181,node22:2181,node23:2181 --topic first --consumer.config config/consumer.properties
[root@node22 kafka]$ bin/kafka-console-consumer.sh --zookeeper node21:2181,node22:2181,node23:2181 --topic first --consumer.config config/consumer.properties
(3)在node23上啟動生産者
[root@node23 kafka]$ bin/kafka-console-producer.sh --broker-list node21:9092 --topic first
>hello world
(4)檢視node21和node22的接收者。
同一時刻隻有一個消費者接收到消息。
五 Topic的建立和删除
5.1 建立topic
建立 topic 的序列圖如下所示:
流程說明:
1、 controller 在 ZooKeeper 的 /brokers/topics 節點上注冊 watcher,當 topic 被建立,則 controller 會通過 watch 得到該 topic 的 partition/replica 配置設定。 2、 controller從 /brokers/ids 讀取目前所有可用的 broker 清單,對于 set_p 中的每一個 partition: 2.1、 從配置設定給該 partition 的所有 replica(稱為AR)中任選一個可用的 broker 作為新的 leader,并将AR設定為新的 ISR 2.2、 将新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state 3、 controller 通過 RPC 向相關的 broker 發送 LeaderAndISRRequest。
5.2 删除topic
删除 topic 的序列圖如下所示:
1、 controller 在 zooKeeper 的 /brokers/topics 節點上注冊 watcher,當 topic 被删除,則 controller 會通過 watch 得到該 topic 的 partition/replica 配置設定。 2、 若 delete.topic.enable=false,結束;否則 controller 注冊在 /admin/delete_topics 上的 watch 被 fire,controller 通過回調向對應的 broker 發送 StopReplicaRequest。
六 broker failover
kafka broker failover 序列圖如下所示:
1、 controller 在 zookeeper 的 /brokers/ids/[brokerId] 節點注冊 Watcher,當 broker 當機時 zookeeper 會 fire watch 2、 controller 從 /brokers/ids 節點讀取可用broker 3、 controller決定set_p,該集合包含當機 broker 上的所有 partition 4、 對 set_p 中的每一個 partition 4.1、 從/brokers/topics/[topic]/partitions/[partition]/state 節點讀取 ISR 4.2、 決定新 leader 4.3、 将新 leader、ISR、controller_epoch 和 leader_epoch 等資訊寫入 state 節點 5、 通過 RPC 向相關 broker 發送 leaderAndISRRequest 指令
七 controller failover
當 controller 當機時會觸發 controller failover。每個 broker 都會在 zookeeper 的 "/controller" 節點注冊 watcher,當 controller 當機時 zookeeper 中的臨時節點消失,所有存活的 broker 收到 fire 的通知,每個 broker 都嘗試建立新的 controller path,隻有一個競選成功并當選為 controller。
當新的 controller 當選時,會觸發 KafkaController.onControllerFailover 方法,在該方法中完成如下操作:
1、 讀取并增加 Controller Epoch。 2、 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注冊 watcher。 3、 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注冊 watcher。 4、 通過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注冊 watcher。 5、 若 delete.topic.enable=true(預設值是 false),則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注冊 watcher。 6、 通過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注冊Watch。 7、 初始化 ControllerContext 對象,設定目前所有 topic,“活”着的 broker 清單,所有 partition 的 leader 及 ISR等。 8、 啟動 replicaStateMachine 和 partitionStateMachine。 9、 将 brokerState 狀态設定為 RunningAsController。 10、 将每個 partition 的 Leadership 資訊發送給所有“活”着的 broker。 11、 若 auto.leader.rebalance.enable=true(預設值是true),則啟動 partition-rebalance 線程。 12、 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則删除相應的Topic。