天天看點

一文讀懂Kafka

學習之前,先來了解一下網絡通信模型,網絡通信模型的發展如下

單線程 => 多線程 => 線程池 => Reactor模型

Kafka所采用的Reactor模型如下

一文讀懂Kafka

簡單來說就是,Broker

中有個Acceptor(mainReactor)監聽新連接配接的到來,與新連接配接建連之後輪詢選擇一個Processor(subReactor)管理這個連接配接。而Processor會監聽其管理的連接配接,當事件到達之後,讀取封裝成Request,并将Request放入共享請求隊列中。然後IO線程池不斷的從該隊列中取出請求,執行真正的處理。處理完之後将響應發送到對應的Processor的響應隊列中,然後由Processor将Response返還給用戶端。每個listener隻有一個Acceptor線程,因為它隻是作為新連接配接建連再分發,沒有過多的邏輯,很輕量,一個足矣。

1.Kafka概述

1.1 概述

Kafka是一個分布式的基于釋出/訂閱模式的消息隊列,主要應用與大資料實時處理領域

1.2 消息隊列

1.2.1 傳統消息隊列的應用場景
一文讀懂Kafka

使用消息隊列的好處

  • 1)解耦 : 允許你獨立的擴充或修改兩邊的處理過程,隻要確定它們遵守同樣的接口限制。
  • 2)可恢複性 :系統的一部分元件失效時,不會影響到整個系統。消息隊列降低了程序間的耦合度,是以即使一個處理消息的程序挂掉,加入隊列中的消息仍然可以在系統恢複後被處理。
  • 3)緩沖 : 有助于控制和優化資料流經過系統的速度,解決生産消息和消費消息的處理速度不一緻 的情況。
  • 4)靈活性 & 峰值處理能力 : 在通路量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見。如果為以能處理這類峰值通路為标準來投入資源随時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵元件頂住突發的通路壓力,而不會因為突發的超負荷的請求而完全崩潰。
  • 5)異步通信 : 很多時候,使用者不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許使用者把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然後在需要 的時候再去處理它們。
1.2.2 消息隊列的兩種模式
(1)點對點模式(一對一,消費者主動拉取資料,消息收到後消息清除)
消息生産者生産消息發送到Queue中,然後消息消費者從Queue中取出并且消費消息。 消息被消費以後,queue中不再有存儲,是以消息消費者不可能消費到已經被消費的消息。 Queue 支援存在多個消費者,但是對一個消息而言,隻會有一個消費者可以消費。
一文讀懂Kafka
(2)釋出/訂閱模式(一對多,消費者消費資料之後不會清除消息)
消息生産者(釋出)将消息釋出到 topic 中,同時有多個消息消費者(訂閱)消費該消 息。和點對點方式不同,釋出到 topic的消息會被所有訂閱者消費。
一文讀懂Kafka

1.3 Kafka 基礎架構

一文讀懂Kafka

1)Producer :消息生産者,就是向 kafka broker 發消息的用戶端;

2)Consumer :消息消費者,向 kafkabroker 取消息的用戶端;

3)Consumer Group (CG):消費者組,由多個 consumer 組成。消費者組内每個消費者負責消費不同分區的資料,一個分區隻能由一個組内消費者消費;消費者組之間互不影響。所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者。

4)Broker :一台 kafka 伺服器就是一個 broker。一個叢集由多個 broker 組成。一個 broker 可以容納多個 topic。

5)Topic :可以了解為一個隊列,生産者和消費者面向的都是一個 topic;

6)Partition:為了實作擴充性,一個非常大的 topic可以分布到多個 broker(即伺服器)上, 一個 topic 可以分為多個 partition,每個 partition是一個有序的隊列;

7)Replica:副本,為保證叢集中的某個節點發生故障時,該節點上的 partition 資料不丢失,且 kafka仍然能夠繼續工作,kafka 提供了副本機制,一個 topic 的每個分區都有若幹個副本, 一個 leader 和若幹個 follower。

8)leader:每個分區多個副本的“主”,生産者發送資料的對象,以及消費者消費資料的對 象都是 leader。

9)follower:每個分區多個副本中的“從”,實時從 leader 中同步資料,保持和 leader 資料 的同步。leader發生故障時,某個 follower 會成為新的 follower。

2.Kafka 架構深入

3.1 Kafka工作流程及檔案存儲機制

工作流程

一文讀懂Kafka

Kafka 中消息是以 topic 進行分類的,生産者生産消息,消費者消費消息,都是面向 topic 的。

