kafka apis反映出kafka broker server可以提供哪些服務,
broker server主要和producer,consumer,controller有互動,搞清這些api就清楚了broker server的所有行為
提供對offset的查詢的需求,比如查詢earliest,latest offset是什麼,或before某個時間戳的offset是什麼
可以看到,當沒有找到topic->partition, 或partition leader,或其他異常的時候,就會導緻傳回offsets為nil
這樣在用戶端,經常通過擷取latestoffset來算spoutlag,會出現負值的情況
然後,fetchoffset調用fetchoffsetsbefore,來完成offset的擷取,
這個用于處理producer的請求,其實就是寫資料
名字有些tricky,和offsetcommit有什麼關系,因為對于kafka的highlevel consumer,consumeroffset是被寫入kafka topic的,是以offsetcommitrequest其實就是一種特殊的producer request
你看他實際也是,用producerrequestfromoffsetcommit,将它轉換成了producer request
主要調用appendtolocallog,核心邏輯
partition.appendmessagestoleader
對于producer的寫政策,取決于配置的acker機制,
acks = 0,那沒有failover處理的,發就發了
acks = 1,當寫leader replica成功後就傳回,其他的replica都是通過fetcher去同步的,是以kafka是異步寫
不過有資料丢失的風險,如果leader的資料沒有來得及同步,leader挂了,那麼會丢失資料
acks = –1, 要等待所有的replicas都成功後,才能傳回
是以這裡需要産生delayedproducerrequest,這個request隻有在所有的follower都fetch成功後才能reponse
是以delayedproducerrequest會在fetch request中被觸發unblock
響應讀資料的請求,來自consumer或follower fetcher
readmessagesets其實就是對每個topicandpartititon調用readmessageset
如果是follower fetch request,需要做recordfollowerlogendoffsets更新follower的leo,
最終調用到replicamanager.updatereplicaleoandpartitionhw,并修正改partition的isr
最終調到partition.updateleaderhwandmaybeexpandisr來更新isr
maybeincrementleaderhw
響應broker發來的shutdown請求,
單純的調用,controller.shutdownbroker,這種是優雅的shutdown,會做很多的準備工作
這裡做leader重新選舉用的是controlledshutdownpartitionleaderselector
這個選舉政策很簡單,
排除了shuttingdownbroker的産生新的isr,然後選擇head作為新的leader
就是處理讀取和更新metadatacache的請求,
可見cache為,map[string, mutable.map[int, partitionstateinfo],記錄每個topic,每個partition的partitionstateinfo
包含,leaderisrandcontrollerepoch,記錄leader和isr
allreplicas記錄所有的replicas,即ar,注意這裡隻會記錄replica id,replica的具體情況,隻會在replicamanager裡面記錄
這裡為每個partition記錄leaderisrandcontrollerepoch,是不是有點浪費
而alivebrokers,記錄所有活的brokers的id和ip:port
是以也比較簡單,這個cache在每個brokers之間是會被異步更新的,通過handleupdatemetadatarequest
停止replica請求,一般是當broker stop或需要删除某replica時被調用
處理很簡單,主要就是停止fetcher線程,并删除partition目錄
stopreplicas
stopreplica,注意很多情況下是不需要真正删除replica的,比如當機
處理leaderandisr的更新,這個和handleupdatemetadatarequest的差別是,不光更新cache,需要真正去做replica的leader切換
主要調用,
replicamanager.becomeleaderorfollower(leaderandisrrequest, offsetmanager)
核心邏輯如下,前面那段主要是判斷這個request是否有效,根據controllerepoch和leaderepoch
replicamanager裡面有個allpartitions,記錄所有partition的情況,
其中partition結構中,比較主要的資料是,
這個記錄brokerid和replica的對應關系
是以getorcreatepartition,隻是get目前replicamanager裡面儲存的該partiiton的情況
replicamanager.makeleaders
關閉所有成為leader的replica對應的fetcher,然後關鍵是調用,
上面提到case (partition, partitionstateinfo)中,partition是replicamanager目前的情況,而partitionstateinfo中間放的是request的新的配置設定情況,
這裡還有個函數getorcreatereplica,知道兩點,
a. 在這裡當local replica不存在的時候,會真正的建立replica
b. 所有生成replica都是用這個函數,是以其他的replica list都是assignedreplicamap中replica的引用,比如insyncreplicas
replicamanager.makefollowers
replicafetchermanager.addfetcherforpartitions(partitionstomakefollowerwithleaderandoffset) //增加新的fetcher
}
partition.makefollower
比較簡單,隻是更新assignedreplicas和isr