本文源碼:GitHub·點這裡 || GitEE·點這裡
注意:這裡zookeeper3.4也是基于叢集模式部署。
建立日志目錄
注意:以上操作需要同步到叢集下其他服務上。
注意:broker.id安裝叢集服務個數編排即可,叢集下不能重複。
注意:這裡預設啟動了zookeeper叢集服務,并且叢集下的kafka分别啟動。
建立topic
參數說明:
replication-factor 定義副本個數
partitions 定義分區個數
topic:定義topic名稱
檢視topic清單
修改topic分區
檢視topic
發送消息
消費消息
删除topic
Kafka叢集中有一個broker會被選舉為Controller,Controller依賴Zookeeper環境,管理叢集broker的上下線,所有topic的分區副本配置設定和leader選舉等工作。
Kafka中間件的Producer攔截器主要用于實作消息發送的自定義控制邏輯。使用者可以在消息發送前以及回調邏輯執行前有機會對消息做一些自定義,比如消息修改等,發送狀态監控等,使用者可以指定多個攔截器按順序執行攔截。
核心方法
configure:擷取配置資訊和初始化資料時調用;
onSend:消息被序列化以及和計算分區前調用該方法,可以對消息做操作;
onAcknowledgement:消息發送到Broker之後,或發送過程失敗時調用;
close:關閉攔截器調用,執行一些資源清理工作;
注意:這裡說的攔截器是針對消息發送流程。
定義方式:實作ProducerInterceptor接口即可。
攔截器一:在onSend方法中,對攔截的消息進行修改。
攔截器二:在onAcknowledgement方法中,判斷消息是否發送成功。
加載攔截器:基于一個KafkaProducer配置Bean,加入攔截器。
基于上述自定義Bean類型,進行消息發送,關注攔截器中列印日志資訊。
說明:該過程基于上述案例producer.send方法追蹤的源碼執行流程,源碼中的過程相對清楚,涉及的核心流程如下。

Producer發送消息采用的是異步發送的方式,消息發送過程如下:
Producer發送消息之後,經過攔截器,序列化,事務判斷;
流程執行後,消息内容放入容器中;
容器在指定時間内如果裝滿(size),會喚醒Sender線程;
容器如果在指定時間内沒有裝滿,也會執行一次Sender線程喚醒;
喚醒Sender線程之後,把容器資料拉取到topic中;
絮叨一句:讀這些中間件的源碼,不僅能開闊思維,也會讓自己意識到平時寫的代碼可能真的叫搬磚。
Kafka中消息是以topic進行辨別分類,生産者面向topic生産消息,topic分區(partition)是實體上的存儲,基于消息日志檔案的方式。
每個partition對應于一個log檔案,發送的消息不斷追加到該log檔案末端;
log檔案中存儲的就是producer生産的消息資料,采用分片和索引機制;
partition分為多個segment。每個segment對應兩個(.index)和(.log)檔案;
index檔案類型存儲的索引資訊;
log檔案存儲消息的資料;
索引檔案中的中繼資料指向對應資料檔案中message的實體偏移位址;
消費者組中的每個消費者,都會實時記錄消費的消息offset位置;
當然消息消費出錯時,恢複是從上次的記錄位置繼續消費;
Kafka支援消息的事務控制
Producer事務
跨分區跨會話的事務原理,引入全局唯一的TransactionID,并将Producer獲得的PID和TransactionID綁定。Producer重新開機後可以通過正在進行的TransactionID獲得原來的PID。
Kafka基于TransactionCoordinator元件管理Transaction,Producer通過和TransactionCoordinator互動獲得TransactionID對應的任務狀态。TransactionCoordinator将事務狀态寫入Kafka的内部Topic,即使整個服務重新開機,進行中的事務狀态可以得到恢複。
Consumer事務
Consumer消息消費,事務的保證強度很低,無法保證消息被精确消費,因為同一事務的消息可能會出現重新開機後已經被删除的情況。
推薦關聯閱讀:資料源管理系列
序号
标題
01
資料源管理:主從庫動态路由,AOP模式讀寫分離
02
資料源管理:基于JDBC模式,适配和管理動态資料源
03
資料源管理:動态權限校驗,表結構和資料遷移流程
04
資料源管理:關系型分庫分表,列式庫分布式計算
05
資料源管理:PostGreSQL環境整合,JSON類型應用
06
資料源管理:基于DataX元件,同步資料和源碼分析
07
資料源管理:OLAP查詢引擎,ClickHouse叢集化管理