天天看點

Apache Kafka源碼分析 - PartitionStateMachine

startup

initializepartitionstate

Apache Kafka源碼分析 - PartitionStateMachine
Apache Kafka源碼分析 - PartitionStateMachine

這裡注意offlinepartition和newpartition的差別, 

如果controllercontext.partitionleadershipinfo中沒有這個partition的leader資訊,那麼說明是newpartition 

如果有leader,但leader所在broker不是alive的,那麼就是offlinepartition 

當然,如果leader所在broker是alive的,那麼就是onlinepartition

triggeronlinepartitionstatechange

試圖将所有offline和new partition的狀态變成online

Apache Kafka源碼分析 - PartitionStateMachine
Apache Kafka源碼分析 - PartitionStateMachine

這裡看到,brokerrequestbatch,這個經常出現,controllerbrokerrequestbatch 

這個類封裝了leaderandisrrequestmap,stopreplicarequestmap,updatemetadatarequestmap 

用來記錄和cache,handlestatechange中産生的這些request 

最終用sendrequeststobrokers,将這些requests,批量的發出去

handlestatechange的邏輯後面單獨看,這裡看看controller.offlinepartitionselector,這個selector實作如何為一個newpartition或offlinepartition選一個leader 

代碼挺長的,注釋講的挺清楚的,就不貼代碼了 

首先如果isr裡面有活的broker,那沒有好說的,直接用它作為新的leader 

如果沒有,這裡需要看一下是否容忍unclean leader election,其實就是是否可以容忍丢資料,如果可以 

那麼就看看這ar裡面有沒有活的broker,如果有就以它為leader,但這個既然不在isr裡面,說明這個replica是不同步的,是以一定有data loss 

如果ar裡面也沒有活的broker,那隻能是elect失敗了

Apache Kafka源碼分析 - PartitionStateMachine
Apache Kafka源碼分析 - PartitionStateMachine

registerlisteners

在oncontrollerfailover中被調用, 

這裡負責注冊一下listener到zk,deletetopiclistener先不管 

先看看topicchangelistener,當topics發生變化時,我們做什麼處理?

registertopicchangelistener

listen這個目錄, /brokers/topics,如果發生變化,觸發topicchangelistener

topicchangelistener

Apache Kafka源碼分析 - PartitionStateMachine
Apache Kafka源碼分析 - PartitionStateMachine

onnewtopiccreation

Apache Kafka源碼分析 - PartitionStateMachine
Apache Kafka源碼分析 - PartitionStateMachine

addpartitionslistener

和topic listener很想,就是從zk讀出partition情況,和目前context裡面的比較,找出新的partitions,調用

可見無論是topicchangelistener還是addpartitionslistener,最終都是調用到onnewpartitioncreation,畢竟topic是個邏輯概念

onnewpartitioncreation

Apache Kafka源碼分析 - PartitionStateMachine
Apache Kafka源碼分析 - PartitionStateMachine

很簡單,隻是首先将所有新的partition和相應的replica的狀态設為new,然後再設為online

handlestatechange

這是狀态機的主邏輯,

Apache Kafka源碼分析 - PartitionStateMachine
Apache Kafka源碼分析 - PartitionStateMachine

可以看到,對于轉變到offlinepartition,nonexistentpartition,隻是單純的設定state 

而轉變到newpartition,除了設定state,也就多了步初始化ar

隻有轉變到onlinepartition的時候比較複雜些,

其中從newpartition--》onlinepartition,需要做些初始化的工作,是以調用initializeleaderandisrforpartition 

initializeleaderandisrforpartition

newpartition是在zk中,沒有leaderandisr path的,是以初始化需要建立path,建立後,就再也不能回到new的狀态,隻能到offline

其中邏輯除了建立zk path,就是進行leader elect,這裡的elect邏輯是寫死的,初始化的時候,一定是prefered selector,即選取live ar的head

Apache Kafka源碼分析 - PartitionStateMachine
Apache Kafka源碼分析 - PartitionStateMachine

offlinepartition或onlinepartition –》onlinepartition

這個相對簡單,隻需要重新選舉一下leader

electleaderforpartition

Apache Kafka源碼分析 - PartitionStateMachine