天天看點

Kafka成長記6:Producer如何将消息放入到記憶體緩沖區(上)

Kafka成長記6:Producer如何将消息放入到記憶體緩沖區(上)

之前我們分析了Producer的配置解析、元件分析、拉取中繼資料、消息的初步序列化方式、消息的路由政策。如下圖:

Kafka成長記6:Producer如何将消息放入到記憶體緩沖區(上)

這一節我們繼續分析發送消息的記憶體緩沖器原理—RecordAccumulator.append()。

在doSend中的,拉取中繼資料、消息的初步序列化方式、消息的路由政策之後就是accumulator.append()。

如下代碼所示:(去除了多餘的日志和異常處理,截取了核心代碼)

accumulator.append() 它主要是将路由結果、初步序列化的消息放入到消息記憶體緩沖器中。

分析如何将消息放入記憶體緩沖器之前,需要回顧下它内部的基本結構。之前元件分析的時候,我們初步分析過RecordAccumulator的大體結構,如下圖:

Kafka成長記6:Producer如何将消息放入到記憶體緩沖區(上)

1)設定了一些參數 batchSize、totalSize、retryBackoffMs、lingerMs、compression等

2)初始化了一些資料結構,比如batches是一個 new CopyOnWriteMap<>()

3)初始化了BufferPool和IncompleteRecordBatches

回顧了RecordAccumulator這個元件之後,我們就來看看到底如何将消息放入記憶體緩沖器的資料結構中的。

整個方法的脈絡,看着邏輯比較多,涉及了很多資料結構,我們一步一步來分析下。第一次看的話,大體你可以梳理如下脈絡:

1)getOrCreateDeque 這個方法應該是才建立一個雙端隊列,隊列放的每一個元素不是單條消息Record,而是消息的集合RecordBatch。

2)free.allocate 應該是在配置設定記憶體緩沖器中的記憶體

3)tryAppend 應該是将消息放入記憶體中

Kafka成長記6:Producer如何将消息放入到記憶體緩沖區(上)

在将消息放入記憶體緩沖器之前,首先通過getOrCreateDeque 建立的是一個存放消息集合的隊列。代碼如下:

這個建立的記憶體結構可以看到,是一個變量 batches,它是一個CopyOnWriteMap。這個資料結構之前我們元件圖初步分析過。再結合這段代碼,不難了解它的脈絡:

這個map主要根據Topic分區資訊作為key,value是一個隊列核心資料結構是RecordBatch,由于是第一次給某個topic分區發送的消息,value為空,需要初始化隊列,否則說明曾經給這個topic的分區發送給資料,value非空,直接傳回之前的隊列。

由于我們這裡是第一次向test-topic發送消息,是以可以得到下圖的資料結構:

Kafka成長記6:Producer如何将消息放入到記憶體緩沖區(上)

之後執行了一段加鎖邏輯,之前提到,tryAppend應該是将消息放入記憶體中。但是由于隊列是剛建立的,deque.peekLast();肯定是空,是以這段加鎖的代碼不會執行。

但是到這裡你會發現代碼一個明顯的特點,使用了synchronized加鎖和線程安全的記憶體結構CopyOnWriteMap,這些都是明顯線程安全的控制。

為什麼呢?因為同一個Producer可以使用多線程進行發送消息,必然要考慮線程安全的很多東西。

為什麼選用CopyOnWriteMap,而不用ConcurrentHashMap呢?你可以思考下。(這裡給個提示,JDK成長記提到過,CopyOnWriteMap它的底層是寫時複制,适合讀多寫少的場景)

synchronized加鎖代碼塊使用了,分段加鎖,并沒有暴力的在方法上加synchronized。這也是一個使用亮點。

到這裡,你會發現在中間件會大量的見到并發包下的元件的使用,工作中你用到可能都是鳳毛麟角,這些元件的使用是我們研究中間件源碼值得學習的一點。

你一定要多思考為什麼,不要停留在是什麼,怎麼用上,這個思想需要刻意訓練,希望你可以慢慢養成。

好了,今天的内容就到這裡,之前有同學回報,每一節的隻是太過于幹了,實實在在的幹貨!看起來有時候會比較費勁,是以之後的章節盡量會避免上萬字的大章節,會控制在6000字左右。

另外,除了成長記外,我偶爾也會分享我自己的故事和行業中遇見的事情,希望大家從我的經曆中可以有另一番成長和收獲,比如我是如何學習和提升技術的?我是如何畫圖的?我如何做技術分享的等等。

繼續閱讀