天天看點

Kafka Zero-Copy 使用分析

kafka 我個人感覺是性能優化的典範。而且使用scala開發,代碼寫的也很漂亮的。重點我覺得有四個:

nio

zero copy

磁盤順序讀寫

queue資料結構的極緻使用

zero-copy 實際的原理,大家還是去google下。這篇文章重點會分析這項技術是怎麼被嵌入到kafa裡的。包含兩部分:

kafka在什麼場景下用了這個技術

zero-copy 是如何被調用,并且發揮作用的。

答案是:

<b>消息消費的時候 </b>

包括外部consumer以及follower 從partiton leader同步資料,都是如此。簡單描述就是:

consumer從broker擷取檔案資料的時候,直接通過下面的方法進行channel到channel的資料傳輸。

也就是說你的資料源是一個channel,資料接收端也是一個channel(socketchannel),則通過該方式進行資料傳輸,是直接在核心态進行的,避免拷貝資料導緻的核心态和使用者态的多次切換。

估計看完這段内容,你對整個kafka的資料處理流程也差不多了解了個大概。為了避免過于繁雜,以至于将整個kafka的體系都拖進來,我們起始點從kafkaapis相關的類開始。

對應的類名稱為:

該類是負責真正的kafka業務邏輯處理的。在此之前的,譬如 socketserver等類似tomcat伺服器一樣,側重于互動,屬于架構層次的東西。kafkaapis 則類似于部署在tomcat裡的應用。

handle 方法是所有處理的入口,然後根據請求的不同,有不同的處理邏輯。這裡我們關注apikeys.fetch這塊,也就是有消費者要擷取資料的邏輯。進入 handlefetchrequest方法,你會看到最後一行代碼如下:

replicamanager 包含所有主題的所有partition消息。大部分針對partition的操作都是通過該類來完成的。

<code>replicamanager.fetchmessages</code> 這個方法非常的長。我們隻關注一句代碼:

該方法擷取本地日志資訊資料。内部會調用kafka.cluster.log對象的read方法:

log 對象是啥呢?其實就是對應的一個topic的partition. 一個partition是由很多端(segment)組成的,這和lucene非常相似。一個segment就是一個檔案。實際的資料自然是從這裡讀到的。代碼如下:

這裡的fetchinfo(fetchdatainfo)對象包含兩個字段:

filemessageset 其實就是使用者在這個partition這一次消費能夠拿到的資料集合。當然,真實的資料還躺在bytebuffer裡,并沒有記在到記憶體中。filemessageset 裡面包含了一個很重要的方法:

這裡我們看到了久違的transferfrom方法。那麼這個方法什麼時候被調用呢?我們先擱置下,因為那個是另外一個流程。我們繼續分析上面的代碼。也就是接着從這段代碼開始分析:

擷取到這個資訊後,會執行如下操作:

logreadresults 的資訊被包裝成fetchresponsepartitiondata, fetchresponsepartitiondata 包喊了我們的filemessageset 對象。還記得麼,這個對象包含了我們要跟蹤的tranferto方法。然後fetchresponsepartitiondata 會給responsecallback作為參數進行回調。

responsecallback 的函數簽名如下(我去掉了一些我們不關心的資訊):

我們重點關注這個回調方法裡的fetchresponsecallback。 我們會發現這裡 fetchresponsepartitiondata 會被封裝成一個fetchresponsesend ,然後由requestchannel發送出去。

因為kafka完全應用是nio的異步機制,是以到這裡,我們無法再跟進去了,需要從另外一部分開始分析。

前面隻是涉及到資料的擷取。讀取日志,并且獲得對應messageset對象。messageset 是一段資料的集合,但是該資料沒有真實的被加載。這裡會涉及到kafka 如何将資料發送回consumer端。

在socketserver,也就是負責和所有的消費者打交道,建立連接配接的中樞裡,會不斷的進行poll操作

首先會注冊新的連接配接,如果有的話。接着就是處理新的響應了。還記得剛剛上面我們通過requestchannel把fetchresponsesend發出來吧。

這裡類似的,processnewresponses方法會先通過send方法把fetchresponsesend注冊到selector上。 這個操作其實做的事情如下:

為了友善看代碼,我對代碼做了改寫。我們看到,其實send就是做了一個write時間注冊。這個是和nio機制相關的。如果大家看的有障礙,不妨先學習下相關的機制。

回到 socketserver 的run方法裡,也就是上面已經貼過的代碼:

socketserver 會poll隊列,一旦對應的kafkachannel 寫操作ready了,就會調用kafkachannel的write方法:

依然的,為了減少代碼,我做了些調整,其中write會調用 send方法,對應的send對象其實就是上面我們注冊的fetchresponsesend 對象。

這段代碼裡真實發送資料的代碼是send.writeto(transportlayer);,

對應的writeto方法為:

這裡我依然做了代碼簡化,隻讓我們關注核心的。 這裡最後是調用了sends的writeto方法,而sends 其實是個multisend。這個multisend 裡有兩個東西:

topicandpartition.partition: 分區

message:fetchresponsepartitiondata  

還記得這個fetchresponsepartitiondata  麼?我們的messageset 就被放在了fetchresponsepartitiondata這個對象裡。

topicdatasend 也包含了sends,該sends 包含了 partitiondatasend,而 partitiondatasend則包含了fetchresponsepartitiondata。

最後進行writeto的時候,其實是調用了

如果你還記得的話,filemessageset 也有個writeto方法,就是我們之前已經提到過的那段代碼:

終于走到最底層了,最後其實是通過tl.transferfrom(channel, position, count) 來完成最後的資料發送的。這裡你可能比較好奇,不應該是調用transferto 方法麼? transferfrom其實是kafka自己封裝的一個方法,最終裡面調用的也是transerto:

kafka的整個調用棧還是非常繞的。尤其是引入了nio的事件機制,有點類似shuffle,把流程調用給切斷了,無法簡單通過代碼引用來進行跟蹤。kafka還有一個非常優秀的機制就是delayqueue機制,我們在分析的過程中,為了友善,把這塊完全給抹掉了。