天天看點

Kafka Fetch Session剖析

最近有同學留言在使用Kafka的過程中遇到一些問題,比如在拉取的Topic中的資料時會抛出一些異常,今天筆者就為大家來分享一下Kafka的Fetch流程。

首先,我們來了解一下,Fetch Session的目标。Kafka在1.1.0以後的版本中優化了Fetch問題,引入了Fetch Session,Kafka由Broker來提供服務(通信、資料互動等)。每個分區會有一個Leader Broker,Broker會定期向Leader Broker發送Fetch請求,來擷取資料,而對于分區數較大的Topic來說,需要發出的Fetch請求就會很大。這樣會有一個問題:

Follower感興趣的分區集很少改變,然而每個FetchRequest必須枚舉Follower感興趣的所有分區集合;

當上一個FetchRequest隻會分區中沒有任何改變,仍然必須發回關于該分區的所有中繼資料,其中包括分區ID、分區的起始Offset、以及能夠請求的最大位元組數等。

并且,這些問題與系統中現存分區的數量成線性比例,例如,假設Kafka叢集中有100000個分區,其中大多數分區很少接收新消息。該系統中的Broker仍然會來回發送非常大的FetchRequest和FetchResponse,即使每秒添加的實際消息資料很少。随着分區數量的增長,Kafka使用越來越多的網絡帶寬來回傳遞這些消息。

當Kafka被調整為較低延遲時,這些低效會變得更嚴重。如果我們将每秒發送的FetchRequest數量增加一倍,我們應該期望在縮短的輪詢間隔内有更多的分區沒有改變。而且,我們無法在每個FetchRequest和FetchResponse中分攤每個分區發送中繼資料的所需要的帶寬資源,這将會導緻Kafka需要使用更多的網絡帶寬。

為了優化上述問題,Kafka增加了增量拉取分區的概念,進而減少用戶端每次拉取都需要拉取全部分區的問題。Fetch Session與網絡程式設計中的Session類似,可以認為它是有狀态的,這裡的狀态值的是知道它需要拉取哪些分區的資料,比如第一次拉取的分區0中的資料,後續分區0中沒有了資料,就不需要拉取分區0了,FetchSession資料結構如下

需要注意的是,Fetch Epoch是一個單調遞增的32位計數器,它在處理請求N之後,Broker會接收請求N+1,序列号總是大于0,在達到最大值後,它會回到1。

如果Fetch Session支援增量Fetch,那麼它将維護增量Fetch中每個分區的資訊,關于每個分區,它需要維護:

Topic名稱

分區ID

該分區的最大位元組數

Fetch偏移量

HighWaterMark

FetcherLogStartOffset

LeaderLogStartOffset

其中,Topic名稱、分區ID來自于TopicPartition,最大位元組數、Fetch偏移量、FetcherLogStartOffset來自于最近的FetcherRequest,HighWaterMark、LocalLogStartOffset來自于本地的Leader。因為Follower活着Consumer發出的請求都會與分區Leader進行互動,是以FetchSession也是記錄在Leader節點上的。

對于用戶端來說,什麼時候一個分區會被包含到增量的拉取請求中:

Client通知Broker,分區的maxBytes,fetchOffset,LogStartOffset改變了;

分區在之前的增量拉取會話中不存在,Client想要增加這個分區,進而來拉取新的分區;

分區在增量拉取會話中,Client要删除。

對于服務端來說,增量分區包含到增量的拉取響應中:

Broker通知Client分區的HighWaterMark或者brokerLogStartOffset改變了;

分區有新的資料

在Fetch.java類中,方法sendFetches(): prepareFetchRequests建立FetchSessionHandler.FetchRequestData。 建構拉取請求通過FetchSessionHandler.Builder,builder.add(partition, PartitionData)會添加next: 即要拉取的分區。建構時調用Builder.build(),針對Full進行拉取,代碼片段如下:

FetchSessionHandler.java

