![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsISPrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdsATOfd3bkFGazxCMx8VesATMfhHLlN3XnxCMwEzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5SO4cDMxUzY2gDMxUmYhJjNxYzXxITMwcTMwEzLcBTMxIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjL5M3Lc9CX6MHc0RHaiojIsJye.png)
上一節我們分析到如何将消息放入記憶體緩沖器主要分三步,如下圖所示:
我們重點分析了getOrCreateDeque()方法,它主要建立了如下資料結構,如下所示:
這一節我們繼續向下分析,看看如何通過BufferPool申請記憶體空間NIO的多塊記憶體ByteBuffer的。
記憶體緩沖區,配置設定記憶體的邏輯代碼主要如下所示:
可以看到這個邏輯非常簡單,隻是計算了一個空間大小,之後根據free.allocate()建立記憶體空間ByteBuffer。
熟悉NIO的同學,一定知道ByteBuffer這個元件,是NIO核心3大元件之一。它是一塊記憶體,這裡通過一個記憶體池來維護多塊ByteBuffer。這樣的好處就是避免建立的記憶體空間,頻繁的被GC,而且可以達到很好的重用性。這一點是不錯的思考。而且由于 Kafka底層使用NIO進行通信,使用ByteBuffer存放的資料,可以更好、更簡單的被發送出去。
好了回到正題,這個ByteBuffer可以明顯的看到是被BufferPool的allocate方法建立的。但是在研究allocate方法之前,我們先來看看ByteBuffer是如何建立的。
在之前第二節元件分析時,初步看過BufferPool這個類的結構,可以看到之前初始化RecordAccumulator時候,建立的BufferPool。它的基本核心是一個ReentrantLock和Deque free隊列。如下圖所示:
有了之前初步的了解,現在我們再仔細看下它的建立細節:
這個構造函數主要脈絡如下:
1)根據入參,設定核心的參數。主要有兩個,long memory, int poolableSize,其餘的入參都是時間或者統計相關的,可以先忽略。你可以向上查找構造函數傳遞入參的入口,最終會找到ConfigDef中預設初始化的值。如下:
memory預設對應的配置buffer.memory=33554432 ,也就是總緩沖區的大小,預設是32MB。poolableSize對應的配置batch.size=16384, 預設是16KB,也就是說消息可以打包的batch預設一批是16KB。這裡要注意如果消息比較大,這個兩個參數需要适當調整。
2)初始化核心記憶體結構和一把鎖。new ArrayDeque()、new ArrayDeque()、new ReentrantLock()。(Condition和ReentrantLock都是JDK并發包下的常用類。不熟悉的同學可以回顧下JDK成長記)
構造函數的邏輯整體如下圖所示:
你可以連蒙帶猜下,free這個隊列,應該是存放記憶體塊ByteBuffer的。由于是ArrayDeque,是以需要ReentrantLock進行并發控制。waiters的Condition隊列暫時不知道是做什麼的,可能是線程排隊等待擷取記憶體塊使用的。
建立好了BufferPool,它是如何通過allocate()申請記憶體的呢?
首先申請記憶體前需要明确申請記憶體的大小size,如下:
size的計算涉及到了幾個值取Max的邏輯。
batchSize就是之前BufferPool使用的參數,預設是16KB。
LOG_OVERHEAD+消息大小:12+keyBytes.size()+valueBytes.size();
簡單的說意思就是,如果消息的大小大于預設的batchSize,申請的記憶體以消息大小為主,否則就是預設batchSize的大小16KB。
PS:batchSize一般根據我們發送的消息肯定會調整的,如果你消息大于16KB,之後打包發送的時候是基于batchSize大小的ByteBuffer記憶體塊的,結果由于你的消息大小超過預設batchSize,每次打包發送其實就是一條消息,這樣每一條消息一次網絡傳輸,批量打包發送的意義就不大了。
上面的邏輯如下圖所示:
确認了申請記憶體空間的大小後,就會執行如下代碼申請記憶體了:
這個方法比較長,但是邏輯比較清晰,整體分為一個大的if-else 主要脈絡如下:
1)最外層的if主要邏輯是:如果free隊列存在空閑記憶體,直接使用,否則建立一塊大小為size的ByteBuffer,可用記憶體會扣減相應值
2)else主要邏輯是:如果總緩沖區的記憶體32MB都使用完了,線程需要通過Condition隊列進行排隊等待,擷取ByteBuffer
整體如下圖所示:
我們分别來看下細節,首先是第一段邏輯:
這塊邏輯很簡單。擷取ByteBuffer的方式不是從free隊列就是新建立。
但是這裡有一個問題,free隊列什麼時候有值的?
其實可以猜到,當從緩沖區發送出去消息後,會清空ByteBuffer,之後就會空閑這塊記憶體,自然也就會加入free這個隊列中了。你可以搜尋下這個free隊列的引用自己大體看下。之後分析如何發送緩沖器中的消息時會帶大家看到的。
剩下的第二段邏輯是總記憶體不夠用的時候線程排隊等待,之後喚醒的邏輯。這塊邏輯考慮很多特殊邏輯,看上去比較複雜。
但是當你梳理清楚後,發現其實本質就是Condition的await和signal而已。而且這裡有一個最大的等待逾時時間,逾時後會抛出異常。具體就不一步一步帶大家分析了,我們肯定是盡量避免這種情況的。大體邏輯總結如下圖:
Condition這個waiter隊列如何被喚醒的呢?其實和free記憶體增加是一樣的,當發送消息之後,記憶體使用完成,有可用記憶體之後,自然會被喚醒,之後分析如何發送緩沖器中的消息時會帶大家看到的。如下所示:
好了, 到這裡,記憶體緩沖器RecordAccumulator通過BufferPool申請記憶體的源碼原理基本就分析完了。你主要知道了:
BufferPool的建立多塊記憶體ByteBuffer的原因
兩個核心的參數batchSize=16kb,bufferMemory=32MB
核心資料結構Deque waiters和Dequefree。
每一塊ByteBuffer的大小計算邏輯
如何申請和重用記憶體ByteBuffer的邏輯
下一節我們繼續來分析發送消息的記憶體緩沖器原理—tryAppend的邏輯。之後如何打包消息,并将打包好的消息發送出去的。消息的最終序列化格式和NIO的拆包粘包問題。大家敬請期待!