Kafka 是一個高吞吐量的分布式的釋出訂閱消息系統,在全世界都很流行,在大資料項目裡面使用尤其頻繁。筆者看過多個大資料開源産品的源碼,感覺 Kafka 的源碼是其中品質比較上乘的一個,這得益于作者高超的編碼水準和高超的架構設計能力。
Kafka 的核心源碼分為兩部分:用戶端源碼和服務端源碼,用戶端又分為生産者和消費者,而個人認為 Kafka 的源碼裡面生産者的源碼技術含量最高,是以今天給大家剖析 Kafka 的生産者的架構設計,Kafka 是一個飛速發展的消息系統,其架構也在一直演進中,我們今天分析的 Kafka 的版本是比較成熟穩定的 Kafka1.0.0 版本源碼。

圖1 Kafka核心子產品
生産者流程概述
先給大家介紹一下生産者的大概的運作的流程。
圖2 Kafka運作方式
如上圖所示:步驟一:一條消息過來首先會被封裝成為一個 ProducerRecord 對象。
步驟二:接下來要對這個對象進行序列化,因為 Kafka 的消息需要從用戶端傳到服務端,涉及到網絡傳輸,是以需要實作序列。Kafka 提供了預設的序列化機制,也支援自定義序列化(這種設計也值得我們積累,提高項目的擴充性)。
步驟三:消息序列化完了以後,對消息要進行分區,分區的時候需要擷取叢集的中繼資料。分區的這個過程很關鍵,因為這個時候就決定了,我們的這條消息會被發送到 Kafka 服務端到哪個主題的哪個分區了。
步驟四:分好區的消息不是直接被發送到服務端,而是放入了生産者的一個緩存裡面。在這個緩存裡面,多條消息會被封裝成為一個批次(batch),預設一個批次的大小是 16K。
步驟五:Sender 線程啟動以後會從緩存裡面去擷取可以發送的批次。
步驟六:Sender 線程把一個一個批次發送到服務端。大家要注意這個設計,在 Kafka0.8 版本以前,Kafka 生産者的設計是來一條資料,就往服務端發送一條資料,頻繁的發生網絡請求,結果性能很差。後面的版本再次架構演進的時候把這兒改成了批處理的方式,性能指數級的提升,這個設計值得我們積累。
生産者細節深度剖析
接下來我們生産者這兒技術含量比較高的一個地方,前面概述那兒我們看到,一個消息被分區以後,消息就會被放到一個緩存裡面,我們看一下裡面具體的細節。預設緩存塊的大小是 32M,這個緩存塊裡面有一個重要的資料結構:batches,這個資料結構是 key-value 的結果,key 就是消息主題的分區,value 是一個隊列,裡面存的是發送到對應分區的批次,Sender 線程就是把這些批次發送到服務端。
圖3 生産者架構
01 / 生産者進階設計之自定義資料結構
生産者把批次資訊用 batches 這個對象進行存儲。如果是大家,大家會考慮用什麼資料結構去存儲批次資訊?
Kafka 這兒采取的方式是自定義了一個資料結構:CopyOnWriteMap。熟悉 Java 的同學都知道,JUC 下面是有一個 CopyOnWriteArrayList 的資料結構的,但是沒有 CopyOnWriteMap,我這兒給大家解釋一下 Kafka 為什麼要設計這樣的一個資料結構。
1.他們存儲的資訊的是 key-value 的結構,key 是分區,value 是要存到這個分區的對應批次(批次可能有多個,是以用的是隊列),故因為是 key-value 的資料結構,是以鎖定用 Map 資料結構。
2.這個 Kafka 生産者面臨的是一個高并發的場景,大量的消息會湧入這個這個資料結構,是以這個資料結構需要保證線程安全,這樣我們就不能使用 HashMap 這樣的資料結構了。
3.這個資料結構需要支援的是讀多寫少的場景。讀多是因為每條消息過來都會根據 key 讀取 value 的資訊,假如有 1000 萬條消息,那麼就會讀取 batches 對象 1000 萬次。寫少是因為,比如我們生産者發送資料需要往一個主題裡面去發送資料,假設這個主題有 50 個分區,那麼這個 batches 裡面就需要寫 50 個 key-value 資料就可以了(大家要搞清楚我們雖然要寫 1000 萬條資料,但是這 1000 萬條是寫入 queue 隊列的 batch 裡的,并不是直接寫入 batches,是以就我們剛剛說的這個場景,batches 裡隻需要最多寫 50 條資料就可以了)。
根據第二和第三個場景我們總結出來,Kafka 這兒需要一個能保證線程安全的,支援讀多寫少的 Map 資料結構。但是 Java 裡面并沒有提供出來的這樣的一個資料,唯一跟這個需求比較接近的是 CopyOnWriteArrayList,但是偏偏它又不是 Map 結構,是以 Kafka 這兒模仿 CopyOnWriteArrayList 設計了 CopyOnWriteMap。采用了讀寫分離的思想解決了線程安全且支援讀多寫少等問題。
高效的資料結構保證了生産者的性能。(CopyOnWriteArrayList 不熟悉的同學,可以嘗試百度學習)。這兒筆者建議大家可以去看看 Kafka 生産者往 batches 裡插入資料的源碼,生産者為了保證插入資料的高性能,采用了多線程,又為了線程安全,使用了分段加鎖等多種手段,源碼非常精彩。
02 / 生産者進階設計之記憶體池設計
剛剛我們看到 batches 裡面存儲的是批次,批次預設的大小是 16K,整個緩存的大小是 32M,生産者每封裝一個批次都需要去申請記憶體,正常情況下如果一個批次發送出去了以後,那麼這 16K 的記憶體就等着 GC 來回收了。但是如果是這樣的話,就可能會頻繁的引發 FullGC,故而影響生産者的性能,是以在緩存裡面設計了一個記憶體池(類似于我們平時用的資料庫的連接配接池),一個 16K 的記憶體用完了以後,把資料清空,放入到記憶體池裡,下個批次用的時候直接從裡面擷取就可以。這樣大大的減少了 GC 的頻率,保證了生産者的穩定和高效(Java 的 GC 問題是一個頭疼的問題,是以這種設計也非常值得我們去積累)。
結尾
Kafka 的設計之中精彩的地方有很多,今天我們截取了一部分跟大家分享。之前我看到過 Kafka 的源碼以後,就想以後如果我要去當老師,去培養架構師的話,那麼我一定得跟學生分享 Kafka 的源碼,通過學習 Kafka 源碼提升系統架構能力,再次建議大家有空可以研究研究 Kafka 的源碼,大家加油!!