topic 是邏輯上的概念,而partition 是實體上的概念,每個 partition 對應于一個 log 文 件,該 log 檔案中存儲的就是 producer生産的資料。Producer 生産的資料會被不斷追加到該 log 檔案末端,且每條資料都有自己的offset。消費者組中的每個消費者,都會實時記錄自己 消費到了哪個 offset,以便出錯恢複時,從上次的位置繼續消費。

存儲機制

一文讀懂Kafka

由于生産者生産的消息會不斷追加到 log 檔案末尾,為防止 log 檔案過大導緻資料定位 效率低下,Kafka 采取了分片和索引機制,将每個partition 分為多個 segment。每個 segment對應兩個檔案——“.index”檔案和“.log”檔案。這些檔案位于一個檔案夾下,該檔案夾的命名 規則為:topic 名稱+分區序号。例如,first 這個 topic 有三個分區,則其對應的檔案夾為 first- 0,first-1,first-2。

00000000000000000000.index

00000000000000000000.log

00000000000000170410.index

00000000000000170410.log

00000000000000239430.index

00000000000000239430.log

index 和 log 檔案以目前 segment 的第一條消息的 offset 命名。下圖為 index 檔案和 log

檔案的結構示意圖。

一文讀懂Kafka

“.index”檔案存儲大量的索引資訊,“.log”檔案存儲大量的資料,索引檔案中的中繼資料指向對應資料檔案中 message 的實體偏移位址。

通過2分查找法,找到偏移量,通過偏移量到資料檔案找到資料

2.2 Kafka生産者

2.2.1 分區政策
  • 1)分區的原因

    (1)友善在叢集中擴充,每個 Partition 可以通過調整以适應它所在的機器,而一個 topic又可以有多個 Partition 組成,是以整個叢集就可以适應任意大小的資料了;

    (2)可以提高并發,因為可以以 Partition 為機關讀寫了。

  • 2)分區的原則

    我們需要将 producer 發送的資料封裝成一個 ProducerRecord 對象。

    一文讀懂Kafka

(1)指明 partition 的情況下,直接将指明的值直接作為 partiton 值;

(2)沒有指明 partition 值但有 key 的情況下,将 key 的 hash 值與 topic 的 partition 數進行取餘得到 partition 值;

(3)既沒有 partition 值又沒有 key 值的情況下,第一次調用時随機生成一個整數(後 面每次調用在這個整數上自增),将這個值與 topic可用的 partition 總數取餘得到 partition 值,也就是常說的 round-robin 算法。

低版本ISR, 有條數跟時間的要求

高版本的ISR ,去掉了條數的限制(因為ISR頻繁)

2.2.2 資料可靠性保證

為保證 producer 發送的資料,能可靠的發送到指定的 topic,topic 的每個 partition 收到 producer發送的資料後,都需要向 producer 發送 ack(acknowledgement 确認收到),如果 producer 收到ack,就會進行下一輪的發送,否則重新發送資料。
一文讀懂Kafka
1)副本資料同步政策
一文讀懂Kafka

Kafka 選擇了第二種方案,原因如下:

  • 1.同樣為了容忍 n 台節點的故障,第一種方案需要 2n+1 個副本,而第二種方案隻需要 n+1

    個副本,而 Kafka 的每個分區都有大量的資料,第一種方案會造成大量資料的備援。

  • 2.雖然第二種方案的網絡延遲會比較高,但網絡延遲對 Kafka 的影響較小。
2)ISR
  • 采用第二種方案之後,設想以下情景:leader 收到資料,所有 follower 都開始同步資料, 但有一個follower,因為某種故障,遲遲不能與 leader 進行同步,那 leader 就要一直等下去, 直到它完成同步,才能發送ack。這個問題怎麼解決呢?
  • Leader 維護了一個動态的 in-sync replica set (ISR),意為和 leader保持同步的 follower 集 合。當 ISR 中的 follower 完成資料的同步之後,leader 就會給 follower 發送ack。如果 follower 長時間 未 向 leader 同 步 數 據 , 則 該 follower 将 被 踢 出 ISR , 該時 間 阈 值 由 replica.lag.time.max.ms 參數設定。Leader 發生故障之後,就會從 ISR 中選舉新的leader。
3)ack 應答機制
對于某些不太重要的資料,對資料的可靠性要求不是很高,能夠容忍資料的少量丢失,是以沒必要等 ISR 中的 follower 全部接收成功。是以Kafka 為使用者提供了三種可靠性級别,使用者根據對可靠性和延遲的要求進行權衡,選擇以下的配置。

