天天看點

Kafka成長記7:Producer如何将消息放入到記憶體緩沖區(中)

Kafka成長記7:Producer如何将消息放入到記憶體緩沖區(中)

上一節我們分析到如何将消息放入記憶體緩沖器主要分三步,如下圖所示:

我們重點分析了getOrCreateDeque()方法,它主要建立了如下資料結構,如下所示:

這一節我們繼續向下分析,看看如何通過BufferPool申請記憶體空間NIO的多塊記憶體ByteBuffer的。

記憶體緩沖區,配置設定記憶體的邏輯代碼主要如下所示:

可以看到這個邏輯非常簡單,隻是計算了一個空間大小,之後根據free.allocate()建立記憶體空間ByteBuffer。

熟悉NIO的同學,一定知道ByteBuffer這個元件,是NIO核心3大元件之一。它是一塊記憶體,這裡通過一個記憶體池來維護多塊ByteBuffer。這樣的好處就是避免建立的記憶體空間,頻繁的被GC,而且可以達到很好的重用性。這一點是不錯的思考。而且由于 Kafka底層使用NIO進行通信,使用ByteBuffer存放的資料,可以更好、更簡單的被發送出去。

好了回到正題,這個ByteBuffer可以明顯的看到是被BufferPool的allocate方法建立的。但是在研究allocate方法之前,我們先來看看ByteBuffer是如何建立的。

在之前第二節元件分析時,初步看過BufferPool這個類的結構,可以看到之前初始化RecordAccumulator時候,建立的BufferPool。它的基本核心是一個ReentrantLock和Deque free隊列。如下圖所示:

Kafka成長記7:Producer如何将消息放入到記憶體緩沖區(中)

有了之前初步的了解,現在我們再仔細看下它的建立細節:

這個構造函數主要脈絡如下:

1)根據入參,設定核心的參數。主要有兩個,long memory, int poolableSize,其餘的入參都是時間或者統計相關的,可以先忽略。你可以向上查找構造函數傳遞入參的入口,最終會找到ConfigDef中預設初始化的值。如下:

memory預設對應的配置buffer.memory=33554432 ,也就是總緩沖區的大小,預設是32MB。poolableSize對應的配置batch.size=16384, 預設是16KB,也就是說消息可以打包的batch預設一批是16KB。這裡要注意如果消息比較大,這個兩個參數需要适當調整。

2)初始化核心記憶體結構和一把鎖。new ArrayDeque()、new ArrayDeque()、new ReentrantLock()。(Condition和ReentrantLock都是JDK并發包下的常用類。不熟悉的同學可以回顧下JDK成長記)

構造函數的邏輯整體如下圖所示:

Kafka成長記7:Producer如何将消息放入到記憶體緩沖區(中)

你可以連蒙帶猜下,free這個隊列,應該是存放記憶體塊ByteBuffer的。由于是ArrayDeque,是以需要ReentrantLock進行并發控制。waiters的Condition隊列暫時不知道是做什麼的,可能是線程排隊等待擷取記憶體塊使用的。

建立好了BufferPool,它是如何通過allocate()申請記憶體的呢?

首先申請記憶體前需要明确申請記憶體的大小size,如下:

size的計算涉及到了幾個值取Max的邏輯。

batchSize就是之前BufferPool使用的參數,預設是16KB。

LOG_OVERHEAD+消息大小:12+keyBytes.size()+valueBytes.size();

簡單的說意思就是,如果消息的大小大于預設的batchSize,申請的記憶體以消息大小為主,否則就是預設batchSize的大小16KB。

PS:batchSize一般根據我們發送的消息肯定會調整的,如果你消息大于16KB,之後打包發送的時候是基于batchSize大小的ByteBuffer記憶體塊的,結果由于你的消息大小超過預設batchSize,每次打包發送其實就是一條消息,這樣每一條消息一次網絡傳輸,批量打包發送的意義就不大了。

上面的邏輯如下圖所示:

Kafka成長記7:Producer如何将消息放入到記憶體緩沖區(中)

确認了申請記憶體空間的大小後,就會執行如下代碼申請記憶體了:

這個方法比較長,但是邏輯比較清晰,整體分為一個大的if-else 主要脈絡如下:

1)最外層的if主要邏輯是:如果free隊列存在空閑記憶體,直接使用,否則建立一塊大小為size的ByteBuffer,可用記憶體會扣減相應值

2)else主要邏輯是:如果總緩沖區的記憶體32MB都使用完了,線程需要通過Condition隊列進行排隊等待,擷取ByteBuffer

整體如下圖所示:

Kafka成長記7:Producer如何将消息放入到記憶體緩沖區(中)

我們分别來看下細節,首先是第一段邏輯:

這塊邏輯很簡單。擷取ByteBuffer的方式不是從free隊列就是新建立。

但是這裡有一個問題,free隊列什麼時候有值的?

其實可以猜到,當從緩沖區發送出去消息後,會清空ByteBuffer,之後就會空閑這塊記憶體,自然也就會加入free這個隊列中了。你可以搜尋下這個free隊列的引用自己大體看下。之後分析如何發送緩沖器中的消息時會帶大家看到的。

剩下的第二段邏輯是總記憶體不夠用的時候線程排隊等待,之後喚醒的邏輯。這塊邏輯考慮很多特殊邏輯,看上去比較複雜。

但是當你梳理清楚後,發現其實本質就是Condition的await和signal而已。而且這裡有一個最大的等待逾時時間,逾時後會抛出異常。具體就不一步一步帶大家分析了,我們肯定是盡量避免這種情況的。大體邏輯總結如下圖:

Kafka成長記7:Producer如何将消息放入到記憶體緩沖區(中)

Condition這個waiter隊列如何被喚醒的呢?其實和free記憶體增加是一樣的,當發送消息之後,記憶體使用完成,有可用記憶體之後,自然會被喚醒,之後分析如何發送緩沖器中的消息時會帶大家看到的。如下所示:

Kafka成長記7:Producer如何将消息放入到記憶體緩沖區(中)

好了, 到這裡,記憶體緩沖器RecordAccumulator通過BufferPool申請記憶體的源碼原理基本就分析完了。你主要知道了:

BufferPool的建立多塊記憶體ByteBuffer的原因

兩個核心的參數batchSize=16kb,bufferMemory=32MB

核心資料結構Deque waiters和Dequefree。

每一塊ByteBuffer的大小計算邏輯

如何申請和重用記憶體ByteBuffer的邏輯

下一節我們繼續來分析發送消息的記憶體緩沖器原理—tryAppend的邏輯。之後如何打包消息,并将打包好的消息發送出去的。消息的最終序列化格式和NIO的拆包粘包問題。大家敬請期待!