收到響應結果後,調用FetchSessionHandler.handleResponse()方法。 假如第一次是全量拉取,響應結果沒有出錯時,nextMetadata.isFull()仍然為true。 服務端建立了一個新的session(随機的唯一ID),用戶端的Fetch SessionId會設定為服務端傳回的sessionId, 并且epoch會增加1。這樣下次用戶端的拉取就不再是全量,而是增量了(toSend, toForget兩個集合容器,分别表示要拉取的和不需要拉取的)。 當服務端正常處理(這次不會生成新的session),用戶端也正常處理響應,則sessionId不會增加,但是epoch會增加1。

Broker處理拉取請求是,會建立不同類型的FetchContext,類型如下:

SessionErrorContext:拉取會話錯誤(例如,epoch不相等)

SessionlessFetchContext:不需要拉取會話

IncrementalFetchContext:增量拉取

FullFetchContext:全量拉取

在KafkaApis#handleFetchRequest()中,代碼片段如下:

因為Fetch Session使用的是Leader上的記憶體,是以我們需要限制在任何給定時間内的記憶體量,是以,每個Broker将隻建立有限數量的增量Fetch Session。以下,有兩個公共參數,用來配置Fetch Session的緩存:

max.incremental.fetch.session.cache.slots:用來限制每台Broker上最大Fetch Session數量,預設1000

min.incremental.fetch.session.eviction.ms:從緩存中逐漸增量擷取會話之前等待的最短時間,預設120000

這裡需要注意的時候,該屬性屬于read-only。Kafka Broker配置中有三種類型,它們分别是:

類型

說明

read-only

修改參數值後,需要重新開機Broker才能生效

per-broker

修改參數值後,隻會在對應的Broker上生效,不需要重新開機,屬于動态參數

cluster-wide

修改參數值後,整個叢集範圍内會生效,不需要重新開機,屬于動态參數

當伺服器收到建立增量Fetch Session請求時,它會将新的Session與先有的Session進行比較,隻有在下列情況下,新Session才會有效:

新Session在Follower裡面;

現有Session已停止,且超過最小等待時間;

現有Session已停止,且超過最小等待時間,并且新Session有更多的分區。

這樣可以實作如下目标:

Follower優先級高于消費者;

随着時間的推移,非活躍的Session将被替換;

大請求(從增量中收益更多)被優先處理;

緩存抖動是有限的,避免了昂貴的Session重建時。

新增了如下錯誤類型:

FetchSessionIdNotFound:當用戶端請求引用伺服器不知道的Fetch Session時,伺服器将使用此錯誤代碼進行響應。如果存在用戶端錯誤,或者伺服器退出了Fetch Session,也會出現這種錯誤;

InvalidFetchSessionEpochException:當請求的Fetch Session Epoch與預期不相同時,伺服器将使用此錯誤代碼來進行響應。

請求SessionID

請求SessionEpoch

含義

-1

全量拉取(沒有使用或者建立Session時)

全量拉取(如果是新的Session,Epoch從1開始)

$ID

關閉表示為$ID的增量Fetch Session,并建立一個新的全量Fetch(如果是新的Session,Epoch從1開始)

$EPOCH

如果ID和EPOCH是正确的,建立一個增量Fetch

Request SessionID

沒有Fetch Session是建立新的

下一個請求會使增量Fetch請求,并且SessionID是$ID

Client和Broker的Fetch過程可以總結如下圖所示:

Kafka Fetch Session剖析

這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行讨論或發送郵件給我,我會盡我所能為您解答,與君共勉!

另外,部落客出書了《Kafka并不難學》和《Hadoop大資料挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點選購買連結購買部落客的書進行學習,在此感謝大家的支援。關注下面公衆号,根據提示,可免費擷取書籍的教學視訊。

<b></b><b></b><b></b><b></b>

聯系方式:

郵箱:[email protected]

Twitter:https://twitter.com/smartloli

QQ群(Hadoop - 交流社群1):424769183

QQ群(Kafka并不難學): 825943084

溫馨提示:請大家加群的時候寫上加群理由(姓名+公司/學校),友善管理者稽核,謝謝!

Kafka Fetch Session剖析