acks 參數配置:

acks:(隻保證資料不丢失,不保證資料不重複)
  • 0:producer 不等待 broker 的 ack,這一操作提供了一個最低的延遲,broker一接收到還沒有寫入磁盤就已經傳回,當 broker 故障時有可能丢失資料;
  • 1:producer 等待 broker 的ack,partition 的 leader 落盤成功後傳回 ack,如果在 follower同步成功之前 leader故障,那麼将會丢失資料;
  • 1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower(指的是ISR中的follower) 全部落盤成功後才 傳回 ack。但是如果在 follower 同步完成後,broker 發送 ack 之前,leader 發生故障,那麼會造成資料重複。(極限情況,當ISR中隻有leader,生産資料也有可能會丢失,場景與acks=1的情況)

acks=1 資料丢失案例

一文讀懂Kafka

acks=-1 資料丢失案例

一文讀懂Kafka
4)故障處理細節
一文讀懂Kafka

LEO:指的是每個副本最大的 offset;

HW:指的是消費者能見到的最大的 offset,ISR 隊列中最小的 LEO。(做了兩件事:消費一緻性 ,存儲一緻性,不解決丢失與重複問題,acks解決丢失與重複性) **

(1)follower 故障

follower 發生故障後會被臨時踢出 ISR,待該 follower 恢複後,follower 會讀取本地磁盤 記錄的上次的 HW,并将log 檔案高于 HW 的部分截取掉,從 HW 開始向 leader 進行同步。 等該 follower 的 LEO 大于等于該Partition 的 HW,即 follower 追上 leader 之後,就可以重 新加入 ISR 了。

(2)leader 故障

leader 發生故障之後,會從 ISR 中選出一個新的 leader,之後,為保證多個副本之間的資料一緻性,其餘的 follower會先将各自的 log 檔案高于 HW 的部分截掉,然後從新的 leader同步資料。

注意:這隻能保證副本之間的資料一緻性,并不能保證資料不丢失或者不重複。

2.2.3 Exactly Once 語義
  • 将伺服器的 ACK 級别設定為-1,可以保證 Producer 到 Server 之間不會丢失資料,即 At Least Once 語義。相對的,将伺服器 ACK 級别設定為 0,可以保證生産者每條消息隻會被 發送一次,即 At Most Once 語義。
  • At Least Once 可以保證資料不丢失,但是不能保證資料不重複;相對的,At Least Once 可以保證資料不重複,但是不能保證資料不丢失。但是,對于一些非常重要的資訊,比如說 交易資料,下遊資料消費者要求資料既不重複也不丢失,即Exactly Once 語義。在 0.11 版 本以前的 Kafka,對此是無能為力的,隻能保證資料不丢失,再在下遊消費者對資料做全局去重。對于多個下遊應用的情況,每個都需要單獨做全局去重,這就對性能造成了很大影響。
  • 0.11 版本的 Kafka,引入了一項重大特性:幂等性。所謂的幂等性就是指 Producer 不論 向 Server 發送多少次重複資料,Server 端都隻會持久化一條。幂等性結合 At Least Once 語 義,就構成了 Kafka 的 Exactly Once 語義。即: At Least Once + 幂等性 = Exactly Once ( Exactly Once : 幂等性 enable.idompotence=true即可 (開啟後acks預設為-1)将去重放在kafka中)
  • 要啟用幂等性,隻需要将Producer 的參數中 enable.idompotence 設定為 true 即可。Kafka的幂等性實作其實就是将原來下遊需要做的去重放在了資料上遊。開啟幂等性的 Producer 在 初始化的時候會被配置設定一個 PID,發往同一Partition 的消息會附帶 Sequence Number。而 Broker 端會對<PID, Partition,SeqNumber>做緩存,當具有相同主鍵的消息送出時,Broker 隻 會持久化一條。
  • 但是 PID 重新開機就會變化,同時不同的Partition 也具有不同主鍵,是以幂等性無法保證跨 分區跨會話的 Exactly Once。

2.3 Kafka消費者

2.3.1 消費方式

consumer 采用 pull(拉)模式從 broker 中讀取資料。

push(推)模式很難适應消費速率不同的消費者,因為消息發送速率是由 broker 決定的。

它的目标是盡可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull模式則可以根據 consumer 的消費能力以适 當的速率消費消息。

