引言
再探究 Kafka 核心知識之前,我們先思考一個問題:什麼場景會促使我們使用 Kafka? 說到這裡,我們頭腦中或多或少會蹦出異步解耦和削峰填谷等字樣,是的,這就是 Kafka 最重要的落地場景。
- 異步解耦:同步調用轉換成異步消息通知,實作生産者和消費者的解耦。想象一個場景,在商品交易時,在訂單建立完成之後,需要觸發一系列其他的操作,比如進行使用者訂單資料的統計、給使用者發送短信、給使用者發送郵件等等。如果所有操作都采用同步方式實作,将嚴重影響系統性能。針對此場景,我們可以利用消息中間件解決訂單建立操作和其他後續行為。
- 削峰填谷:利用 broker 緩沖上遊生産者瞬時突發的流量,使消費者消費流量整體平滑。對于發送能力很強的上遊系統,如果沒有消息中間件的保護,下遊系統可能會直接被壓垮導緻全鍊路服務雪崩。想象秒殺業務場景,上遊業務發起下單請求,下遊業務執行秒殺業務(庫存檢查,庫存當機,餘額當機,生成訂單等等),下遊業務處理的邏輯是相當複雜的,并發能力有限,如果上遊服務不做限流政策,瞬時可能把下遊服務壓垮。針對此場景,我們可以利用 MQ 來做削峰填谷,讓高峰流量填充低谷空閑資源,達到系統資源的合理利用。
通過上述例子可以發現交易、支付等場景常需要異步解耦和削峰填谷功能解決問題,而交易、支付等場景對性能、可靠性要求特别高。那麼,我們本文的主角 Kafka 能否滿足相應要求呢?下面我們來探讨下。
Kafka 宏觀認知
再探究 Kafka 的高性能、高可靠性之前,我們從宏觀上來看下 Kafka 的系統架構:
如上圖所示,Kafka 由 Producer、Broker、Consumer 以及負責叢集管理的 ZooKeeper 組成,各部分功能如下:
- Producer:生産者,負責消息的建立并通過一定的路由政策發送消息到合适的 Broker;
- Broker:服務執行個體,負責消息的持久化、中轉等功能;
- Consumer :消費者,負責從 Broker 中拉取(Pull)訂閱的消息并進行消費,通常多個消費者構成一個分組,消息隻能被同組中的一個消費者消費;
- ZooKeeper:負責 broker、consumer 叢集中繼資料的管理等;(注意:Producer 端直接連接配接 broker,不在 zk 上存任何資料,隻是通過 ZK 監聽 broker 和 topic 等資訊)
上圖消息流轉過程中,還有幾個特别重要的概念—主題(Topic)、分區(Partition)、分段(segment)、位移(offset)。
- topic:消息主題。Kafka 按 topic 對消息進行分類,我們在收發消息時隻需指定 topic。
- partition:分區。為了提升系統的吞吐,一個 topic 下通常有多個 partition,partition 分布在不同的 Broker 上,用于存儲 topic 的消息,這使 Kafka 可以在多台機器上處理、存儲消息,給 kafka 提供給了并行的消息處理能力和橫向擴容能力。另外,為了提升系統的可靠性,partition 通常會分組,且每組有一個主 partition、多個副本 partition,且分布在不同的 broker 上,進而起到容災的作用。
- segment:分段。宏觀上看,一個 partition 對應一個日志(Log)。由于生産者生産的消息會不斷追加到 log 檔案末尾,為防止 log 檔案過大導緻資料檢索效率低下,Kafka 采取了分段和索引機制,将每個 partition 分為多個 segment,同時也便于消息的維護和清理。每個 segment 包含一個.log 日志檔案、兩個索引(.index、timeindex)檔案以及其他可能的檔案。每個 Segment 的資料檔案以該段中最小的 offset 為檔案名,當查找 offset 的 Message 的時候,通過二分查找快找到 Message 所處于的 Segment 中。
- offset:消息在日志中的位置,消息在被追加到分區日志檔案的時候都會配置設定一個特定的偏移量。offset 是消息在分區中的唯一辨別,是一個單調遞增且不變的值。Kafka 通過它來保證消息在分區内的順序性,不過 offset 并不跨越分區,也就是說,Kafka 保證的是分區有序而不是主題有序。
Kafka 高可靠性、高性能探究
在對 Kafka 的整體系統架構及相關概念簡單了解後,下面我們來進一步深入探讨下高可靠性、高性能實作原理。
Kafka 高可靠性探究
Kafka 高可靠性的核心是保證消息在傳遞過程中不丢失,涉及如下核心環節:
- 消息從生産者可靠地發送至 Broker;-- 網絡、本地丢資料;
- 發送到 Broker 的消息可靠持久化;-- Pagecache 緩存落盤、單點崩潰、主從同步跨網絡;
- 消費者從 Broker 消費到消息且最好隻消費一次 -- 跨網絡消息傳輸 。
消息從生産者可靠地發送至 Broker
為了保障消息從生産者可靠地發送至 Broker,我們需要確定兩點;
- Producer 發送消息後,能夠收到來自 Broker 的消息儲存成功 ack;
- Producer 發送消息後,能夠捕獲逾時、失敗 ack 等異常 ack 并做處理。
ack 政策
針對問題 1,Kafka 為我們提供了三種 ack 政策,
- Request.required.acks = 0:請求發送即認為成功,不關心有沒有寫成功,常用于日志進行分析場景;
- Request.required.acks = 1:當 leader partition 寫入成功以後,才算寫入成功,有丢資料的可能;
- Request.required.acks= -1:ISR 清單裡面的所有副本都寫完以後,這條消息才算寫入成功,強可靠性保證;
為了實作強可靠的 kafka 系統,我們需要設定 Request.required.acks= -1,同時還會設定叢集中處于正常同步狀态的副本 follower 數量 min.insync.replicas>2,另外,設定 unclean.leader.election.enable=false 使得叢集中 ISR 的 follower 才可變成新的 leader,避免特殊情況下消息截斷的出現。
消息發送政策
針對問題 2,kafka 提供兩類消息發送方式:同步(sync)發送和異步(async)發送,相關參數如下:
以 sarama 實作為例,在消息發送的過程中,無論是同步發送還是異步發送都會涉及到兩個協程--負責消息發送的主協程和負責消息分發的 dispatcher 協程。
異步發送
對于異步發送(ack != 0 場景,等于 0 時不關心寫 kafka 結果,後文詳細講解)而言,其流程大概如下:
- 在主協程中調用異步發送 kafka 消息的時候,其本質是将消息體放進了一個 input 的 channel,隻要入 channel 成功,則這個函數直接傳回,不會産生任何阻塞。相反,如果入 channel 失敗,則會傳回錯誤資訊。是以調用 async 寫入的時候傳回的錯誤資訊是入 channel 的錯誤資訊,至于具體最終消息有沒有發送到 kafka 的 broker,我們無法從傳回值得知。
- 當消息進入 input 的 channel 後,會有另一個dispatcher 的協程負責周遊 input,來真正發送消息到特定 Broker 上的主 Partition 上。發送結果通過一個異步協程進行監聽,循環處理 err channel 和 success channel,出現了 error 就記一個日志。是以異步寫入場景時,寫 kafka 的錯誤資訊,我們暫時僅能夠從這個錯誤日志來得知具體發生了什麼錯,并且也不支援我們自建函數進行兜底處理,這一點在 trpc-go 的官方也得到了承認。
同步發送
同步發送(ack != 0 場景)是在異步發送的基礎上加以條件限制實作的。同步消息發送在 newSyncProducerFromAsyncProducer 中開啟兩個異步協程處理消息成功與失敗的“回調”,并使用 waitGroup 進行等待,進而将異步操作轉變為同步操作,其流程大概如下:
通過上述分析可以發現,kafka 消息發送本質上都是異步的,不過同步發送通過 waitGroup 将異步操作轉變為同步操作。同步發送在一定程度上確定了我們在跨網絡向 Broker 傳輸消息時,消息一定可以可靠地傳輸到 Broker。因為在同步發送場景我們可以明确感覺消息是否發送至 Broker,若因網絡抖動、機器當機等故障導緻消息發送失敗或結果不明,可通過重試等手段確定消息至少一次(at least once) 發送到 Broker。另外,Kafka(0.11.0.0 版本後)還為 Producer 提供兩種機制來實作精确一次(exactly once) 消息發送:幂等性(Idempotence)和事務(Transaction)。
小結
通過 ack 政策配置、同步發送、事務消息組合能力,我們可以實作exactly once 語意跨網絡向 Broker 傳輸消息。但是,Producer 收到 Broker 的成功 ack,消息一定不會丢失嗎?為了搞清這個問題,我們首先要搞明白 Broker 在接收到消息後做了哪些處理。
發送到 Broker 的消息可靠持久化
為了確定 Producer 收到 Broker 的成功 ack 後,消息一定不在 Broker 環節丢失,我們核心要關注以下幾點:
- Broker 傳回 Producer 成功 ack 時,消息是否已經落盤;
- Broker 當機是否會導緻資料丢失,容災機制是什麼;
- Replica 副本機制帶來的多副本間資料同步一緻性問題如何解決;
Broker 異步刷盤機制
kafka 為了獲得更高吞吐,Broker 接收到消息後隻是将資料寫入 PageCache 後便認為消息已寫入成功,而 PageCache 中的資料通過 linux 的 flusher 程式進行異步刷盤(刷盤觸發條:主動調用 sync 或 fsync 函數、可用記憶體低于閥值、dirty data 時間達到閥值),将資料順序寫到磁盤。消息處理示意圖如下:
由于消息是寫入到 pageCache,單機場景,如果還沒刷盤 Broker 就當機了,那麼 Producer 産生的這部分資料就可能丢失。為了解決單機故障可能帶來的資料丢失問題,Kafka 為分區引入了副本機制。
Replica 副本機制
Kafka 每組分區通常有多個副本,同組分區的不同副本分布在不同的 Broker 上,儲存相同的消息(可能有滞後)。副本之間是“一主多從”的關系,其中 leader 副本負責處理讀寫請求,follower 副本負責從 leader 拉取消息進行同步。分區的所有副本統稱為 AR(Assigned Replicas),其中所有與 leader 副本保持一定同步的副本(包括 leader 副本在内)組成 ISR(In-Sync Replicas),與 leader 同步滞後過多的副本組成 OSR(Out-of-Sync Replicas),由此可見,AR=ISR+OSR。
follower 副本是否與 leader 同步的判斷标準取決于 Broker 端參數 replica.lag.time.max.ms(預設為 10 秒),follower 預設每隔 500ms 向 leader fetch 一次資料,隻要一個 Follower 副本落後 Leader 副本的時間不連續超過 10 秒,那麼 Kafka 就認為該 Follower 副本與 leader 是同步的。在正常情況下,所有的 follower 副本都應該與 leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合為空。
當 leader 副本所在 Broker 當機時,Kafka 會借助 ZK 從 follower 副本中選舉新的 leader 繼續對外提供服務,實作故障的自動轉移,保證服務可用。為了使選舉的新 leader 和舊 leader 資料盡可能一緻,當 leader 副本發生故障時,預設情況下隻有在 ISR 集合中的副本才有資格被選舉為新的 leader,而在 OSR 集合中的副本則沒有任何機會(可通過設定 unclean.leader.election.enable 改變)。
當 Kafka 通過多副本機制解決單機故障問題時,同時也帶來了多副本間資料同步一緻性問題。Kafka 通過高水位更新機制、副本同步機制、 Leader Epoch 等多種措施解決了多副本間資料同步一緻性問題,下面我們來依次看下這幾大措施。
HW 和 LEO
首先,我們來看下兩個和 Kafka 中日志相關的重要概念 HW 和 LEO:
- HW: High Watermark,高水位,表示已經送出(commit)的最大日志偏移量,Kafka 中某條日志“已送出”的意思是 ISR 中所有節點都包含了此條日志,并且消費者隻能消費 HW 之前的資料;
- LEO: Log End Offset,表示目前 log 檔案中下一條待寫入消息的 offset;
如上圖所示,它代表一個日志檔案,這個日志檔案中有 8 條消息,0 至 5 之間的消息為已送出消息,5 至 7 的消息為未送出消息。日志檔案的 HW 為 6,表示消費者隻能拉取到 5 之前的消息,而 offset 為 5 的消息對消費者而言是不可見的。日志檔案的 LEO 為 8,下一條消息将在此處寫入。
注意:所有副本都有對應的 HW 和 LEO,隻不過 Leader 副本比較特殊,Kafka 使用 Leader 副本的高水位來定義所在分區的高水位。換句話說,分區的高水位就是其 Leader 副本的高水位。Leader 副本和 Follower 副本的 HW 有如下特點:
- Leader HW:min(所有副本 LEO),為此 Leader 副本不僅要儲存自己的 HW 和 LEO,還要儲存 follower 副本的 HW 和 LEO,而 follower 副本隻需儲存自己的 HW 和 LEO;
- Follower HW:min(follower 自身 LEO,leader HW)。
注意:為友善描述,下面Leader HW簡記為HWL,Follower HW簡記為F,Leader LEO簡記為LEOL ,Follower LEO簡記為LEOF。
下面我們示範一次完整的 HW / LEO 更新流程:
- 初始狀态
HWL=0,LEOL=0,HWF=0,LEOF=0。
- Follower 第一次 fetch
- Leader收到Producer發來的一條消息完成存儲, 更新LEOL=1;
- Follower從Leader fetch資料, Leader收到請求,記錄follower的LEOF =0,并且嘗試更新HWL =min(全部副本LEO)=0;
- eade傳回HWL=0和LEOL=1給Follower,Follower存儲消息并更新LEOF =1, HW=min(LEOF,HWL)=0。
- Follower 第二次 fetch
- Follower再次從Leader fetch資料, Leader收到請求,記錄follower的LEOF =1,并且嘗試更新HWL =min(全部副本LEO)=1;
- leade傳回HWL=1和LEOL=1給Follower,Leader收到請求,更新自己的 HW=min(LEOF,HWL)=1。
上述更新流程中 Follower 和 Leader 的 HW 更新有時間 GAP。如果 Leader 節點在此期間發生故障,則 Follower 的 HW 和 Leader 的 HW 可能會處于不一緻狀态,如果 Followe 被選為新的 Leader 并且以自己的 HW 為準對外提供服務,則可能帶來資料丢失或資料錯亂問題。
KIP-101 問題:資料丢失&資料錯亂 ^參 5^
資料丢失
第 1 步:
- 副本 B 作為 leader 收到 producer 的 m2 消息并寫入本地檔案,等待副本 A 拉取。
- 副本 A 發起消息拉取請求,請求中攜帶自己的最新的日志 offset(LEO=1),B 收到後更新自己的 HW 為 1,并将 HW=1 的資訊以及消息 m2 傳回給 A。
- A 收到拉取結果後更新本地的 HW 為 1,并将 m2 寫入本地檔案。發起新一輪拉取請求(LEO=2),B 收到 A 拉取請求後更新自己的 HW 為 2,沒有新資料隻将 HW=2 的資訊傳回給 A,并且回複給 producer 寫入成功。此處的狀态就是圖中第一步的狀态。
第 2 步:
此時,如果沒有異常,A 會收到 B 的回複,得知目前的 HW 為 2,然後更新自身的 HW 為 2。但在此時 A 重新開機了,沒有來得及收到 B 的回複,此時 B 仍然是 leader。A 重新開機之後會以 HW 為标準截斷自己的日志,因為 A 作為 follower 不知道多出的日志是否是被送出過的,防止資料不一緻進而截斷多餘的資料并嘗試從 leader 那裡重新同步。
第 3 步:
B 崩潰了,min.isr 設定的是 1,是以 zookeeper 會從 ISR 中再選擇一個作為 leader,也就是 A,但是 A 的資料不是完整的,進而出現了資料丢失現象。
問題在哪裡?在于 A 重新開機之後以 HW 為标準截斷了多餘的日志。不截斷行不行?不行,因為這個日志可能沒被送出過(也就是沒有被 ISR 中的所有節點寫入過),如果保留會導緻日志錯亂。
資料錯亂
在分析日志錯亂的問題之前,我們需要了解到 kafka 的副本可靠性保證有一個前提:在 ISR 中至少有一個節點。如果節點均當機的情況下,是不保證可靠性的,在這種情況會出現資料丢失,資料丢失是可接受的。這裡我們分析的問題比資料丢失更加槽糕,會引發日志錯亂甚至導緻整個系統異常,而這是不可接受的。
第 1 步:
- A 和 B 均為 ISR 中的節點。副本 A 作為 leader,收到 producer 的消息 m2 的請求後寫入 PageCache 并在某個時刻重新整理到本地磁盤。
- 副本 B 拉取到 m2 後寫入 PageCage 後(尚未刷盤)再次去 A 中拉取新消息并告知 A 自己的 LEO=2,A 收到更新自己的 HW 為 1 并回複給 producer 成功。
- 此時 A 和 B 同時當機,B 的 m2 由于尚未刷盤,是以 m2 消息丢失。此時的狀态就是第 1 步的狀态。
第 2 步:
由于 A 和 B 均當機,而 min.isr=1 并且 unclean.leader.election.enable=true(關閉 unclean 選擇政策),是以 Kafka 會等到第一個 ISR 中的節點恢複并選為 leader,這裡不幸的是 B 被選為 leader,而且還接收到 producer 發來的新消息 m3。注意,這裡丢失 m2 消息是可接受的,畢竟所有節點都當機了。
第 3 步:
A 恢複重新開機後發現自己是 follower,而且 HW 為 2,并沒有多餘的資料需要截斷,是以開始和 B 進行新一輪的同步。但此時 A 和 B 均沒有意識到,offset 為 1 的消息不一緻了。
問題在哪裡?在于日志的寫入是異步的,上面也提到 Kafka 的副本政策的一個設計是消息的持久化是異步的,這就會導緻在場景二的情況下被選出的 leader 不一定包含所有資料,進而引發日志錯亂的問題。
Leader Epoch
為了解決上述缺陷,Kafka 引入了 Leader Epoch 的概念。leader epoch 和 raft 中的任期号的概念很類似,每次重新選擇 leader 的時候,用一個嚴格單調遞增的 id 來标志,可以讓所有 follower 意識到 leader 的變化。而 follower 也不再以 HW 為準,每次奔潰重新開機後都需要去 leader 那邊确認下目前 leader 的日志是從哪個 offset 開始的。下面看下 Leader Epoch 是如何解決上面兩個問題的。
資料丢失解決
這裡的關鍵點在于副本 A 重新開機後作為 follower,不是忙着以 HW 為準截斷自己的日志,而是先發起 LeaderEpochRequest 詢問副本 B 第 0 代的最新的偏移量是多少,副本 B 會傳回自己的 LEO 為 2 給副本 A,A 此時就知道消息 m2 不能被截斷,是以 m2 得到了保留。當 A 選為 leader 的時候就保留了所有已送出的日志,日志丢失的問題得到解決。
如果發起 LeaderEpochRequest 的時候就已經挂了怎麼辦?這種場景下,不會出現日志丢失,因為副本 A 被選為 leader 後不會截斷自己的日志,日志截斷隻會發生在 follower 身上。
資料錯亂解決
這裡的關鍵點還是在第 3 步,副本 A 重新開機作為 follower 的第一步還是需要發起 LeaderEpochRequest 詢問 leader 目前第 0 代最新的偏移量是多少,由于副本 B 已經經過換代,是以會傳回給 A 第 1 代的起始偏移(也就是 1),A 發現沖突後會截斷自己偏移量為 1 的日志,并重新開始和 leader 同步。副本 A 和副本 B 的日志達到了一緻,解決了日志錯亂。
小結
Broker 接收到消息後隻是将資料寫入 PageCache 後便認為消息已寫入成功,但是,通過副本機制并結合 ACK 政策可以大機率規避單機當機帶來的資料丢失問題,并通過 HW、副本同步機制、 Leader Epoch 等多種措施解決了多副本間資料同步一緻性問題,最終實作了 Broker 資料的可靠持久化。
消費者從 Broker 消費到消息且最好隻消費一次
Consumer 在消費消息的過程中需要向 Kafka 彙報自己的位移資料,隻有當 Consumer 向 Kafka 彙報了消息位移,該條消息才會被 Broker 認為已經被消費。是以,Consumer 端消息的可靠性主要和 offset 送出方式有關,Kafka 消費端提供了兩種消息送出方式:
正常情況下我們很難實作 exactly once 語意的消息,通常是通過手動送出+幂等實作消息的可靠消費。
Kafka 高性能探究
Kafka 高性能的核心是保障系統低延遲、高吞吐地處理消息,為此,Kafaka 采用了許多精妙的設計:
- 異步發送
- 批量發送
- 壓縮技術
- Pagecache 機制&順序追加落盤
- 零拷貝
- 稀疏索引
- broker & 資料分區
- 多 reactor 多線程網絡模型
異步發送
如上文所述,Kafka 提供了異步和同步兩種消息發送方式。在異步發送中,整個流程都是異步的。調用異步發送方法後,消息會被寫入 channel,然後立即傳回成功。Dispatcher 協程會從 channel 輪詢消息,将其發送到 Broker,同時會有另一個異步協程負責處理 Broker 傳回的結果。同步發送本質上也是異步的,但是在處理結果時,同步發送通過 waitGroup 将異步操作轉換為同步。使用異步發送可以最大化提高消息發送的吞吐能力。
批量發送
Kafka 支援批量發送消息,将多個消息打包成一個批次進行發送,進而減少網絡傳輸的開銷,提高網絡傳輸的效率和吞吐量。Kafka 的批量發送消息是通過以下兩個參數來控制的:
- batch.size:控制批量發送消息的大小,預設值為 16KB,可适當增加 batch.size 參數值提升吞吐。但是,需要注意的是,如果批量發送的大小設定得過大,可能會導緻消息發送的延遲增加,是以需要根據實際情況進行調整。
- linger.ms:控制消息在批量發送前的等待時間,預設值為 0。當 linger.ms 大于 0 時,如果有消息發送,Kafka 會等待指定的時間,如果等待時間到達或者批量大小達到 batch.size,就會将消息打包成一個批次進行發送。可适當增加 linger.ms 參數值提升吞吐,比如 10 ~ 100。
在 Kafka 的生産者用戶端中,當發送消息時,如果啟用了批量發送,Kafka 會将消息緩存到緩沖區中。當緩沖區中的消息大小達到 batch.size 或者等待時間到達 linger.ms 時,Kafka 會将緩沖區中的消息打包成一個批次進行發送。如果在等待時間内沒有達到 batch.size,Kafka 也會将緩沖區中的消息發送出去,進而避免消息積壓。
壓縮技術
Kafka 支援壓縮技術,通過将消息進行壓縮後再進行傳輸,進而減少網絡傳輸的開銷(壓縮和解壓縮的過程會消耗一定的 CPU 資源,是以需要根據實際情況進行調整。),提高網絡傳輸的效率和吞吐量。Kafka 支援多種壓縮算法,在 Kafka2.1.0 版本之前,僅支援 GZIP,Snappy 和 LZ4,2.1.0 後還支援 Zstandard 算法(Facebook 開源,能夠提供超高壓縮比)。這些壓縮算法性能對比(兩名額都是越高越好)如下:
- 吞吐量:LZ4>Snappy>zstd 和 GZIP,壓縮比:zstd>LZ4>GZIP>Snappy。
在 Kafka 中,壓縮技術是通過以下兩個參數來控制的:
- compression.type:控制壓縮算法的類型,預設值為 none,表示不進行壓縮。
- compression.level:控制壓縮的級别,取值範圍為 0-9,預設值為-1。當值為-1 時,表示使用預設的壓縮級别。
在 Kafka 的生産者用戶端中,當發送消息時,如果啟用了壓縮技術,Kafka 會将消息進行壓縮後再進行傳輸。在消費者用戶端中,如果消息進行了壓縮,Kafka 會在消費消息時将其解壓縮。注意:Broker 如果設定了和生産者不通的壓縮算法,接收消息後會解壓後重新壓縮儲存。Broker 如果存在消息版本相容也會觸發解壓後再壓縮。
Pagecache 機制&順序追加落盤
kafka 為了提升系統吞吐、降低延遲時間,Broker 接收到消息後隻是将資料寫入PageCache後便認為消息已寫入成功,而 PageCache 中的資料通過 linux 的 flusher 程式進行異步刷盤(避免了同步刷盤的巨大系統開銷),将資料順序追加寫到磁盤日志檔案中。由于 pagecache 是在記憶體中進行緩存,是以讀寫速度非常快,可以大大提高讀寫效率。順序追加寫充分利用順序 I/O 寫操作,避免了緩慢的随機 I/O 操作,可有效提升 Kafka 吞吐。
如上圖所示,消息被順序追加到每個分區日志檔案的尾部。
零拷貝
Kafka 中存在大量的網絡資料持久化到磁盤(Producer 到 Broker)和磁盤檔案通過網絡發送(Broker 到 Consumer)的過程,這一過程的性能直接影響 Kafka 的整體吞吐量。傳統的 IO 操作存在多次資料拷貝和上下文切換,性能比較低。Kafka 利用零拷貝技術提升上述過程性能,其中網絡資料持久化磁盤主要用 mmap 技術,網絡資料傳輸環節主要使用 sendfile 技術。
索引加速之 mmap
傳統模式下,資料從網絡傳輸到檔案需要 4 次資料拷貝、4 次上下文切換和兩次系統調用。如下圖所示:
為了減少上下文切換以及資料拷貝帶來的性能開銷,Kafka使用mmap來處理其索引檔案。Kafka中的索引檔案用于在提取日志檔案中的消息時進行高效查找。這些索引檔案被維護為記憶體映射檔案,這允許Kafka快速通路和搜尋記憶體中的索引,進而加速在日志檔案中定位消息的過程。mmap 将核心中讀緩沖區(read buffer)的位址與使用者空間的緩沖區(user buffer)進行映射,進而實作核心緩沖區與應用程式記憶體的共享,省去了将資料從核心讀緩沖區(read buffer)拷貝到使用者緩沖區(user buffer)的過程,整個拷貝過程會發生 4 次上下文切換,1 次CPU 拷貝和 2次 DMA 拷貝。
網絡資料傳輸之 sendfile
傳統方式實作:先讀取磁盤、再用 socket 發送,實際也是進過四次 copy。如下圖所示:
為了減少上下文切換以及資料拷貝帶來的性能開銷,Kafka 在 Consumer 從 Broker 讀資料過程中使用了 sendfile 技術。具體在這裡采用的方案是通過 NIO 的 transferTo/transferFrom 調用作業系統的 sendfile 實作零拷貝。總共發生 2 次核心資料拷貝、2 次上下文切換和一次系統調用,消除了 CPU 資料拷貝,如下:
稀疏索引
為了友善對日志進行檢索和過期清理,kafka 日志檔案除了有用于存儲日志的.log 檔案,還有一個位移索引檔案.index和一個時間戳索引檔案.timeindex 檔案,并且三檔案的名字完全相同,如下:
Kafka 的索引檔案是按照稀疏索引的思想進行設計的。稀疏索引的核心是不會為每個記錄都儲存索引,而是寫入一定的記錄之後才會增加一個索引值,具體這個間隔有多大則通過 log.index.interval.bytes 參數進行控制,預設大小為 4 KB,意味着 Kafka 至少寫入 4KB 消息資料之後,才會在索引檔案中增加一個索引項。可見,單條消息大小會影響 Kakfa 索引的插入頻率,是以 log.index.interval.bytes 也是 Kafka 調優一個重要參數值。由于索引檔案也是按照消息的順序性進行增加索引項的,是以 Kafka 可以利用二分查找算法來搜尋目标索引項,把時間複雜度降到了 O(lgN),大大減少了查找的時間。
位移索引檔案.index
位移索引檔案的索引項結構如下:
相對位移:儲存于索引檔案名字上面的起始位移的內插補點,假設一個索引檔案為:00000000000000000100.index,那麼起始位移值即 100,當存儲位移為 150 的消息索引時,在索引檔案中的相對位移則為 150 - 100 = 50,這麼做的好處是使用 4 位元組儲存位移即可,可以節省非常多的磁盤空間。
檔案實體位置:消息在 log 檔案中儲存的位置,也就是說 Kafka 可根據消息位移,通過位移索引檔案快速找到消息在 log 檔案中的實體位置,有了該實體位置的值,我們就可以快速地從 log 檔案中找到對應的消息了。下面我用圖來表示 Kafka 是如何快速檢索消息:
假設 Kafka 需要找出位移為 3550 的消息,那麼 Kafka 首先會使用二分查找算法找到小于 3550 的最大索引項:[3528, 2310272],得到索引項之後,Kafka 會根據該索引項的檔案實體位置在 log 檔案中從位置 2310272 開始順序查找,直至找到位移為 3550 的消息記錄為止。
時間戳索引檔案.timeindex
Kafka 在 0.10.0.0 以後的版本當中,消息中增加了時間戳資訊,為了滿足使用者需要根據時間戳查詢消息記錄,Kafka 增加了時間戳索引檔案,時間戳索引檔案的索引項結構如下:
時間戳索引檔案的檢索與位移索引檔案類似,如下快速檢索消息示意圖:
broker & 資料分區
Kafka 叢集包含多個 broker。一個 topic 下通常有多個 partition,partition 分布在不同的 Broker 上,用于存儲 topic 的消息,這使 Kafka 可以在多台機器上處理、存儲消息,給 kafka 提供給了并行的消息處理能力和橫向擴容能力。
多 reactor 多線程網絡模型
多 Reactor 多線程網絡模型 是一種高效的網絡通信模型,可以充分利用多核 CPU 的性能,提高系統的吞吐量和響應速度。Kafka 為了提升系統的吞吐,在 Broker 端處理消息時采用了該模型,示意如下:
SocketServer和KafkaRequestHandlerPool是其中最重要的兩個元件:
- SocketServer:實作 Reactor 模式,用于處理多個 Client(包括用戶端和其他 broker 節點)的并發請求,并将處理結果傳回給 Client
- KafkaRequestHandlerPool:Reactor 模式中的 Worker 線程池,裡面定義了多個工作線程,用于處理實際的 I/O 請求邏輯。
整個服務端處理請求的流程大緻分為以下幾個步驟:
- Acceptor 接收用戶端發來的請求
- 輪詢分發給 Processor 線程處理
- Processor 将請求封裝成 Request 對象,放到 RequestQueue 隊列
- KafkaRequestHandlerPool 配置設定工作線程,處理 RequestQueue 中的請求
- KafkaRequestHandler 線程處理完請求後,将響應 Response 傳回給 Processor 線程
- Processor 線程将響應傳回給用戶端
其他知識探究
負載均衡
生産者負載均衡
Kafka 生産端的負載均衡主要指如何将消息發送到合适的分區。Kafka 生産者生産消息時,根據分區器将消息投遞到指定的分區中,是以 Kafka 的負載均衡很大程度上依賴于分區器。Kafka 預設的分區器是 Kafka 提供的 DefaultPartitioner。它的分區政策是根據 Key 值進行分區配置設定的:
- 如果 key 不為 null:對 Key 值進行 Hash 計算,從所有分區中根據 Key 的 Hash 值計算出一個分區号;擁有相同 Key 值的消息被寫入同一個分區,順序消息實作的關鍵;
- 如果 key 為 null:消息将以輪詢的方式,在所有可用分區中分别寫入消息。如果不想使用 Kafka 預設的分區器,使用者可以實作 Partitioner 接口,自行實作分區方法。
消費者負載均衡
在 Kafka 中,每個分區(Partition)隻能由一個消費者組中的一個消費者消費。當消費者組中有多個消費者時,Kafka 會自動進行負載均衡,将分區均勻地配置設定給每個消費者。在 Kafka 中,消費者負載均衡算法可以通過設定消費者組的 partition.assignment.strategy 參數來選擇。目前主流的分區配置設定政策以下幾種:
- range: 在保證均衡的前提下,将連續的分區配置設定給消費者,對應的實作是 RangeAssignor;
- round-robin:在保證均衡的前提下,輪詢配置設定,對應的實作是 RoundRobinAssignor;
- 0.11.0.0 版本引入了一種新的分區配置設定政策 StickyAssignor,其優勢在于能夠保證分區均衡的前提下盡量保持原有的分區配置設定結果,進而避免許多備援的分區配置設定操作,減少分區再配置設定的執行時間。
叢集管理
Kafka 借助 ZooKeeper 進行叢集管理。Kafka 中很多資訊都在 ZK 中維護,如 broker 叢集資訊、consumer 叢集資訊、 topic 相關資訊、 partition 資訊等。Kafka 的很多功能也是基于 ZK 實作的,如 partition 選主、broker 叢集管理、consumer 負載均衡等,限于篇幅本文将不展開陳述,這裡先附一張網上截圖大家感受下: