最近有同學留言在使用Kafka的過程中遇到一些問題,比如在拉取的Topic中的資料時會抛出一些異常,今天筆者就為大家來分享一下Kafka的Fetch流程。
首先,我們來了解一下,Fetch Session的目标。Kafka在1.1.0以後的版本中優化了Fetch問題,引入了Fetch Session,Kafka由Broker來提供服務(通信、資料互動等)。每個分區會有一個Leader Broker,Broker會定期向Leader Broker發送Fetch請求,來擷取資料,而對于分區數較大的Topic來說,需要發出的Fetch請求就會很大。這樣會有一個問題:
Follower感興趣的分區集很少改變,然而每個FetchRequest必須枚舉Follower感興趣的所有分區集合;
當上一個FetchRequest隻會分區中沒有任何改變,仍然必須發回關于該分區的所有中繼資料,其中包括分區ID、分區的起始Offset、以及能夠請求的最大位元組數等。
并且,這些問題與系統中現存分區的數量成線性比例,例如,假設Kafka叢集中有100000個分區,其中大多數分區很少接收新消息。該系統中的Broker仍然會來回發送非常大的FetchRequest和FetchResponse,即使每秒添加的實際消息資料很少。随着分區數量的增長,Kafka使用越來越多的網絡帶寬來回傳遞這些消息。
當Kafka被調整為較低延遲時,這些低效會變得更嚴重。如果我們将每秒發送的FetchRequest數量增加一倍,我們應該期望在縮短的輪詢間隔内有更多的分區沒有改變。而且,我們無法在每個FetchRequest和FetchResponse中分攤每個分區發送中繼資料的所需要的帶寬資源,這将會導緻Kafka需要使用更多的網絡帶寬。
為了優化上述問題,Kafka增加了增量拉取分區的概念,進而減少用戶端每次拉取都需要拉取全部分區的問題。Fetch Session與網絡程式設計中的Session類似,可以認為它是有狀态的,這裡的狀态值的是知道它需要拉取哪些分區的資料,比如第一次拉取的分區0中的資料,後續分區0中沒有了資料,就不需要拉取分區0了,FetchSession資料結構如下
需要注意的是,Fetch Epoch是一個單調遞增的32位計數器,它在處理請求N之後,Broker會接收請求N+1,序列号總是大于0,在達到最大值後,它會回到1。
如果Fetch Session支援增量Fetch,那麼它将維護增量Fetch中每個分區的資訊,關于每個分區,它需要維護:
Topic名稱
分區ID
該分區的最大位元組數
Fetch偏移量
HighWaterMark
FetcherLogStartOffset
LeaderLogStartOffset
其中,Topic名稱、分區ID來自于TopicPartition,最大位元組數、Fetch偏移量、FetcherLogStartOffset來自于最近的FetcherRequest,HighWaterMark、LocalLogStartOffset來自于本地的Leader。因為Follower活着Consumer發出的請求都會與分區Leader進行互動,是以FetchSession也是記錄在Leader節點上的。
對于用戶端來說,什麼時候一個分區會被包含到增量的拉取請求中:
Client通知Broker,分區的maxBytes,fetchOffset,LogStartOffset改變了;
分區在之前的增量拉取會話中不存在,Client想要增加這個分區,進而來拉取新的分區;
分區在增量拉取會話中,Client要删除。
對于服務端來說,增量分區包含到增量的拉取響應中:
Broker通知Client分區的HighWaterMark或者brokerLogStartOffset改變了;
分區有新的資料
在Fetch.java類中,方法sendFetches(): prepareFetchRequests建立FetchSessionHandler.FetchRequestData。 建構拉取請求通過FetchSessionHandler.Builder,builder.add(partition, PartitionData)會添加next: 即要拉取的分區。建構時調用Builder.build(),針對Full進行拉取,代碼片段如下:
FetchSessionHandler.java
收到響應結果後,調用FetchSessionHandler.handleResponse()方法。 假如第一次是全量拉取,響應結果沒有出錯時,nextMetadata.isFull()仍然為true。 服務端建立了一個新的session(随機的唯一ID),用戶端的Fetch SessionId會設定為服務端傳回的sessionId, 并且epoch會增加1。這樣下次用戶端的拉取就不再是全量,而是增量了(toSend, toForget兩個集合容器,分别表示要拉取的和不需要拉取的)。 當服務端正常處理(這次不會生成新的session),用戶端也正常處理響應,則sessionId不會增加,但是epoch會增加1。
Broker處理拉取請求是,會建立不同類型的FetchContext,類型如下:
SessionErrorContext:拉取會話錯誤(例如,epoch不相等)
SessionlessFetchContext:不需要拉取會話
IncrementalFetchContext:增量拉取
FullFetchContext:全量拉取
在KafkaApis#handleFetchRequest()中,代碼片段如下:
因為Fetch Session使用的是Leader上的記憶體,是以我們需要限制在任何給定時間内的記憶體量,是以,每個Broker将隻建立有限數量的增量Fetch Session。以下,有兩個公共參數,用來配置Fetch Session的緩存:
max.incremental.fetch.session.cache.slots:用來限制每台Broker上最大Fetch Session數量,預設1000
min.incremental.fetch.session.eviction.ms:從緩存中逐漸增量擷取會話之前等待的最短時間,預設120000
這裡需要注意的時候,該屬性屬于read-only。Kafka Broker配置中有三種類型,它們分别是:
類型
說明
read-only
修改參數值後,需要重新開機Broker才能生效
per-broker
修改參數值後,隻會在對應的Broker上生效,不需要重新開機,屬于動态參數
cluster-wide
修改參數值後,整個叢集範圍内會生效,不需要重新開機,屬于動态參數
當伺服器收到建立增量Fetch Session請求時,它會将新的Session與先有的Session進行比較,隻有在下列情況下,新Session才會有效:
新Session在Follower裡面;
現有Session已停止,且超過最小等待時間;
現有Session已停止,且超過最小等待時間,并且新Session有更多的分區。
這樣可以實作如下目标:
Follower優先級高于消費者;
随着時間的推移,非活躍的Session将被替換;
大請求(從增量中收益更多)被優先處理;
緩存抖動是有限的,避免了昂貴的Session重建時。
新增了如下錯誤類型:
FetchSessionIdNotFound:當用戶端請求引用伺服器不知道的Fetch Session時,伺服器将使用此錯誤代碼進行響應。如果存在用戶端錯誤,或者伺服器退出了Fetch Session,也會出現這種錯誤;
InvalidFetchSessionEpochException:當請求的Fetch Session Epoch與預期不相同時,伺服器将使用此錯誤代碼來進行響應。
請求SessionID
請求SessionEpoch
含義
-1
全量拉取(沒有使用或者建立Session時)
全量拉取(如果是新的Session,Epoch從1開始)
$ID
關閉表示為$ID的增量Fetch Session,并建立一個新的全量Fetch(如果是新的Session,Epoch從1開始)
$EPOCH
如果ID和EPOCH是正确的,建立一個增量Fetch
Request SessionID
沒有Fetch Session是建立新的
下一個請求會使增量Fetch請求,并且SessionID是$ID
Client和Broker的Fetch過程可以總結如下圖所示:

這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行讨論或發送郵件給我,我會盡我所能為您解答,與君共勉!
另外,部落客出書了《Kafka并不難學》和《Hadoop大資料挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點選購買連結購買部落客的書進行學習,在此感謝大家的支援。關注下面公衆号,根據提示,可免費擷取書籍的教學視訊。
<b></b><b></b><b></b><b></b>
聯系方式:
Twitter:https://twitter.com/smartloli
QQ群(Hadoop - 交流社群1):424769183
QQ群(Kafka并不難學): 825943084
溫馨提示:請大家加群的時候寫上加群理由(姓名+公司/學校),友善管理者稽核,謝謝!