pull 模式不足之處是,如果 kafka沒有資料,消費者可能會陷入循環中,一直傳回空資料。針對這一點,Kafka 的消費者在消費資料時會傳入一個時長參數timeout,如果目前沒有 資料可供消費,consumer 會等待一段時間之後再傳回,這段時長即為 timeout。

2.3.2 分區配置設定政策 (同一個組裡的可以同時消費一個主題,不能同時消費一個分區)

一個 consumer group 中有多個 consumer,一個 topic 有多個 partition,是以必然會涉及到partition 的配置設定問題,即确定那個 partition 由哪個 consumer 來消費。Kafka 有兩種配置設定政策,

一是RoundRobin,一是 Range。

1)RoundRobin

一文讀懂Kafka

2)Range

一文讀懂Kafka

分區規則:

輪詢(RoundRobin):按順序一個一個分(隻有當消費者組訂閱的主題是一樣的,才能用)消費者綁定多個topic的時候,topicAndPartition會将所有topic的資料取出來,計算出hash值,排序後輪詢配置設定到consumer

案例(有消費者組A,B topic : T1(分區0,1,2),T2(分區0,1,2)),A訂閱T1,B訂閱T2會發生什麼情況

  • 因為采用了輪詢,是以把topic所有資料T1,T2的所有分區資料取出來做了一個排序(通過hash)然後輪詢配置設定,會導緻A讀到了T2的資料,B讀到了T1的資料,是以不能用這個方式:

按組來配置設定消息給消費者的随機(Range)(預設是這個)

假設有7個分區,3個消費者,則 消費者1 - 3個分區 , 消費者2- 2個分區, 消費者3- 2個分區

如果消費者訂閱的主題是一樣的 則可以用RoundRobin,否則用Range

:按主題來配置設定消息 (先考慮誰訂閱了該主題,在隻有訂閱的消費者中配置設定)

當消費者個數發生變化,會觸發重新配置設定

按 消費組+主題+分區 儲存偏移量 GTP

2.3 offset 的維護
由于 consumer 在消費過程中可能會出現斷電當機等故障,consumer 恢複後,需要從故 障前的位置的繼續消費,是以consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢 複後繼續消費。
一文讀懂Kafka

Kafka 0.9 版本之前,consumer 預設将 offset 儲存在 Zookeeper 中,從 0.9 版本開始,

consumer 預設将 offset 儲存在 Kafka 一個内置的 topic 中,該 topic 為__consumer_offsets。

2.4 Kafka 高效讀寫資料

1)順序寫磁盤

Kafka 的 producer 生産資料,要寫入到 log 檔案中,寫的過程是一直追加到檔案末端,為順序寫。官網有資料表明,同樣的磁盤,順序寫能到 600M/s,而随機寫隻有 100K/s。這與磁盤的機械機構有關,順序寫之是以快,是因為其省去了大量磁頭尋址的時間。

2)零複制技術

一文讀懂Kafka
2.5 Zookeeper 在 Kafka 中的作用

Kafka 叢集中有一個 broker 會被選舉為 Controller,負責管理叢集 broker 的上下線,所有 topic 的分區副本配置設定和 leader 選舉等工作。Controller 的管理工作都是依賴于 Zookeeper 的。以下為 partition 的 leader 選舉過程:

一文讀懂Kafka
2.5 Kafka 事務

Kafka 從 0.11 版本開始引入了事務支援。事務可以保證 Kafka 在 Exactly Once 語義的基礎上,生産和消費可以跨分區和會話,要麼全部成功,要麼全部失敗。

2.6.1 Producer 事務

為了實作跨分區跨會話的事務,需要引入一個全局唯一的 Transaction ID,并将 Producer 獲得的PID和Transaction ID 綁定。這樣當Producer 重新開機後就可以通過正在進行的 Transaction ID 獲得原來的 PID。為了管理 Transaction,Kafka 引入了一個新的元件 Transaction Coordinator。Producer 就是通過和 Transaction Coordinator 互動獲得 Transaction ID 對應的任務狀态。TransactionCoordinator 還負責将事務所有寫入 Kafka 的一個内部 Topic,這樣即使整個服務重新開機,由于事務狀态得到儲存,進行中的事務狀态可以得到恢複,進而繼續進行。

2.6.2 Consumer 事務

上述事務機制主要是從 Producer 方面考慮,對于 Consumer 而言,事務的保證就會相對 較弱,尤其時無法保證 Commit的資訊被精确消費。這是由于 Consumer 可以通過 offset 訪 問任意資訊,而且不同的 Segment File生命周期不同,同一事務的消息可能會出現重新開機後被 删除的情況。