天天看點

Apache Kafka源碼分析 - KafkaApis

kafka apis反映出kafka broker server可以提供哪些服務,

broker server主要和producer,consumer,controller有互動,搞清這些api就清楚了broker server的所有行為

提供對offset的查詢的需求,比如查詢earliest,latest offset是什麼,或before某個時間戳的offset是什麼

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

可以看到,當沒有找到topic->partition, 或partition leader,或其他異常的時候,就會導緻傳回offsets為nil

這樣在用戶端,經常通過擷取latestoffset來算spoutlag,會出現負值的情況

然後,fetchoffset調用fetchoffsetsbefore,來完成offset的擷取,

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

這個用于處理producer的請求,其實就是寫資料

名字有些tricky,和offsetcommit有什麼關系,因為對于kafka的highlevel consumer,consumeroffset是被寫入kafka topic的,是以offsetcommitrequest其實就是一種特殊的producer request

你看他實際也是,用producerrequestfromoffsetcommit,将它轉換成了producer request

主要調用appendtolocallog,核心邏輯

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

partition.appendmessagestoleader

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

對于producer的寫政策,取決于配置的acker機制,

acks = 0,那沒有failover處理的,發就發了

acks = 1,當寫leader replica成功後就傳回,其他的replica都是通過fetcher去同步的,是以kafka是異步寫

不過有資料丢失的風險,如果leader的資料沒有來得及同步,leader挂了,那麼會丢失資料

acks = –1, 要等待所有的replicas都成功後,才能傳回

是以這裡需要産生delayedproducerrequest,這個request隻有在所有的follower都fetch成功後才能reponse

是以delayedproducerrequest會在fetch request中被觸發unblock

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

響應讀資料的請求,來自consumer或follower fetcher

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

readmessagesets其實就是對每個topicandpartititon調用readmessageset

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

如果是follower fetch request,需要做recordfollowerlogendoffsets更新follower的leo,

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

最終調用到replicamanager.updatereplicaleoandpartitionhw,并修正改partition的isr

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

最終調到partition.updateleaderhwandmaybeexpandisr來更新isr

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

maybeincrementleaderhw

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

響應broker發來的shutdown請求,

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

單純的調用,controller.shutdownbroker,這種是優雅的shutdown,會做很多的準備工作

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

這裡做leader重新選舉用的是controlledshutdownpartitionleaderselector

這個選舉政策很簡單,

排除了shuttingdownbroker的産生新的isr,然後選擇head作為新的leader

就是處理讀取和更新metadatacache的請求,

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

可見cache為,map[string, mutable.map[int, partitionstateinfo],記錄每個topic,每個partition的partitionstateinfo

包含,leaderisrandcontrollerepoch,記錄leader和isr

allreplicas記錄所有的replicas,即ar,注意這裡隻會記錄replica id,replica的具體情況,隻會在replicamanager裡面記錄

這裡為每個partition記錄leaderisrandcontrollerepoch,是不是有點浪費

而alivebrokers,記錄所有活的brokers的id和ip:port

是以也比較簡單,這個cache在每個brokers之間是會被異步更新的,通過handleupdatemetadatarequest

停止replica請求,一般是當broker stop或需要删除某replica時被調用

處理很簡單,主要就是停止fetcher線程,并删除partition目錄

stopreplicas

stopreplica,注意很多情況下是不需要真正删除replica的,比如當機

處理leaderandisr的更新,這個和handleupdatemetadatarequest的差別是,不光更新cache,需要真正去做replica的leader切換

主要調用,

replicamanager.becomeleaderorfollower(leaderandisrrequest, offsetmanager)

核心邏輯如下,前面那段主要是判斷這個request是否有效,根據controllerepoch和leaderepoch

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

replicamanager裡面有個allpartitions,記錄所有partition的情況,

其中partition結構中,比較主要的資料是,

這個記錄brokerid和replica的對應關系

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

是以getorcreatepartition,隻是get目前replicamanager裡面儲存的該partiiton的情況

replicamanager.makeleaders

關閉所有成為leader的replica對應的fetcher,然後關鍵是調用,

上面提到case (partition, partitionstateinfo)中,partition是replicamanager目前的情況,而partitionstateinfo中間放的是request的新的配置設定情況,

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

這裡還有個函數getorcreatereplica,知道兩點,

a. 在這裡當local replica不存在的時候,會真正的建立replica

b. 所有生成replica都是用這個函數,是以其他的replica list都是assignedreplicamap中replica的引用,比如insyncreplicas

Apache Kafka源碼分析 - KafkaApis
Apache Kafka源碼分析 - KafkaApis

replicamanager.makefollowers

Apache Kafka源碼分析 - KafkaApis

    replicafetchermanager.addfetcherforpartitions(partitionstomakefollowerwithleaderandoffset) //增加新的fetcher

}

Apache Kafka源碼分析 - KafkaApis

partition.makefollower

比較簡單,隻是更新assignedreplicas和isr

Apache Kafka源碼分析 - KafkaApis