天天看點

Kafka資料可靠性深度解讀

原文連結:http://www.infoq.com/cn/articles/depth-interpretation-of-kafka-data-reliability 

Kafka起初是由LinkedIn公司開發的一個分布式的消息系統,後成為Apache的一部分,它使用Scala編寫,以可水準擴充和高吞吐率而被廣泛使用。目前越來越多的開源分布式處理系統如Cloudera、Apache Storm、Spark等都支援與Kafka內建。

1 概述

Kafka與傳統消息系統相比,有以下不同:

  • 它被設計為一個分布式系統,易于向外擴充;
  • 它同時為釋出和訂閱提供高吞吐量;
  • 它支援多訂閱者,當失敗時能自動平衡消費者;
  • 它将消息持久化到磁盤,是以可用于批量消費,例如ETL以及實時應用程式。

Kafka憑借着自身的優勢,越來越受到網際網路企業的青睐,唯品會也采用Kafka作為其内部核心消息引擎之一。Kafka作為一個商業級消息中間件,消息可靠性的重要性可想而知。如何確定消息的精确傳輸?如何確定消息的準确存儲?如何確定消息的正确消費?這些都是需要考慮的問題。

本文首先從Kafka的架構着手,先了解下Kafka的基本原理,然後通過對kakfa的存儲機制、複制原理、同步原理、可靠性和持久性保證等等一步步對其可靠性進行分析,最後通過benchmark來增強對Kafka高可靠性的認知。

2 Kafka體系架構

如上圖所示,一個典型的Kafka體系架構包括若幹Producer(可以是伺服器日志,業務資料,頁面前端産生的page view等等),若幹broker(Kafka支援水準擴充,一般broker數量越多,叢集吞吐率越高),若幹Consumer (Group),以及一個Zookeeper叢集。Kafka通過Zookeeper管理叢集配置,選舉leader,以及在consumer group發生變化時進行rebalance。Producer使用push(推)模式将消息釋出到broker,Consumer使用pull(拉)模式從broker訂閱并消費消息。

2.1 Topic & Partition

一個topic可以認為一個一類消息,每個topic将被分成多個partition,每個partition在存儲層面是append log檔案。任何釋出到此partition的消息都會被追加到log檔案的尾部,每條消息在檔案中的位置稱為offset(偏移量),offset為一個long型的數字,它唯一标記一條消息。每條消息都被append到partition中,是順序寫磁盤,是以效率非常高(經驗證,順序寫磁盤效率比随機寫記憶體還要高,這是Kafka高吞吐率的一個很重要的保證)。

每一條消息被發送到broker中,會根據partition規則選擇被存儲到哪一個partition。如果partition規則設定的合理,所有消息可以均勻分布到不同的partition裡,這樣就實作了水準擴充。(如果一個topic對應一個檔案,那這個檔案所在的機器I/O将會成為這個topic的性能瓶頸,而partition解決了這個問題)。在建立topic時可以在$KAFKA_HOME/config/server.properties中指定這個partition的數量(如下所示),當然可以在topic建立之後去修改partition的數量。

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3        

在發送一條消息時,可以指定這個消息的key,producer根據這個key和partition機制來判斷這個消息發送到哪個partition。partition機制可以通過指定producer的partition.class這一參數來指定,該class必須實作kafka.producer.Partitioner接口。

有關Topic與Partition的更多細節,可以參考下面的“Kafka檔案存儲機制”這一節。

3 高可靠性存儲分析

Kafka的高可靠性的保障來源于其健壯的副本(replication)政策。通過調節其副本相關參數,可以使得Kafka在性能和可靠性之間運轉的遊刃有餘。Kafka從0.8.x版本開始提供partition級别的複制,replication的數量可以在$KAFKA_HOME/config/server.properties中配置(default.replication.refactor)。

這裡先從Kafka檔案存儲機制入手,從最底層了解Kafka的存儲細節,進而對其的存儲有個微觀的認知。之後通過Kafka複制原理和同步方式來闡述宏觀層面的概念。最後從ISR,HW,leader選舉以及資料可靠性和持久性保證等等各個次元來豐富對Kafka相關知識點的認知。

3.1 Kafka檔案存儲機制

Kafka中消息是以topic進行分類的,生産者通過topic向Kafka broker發送消息,消費者通過topic讀取資料。然而topic在實體層面又能以partition為分組,一個topic可以分成若幹個partition,那麼topic以及partition又是怎麼存儲的呢?partition還可以細分為segment,一個partition實體上由多個segment組成,那麼這些segment又是什麼呢?下面我們來一一揭曉。

為了便于說明問題,假設這裡隻有一個Kafka叢集,且這個叢集隻有一個Kafka broker,即隻有一台實體機。在這個Kafka broker中配置($KAFKA_HOME/config/server.properties中)log.dirs=/tmp/kafka-logs,以此來設定Kafka消息檔案存儲目錄,與此同時建立一個topic:topic_vms_test,partition的數量為4($KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 4 --topic topic_vms_test --replication-factor 4)。那麼我們此時可以在/tmp/kafka-logs目錄中可以看到生成了4個目錄:

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_vms_test-0
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_vms_test-1
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_vms_test-2
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_vms_test-3      

在Kafka檔案存儲中,同一個topic下有多個不同的partition,每個partiton為一個目錄,partition的名稱規則為:topic名稱+有序序号,第一個序号從0開始計,最大的序号為partition數量減1,partition是實際實體上的概念,而topic是邏輯上的概念。

上面提到partition還可以細分為segment,這個segment又是什麼?如果就以partition為最小存儲機關,我們可以想象當Kafka producer不斷發送消息,必然會引起partition檔案的無限擴張,這樣對于消息檔案的維護以及已經被消費的消息的清理帶來嚴重的影響,是以這裡以segment為機關又将partition細分。每個partition(目錄)相當于一個巨型檔案被平均配置設定到多個大小相等的segment(段)資料檔案中(每個segment 檔案中消息數量不一定相等)這種特性也友善old segment的删除,即友善已被消費的消息的清理,提高磁盤的使用率。每個partition隻需要支援順序讀寫就行,segment的檔案生命周期由服務端配置參數(log.segment.bytes,log.roll.{ms,hours}等若幹參數)決定。

segment檔案由兩部分組成,分别為“.index”檔案和“.log”檔案,分别表示為segment索引檔案和資料檔案。這兩個檔案的指令規則為:partition全局的第一個segment從0開始,後續每個segment檔案名為上一個segment檔案最後一條消息的offset值,數值大小為64位,20位數字字元長度,沒有數字用0填充,如下:

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log      

以上面的segment檔案為例,展示出segment:00000000000000170410的“.index”檔案和“.log”檔案的對應的關系,如下圖:

(點選放大圖像)

如上圖,“.index”索引檔案存儲大量的中繼資料,“.log”資料檔案存儲大量的消息,索引檔案中的中繼資料指向對應資料檔案中message的實體偏移位址。其中以“.index”索引檔案中的中繼資料[3, 348]為例,在“.log”資料檔案表示第3個消息,即在全局partition中表示170410+3=170413個消息,該消息的實體偏移位址為348。

那麼如何從partition中通過offset查找message呢?以上圖為例,讀取offset=170418的消息,首先查找segment檔案,其中00000000000000000000.index為最開始的檔案,第二個檔案為00000000000000170410.index(起始偏移為170410+1=170411),而第三個檔案為00000000000000239430.index(起始偏移為239430+1=239431),是以這個offset=170418就落到了第二個檔案之中。其他後續檔案可以依次類推,以其實偏移量命名并排列這些檔案,然後根據二分查找法就可以快速定位到具體檔案位置。其次根據00000000000000170410.index檔案中的[8,1325]定位到00000000000000170410.log檔案中的1325的位置進行讀取。

要是讀取offset=170418的消息,從00000000000000170410.log檔案中的1325的位置進行讀取,那麼怎麼知道何時讀完本條消息,否則就讀到下一條消息的内容了?這個就需要聯系到消息的實體結構了,消息都具有固定的實體結構,包括:offset(8 Bytes)、消息體的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一條消息的大小,即讀取到哪裡截止。

3.2 複制原理和同步方式

Kafka中topic的每個partition有一個預寫式的日志檔案,雖然partition可以繼續細分為若幹個segment檔案,但是對于上層應用來說可以将partition看成最小的存儲單元(一個有多個segment檔案拼接的“巨型”檔案),每個partition都由一些列有序的、不可變的消息組成,這些消息被連續的追加到partition中。

上圖中有兩個新名詞:HW和LEO。這裡先介紹下LEO,LogEndOffset的縮寫,表示每個partition的log最後一條Message的位置。HW是HighWatermark的縮寫,是指consumer能夠看到的此partition的位置,這個涉及到多副本的概念,這裡先提及一下,下節再詳表。

言歸正傳,為了提高消息的可靠性,Kafka每個topic的partition有N個副本(replicas),其中N(大于等于1)是topic的複制因子(replica fator)的個數。Kafka通過多副本機制實作故障自動轉移,當Kafka叢集中一個broker失效情況下仍然保證服務可用。在Kafka中發生複制時確定partition的日志能有序地寫到其他節點上,N個replicas中,其中一個replica為leader,其他都為follower, leader處理partition的所有讀寫請求,與此同時,follower會被動定期地去複制leader上的資料。

如下圖所示,Kafka叢集中有4個broker, 某topic有3個partition,且複制因子即副本個數也為3:

Kafka提供了資料複制算法保證,如果leader發生故障或挂掉,一個新leader被選舉并被接受用戶端的消息成功寫入。Kafka確定從同步副本清單中選舉一個副本為leader,或者說follower追趕leader資料。leader負責維護和跟蹤ISR(In-Sync Replicas的縮寫,表示副本同步隊列,具體可參考下節)中所有follower滞後的狀态。當producer發送一條消息到broker後,leader寫入消息并複制到所有follower。消息送出之後才被成功複制到所有的同步副本。消息複制延遲受最慢的follower限制,重要的是快速檢測慢副本,如果follower“落後”太多或者失效,leader将會把它從ISR中删除。

3.3 ISR

上節我們涉及到ISR (In-Sync Replicas),這個是指副本同步隊列。副本數對Kafka的吞吐率是有一定的影響,但極大的增強了可用性。預設情況下Kafka的replica數量為1,即每個partition都有一個唯一的leader,為了確定消息的可靠性,通常應用中将其值(由broker的參數offsets.topic.replication.factor指定)大小設定為大于1,比如3。 所有的副本(replicas)統稱為Assigned Replicas,即AR。

ISR是AR中的一個子集,由leader維護ISR清單,follower從leader同步資料有一些延遲(包括延遲時間replica.lag.time.max.ms和延遲條數replica.lag.max.messages兩個次元, 目前最新的版本0.10.x中隻支援replica.lag.time.max.ms這個次元),任意一個超過門檻值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)清單,新加入的follower也會先存放在OSR中。AR=ISR+OSR。

Kafka 0.10.x版本後移除了replica.lag.max.messages參數,隻保留了replica.lag.time.max.ms作為ISR中副本管理的參數。為什麼這樣做呢?replica.lag.max.messages表示目前某個副本落後leaeder的消息數量超過了這個參數的值,那麼leader就會把follower從ISR中删除。假設設定replica.lag.max.messages=4,那麼如果producer一次傳送至broker的消息數量都小于4條時,因為在leader接受到producer發送的消息之後而follower副本開始拉取這些消息之前,follower落後leader的消息數不會超過4條消息,故此沒有follower移出ISR,是以這時候replica.lag.max.message的設定似乎是合理的。

但是producer發起瞬時高峰流量,producer一次發送的消息超過4條時,也就是超過replica.lag.max.messages,此時follower都會被認為是與leader副本不同步了,進而被踢出了ISR。但實際上這些follower都是存活狀态的且沒有性能問題。那麼在之後追上leader,并被重新加入了ISR。于是就會出現它們不斷地剔出ISR然後重新回歸ISR,這無疑增加了無謂的性能損耗。而且這個參數是broker全局的。設定太大了,影響真正“落後”follower的移除;設定的太小了,導緻follower的頻繁進出。無法給定一個合适的replica.lag.max.messages的值,故此,新版本的Kafka移除了這個參數。

注:ISR中包括:leader和follower。

上面一節還涉及到一個概念,即HW。HW俗稱高水位,HighWatermark的縮寫,取一個partition對應的ISR中最小的LEO作為HW,consumer最多隻能消費到HW所在的位置。另外每個replica都有HW,leader和follower各自負責更新自己的HW的狀态。對于leader新寫入的消息,consumer不能立刻消費,leader會等待該消息被所有ISR中的replicas同步後更新HW,此時消息才能被consumer消費。這樣就保證了如果leader所在的broker失效,該消息仍然可以從新選舉的leader中擷取。對于來自内部broKer的讀取請求,沒有HW的限制。

下圖詳細的說明了當producer生産消息至broker後,ISR以及HW和LEO的流轉過程:

由此可見,Kafka的複制機制既不是完全的同步複制,也不是單純的異步複制。事實上,同步複制要求所有能工作的follower都複制完,這條消息才會被commit,這種複制方式極大的影響了吞吐率。而異步複制方式下,follower異步的從leader複制資料,資料隻要被leader寫入log就被認為已經commit,這種情況下如果follower都還沒有複制完,落後于leader時,突然leader當機,則會丢失資料。而Kafka的這種使用ISR的方式則很好的均衡了確定資料不丢失以及吞吐率。

Kafka的ISR的管理最終都會回報到Zookeeper節點上。具體位置為:/brokers/topics/[topic]/partitions/[partition]/state。目前有兩個地方會對這個Zookeeper的節點進行維護:

  1. Controller來維護:Kafka叢集中的其中一個Broker會被選舉為Controller,主要負責Partition管理和副本狀态管理,也會執行類似于重配置設定partition之類的管理任務。在符合某些特定條件下,Controller下的LeaderSelector會選舉新的leader,ISR和新的leader_epoch及controller_epoch寫入Zookeeper的相關節點中。同時發起LeaderAndIsrRequest通知所有的replicas。
  2. leader來維護:leader有單獨的線程定期檢測ISR中follower是否脫離ISR, 如果發現ISR變化,則會将新的ISR的資訊傳回到Zookeeper的相關節點中。

3.4 資料可靠性和持久性保證

當producer向leader發送資料時,可以通過request.required.acks參數來設定資料可靠性的級别:

  • 1(預設):這意味着producer在ISR中的leader已成功收到的資料并得到确認後發送下一條message。如果leader當機了,則會丢失資料。
  • 0:這意味着producer無需等待來自broker的确認而繼續發送下一批消息。這種情況下資料傳輸效率最高,但是資料可靠性确是最低的。
  • -1:producer需要等待ISR中的所有follower都确認接收到資料後才算一次發送完成,可靠性最高。但是這樣也不能保證資料不丢失,比如當ISR中隻有leader時(前面ISR那一節講到,ISR中的成員由于某些情況會增加也會減少,最少就隻剩一個leader),這樣就變成了acks=1的情況。

如果要提高資料的可靠性,在設定request.required.acks=-1的同時,也要min.insync.replicas這個參數(可以在broker或者topic層面進行設定)的配合,這樣才能發揮最大的功效。min.insync.replicas這個參數設定ISR中的最小副本數是多少,預設值為1,當且僅當request.required.acks參數設定為-1時,此參數才生效。如果ISR中的副本數少于min.insync.replicas配置的數量時,用戶端會傳回異常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。

接下來對acks=1和-1的兩種情況進行詳細分析:

1. request.required.acks=1

producer發送資料到leader,leader寫本地日志成功,傳回用戶端成功;此時ISR中的副本還沒有來得及拉取該消息,leader就當機了,那麼此次發送的消息就會丢失。(鐐瑰嚮鏀懼ぇ鍥懼儚)

2. request.required.acks=-1

同步(Kafka預設為同步,即producer.type=sync)的發送模式,replication.factor>=2且min.insync.replicas>=2的情況下,不會丢失資料。

有兩種典型情況。acks=-1的情況下(如無特殊說明,以下acks都表示為參數request.required.acks),資料發送到leader, ISR的follower全部完成資料同步後,leader此時挂掉,那麼會選舉出新的leader,資料不會丢失。

acks=-1的情況下,資料發送到leader後 ,部分ISR的副本同步,leader此時挂掉。比如follower1h和follower2都有可能變成新的leader, producer端會得到傳回異常,producer端會重新發送資料,資料可能會重複。

當然上圖中如果在leader crash的時候,follower2還沒有同步到任何資料,而且follower2被選舉為新的leader的話,這樣消息就不會重複。

注:Kafka隻處理fail/recover問題,不處理Byzantine問題。

3.5 關于HW的進一步探讨

考慮上圖(即acks=-1,部分ISR副本同步)中的另一種情況,如果在Leader挂掉的時候,follower1同步了消息4,5,follower2同步了消息4,與此同時follower2被選舉為leader,那麼此時follower1中的多出的消息5該做如何處理呢?

這裡就需要HW的協同配合了。如前所述,一個partition中的ISR清單中,leader的HW是所有ISR清單裡副本中最小的那個的LEO。類似于木桶原理,水位取決于最低那塊短闆。

如上圖,某個topic的某partition有三個副本,分别為A、B、C。A作為leader肯定是LEO最高,B緊随其後,C機器由于配置比較低,網絡比較差,故而同步最慢。這個時候A機器當機,這時候如果B成為leader,假如沒有HW,在A重新恢複之後會做同步(makeFollower)操作,在當機時log檔案之後直接做追加操作,而假如B的LEO已經達到了A的LEO,會産生資料不一緻的情況,是以使用HW來避免這種情況。A在做同步操作的時候,先将log檔案截斷到之前自己的HW的位置,即3,之後再從B中拉取消息進行同步。

如果失敗的follower恢複過來,它首先将自己的log檔案截斷到上次checkpointed時刻的HW的位置,之後再從leader中同步消息。leader挂掉會重新選舉,新的leader會發送“指令”讓其餘的follower截斷至自身的HW的位置然後再拉取新的消息。

當ISR中的個副本的LEO不一緻時,如果此時leader挂掉,選舉新的leader時并不是按照LEO的高低進行選舉,而是按照ISR中的順序選舉。

3.6 Leader選舉

一條消息隻有被ISR中的所有follower都從leader複制過去才會被認為已送出。這樣就避免了部分資料被寫進了leader,還沒來得及被任何follower複制就當機了,而造成資料丢失。而對于producer而言,它可以選擇是否等待消息commit,這可以通過request.required.acks來設定。這種機制確定了隻要ISR中有一個或者以上的follower,一條被commit的消息就不會丢失。

有一個很重要的問題是當leader當機了,怎樣在follower中選舉出新的leader,因為follower可能落後很多或者直接crash了,是以必須確定選擇“最新”的follower作為新的leader。一個基本的原則就是,如果leader不在了,新的leader必須擁有原來的leader commit的所有消息。這就需要做一個折中,如果leader在表名一個消息被commit前等待更多的follower确認,那麼在它挂掉之後就有更多的follower可以成為新的leader,但這也會造成吞吐率的下降。

一種非常常用的選舉leader的方式是“少數服從多數”,Kafka并不是采用這種方式。這種模式下,如果我們有2f+1個副本,那麼在commit之前必須保證有f+1個replica複制完消息,同時為了保證能正确選舉出新的leader,失敗的副本數不能超過f個。這種方式有個很大的優勢,系統的延遲取決于最快的幾台機器,也就是說比如副本數為3,那麼延遲就取決于最快的那個follower而不是最慢的那個。

“少數服從多數”的方式也有一些劣勢,為了保證leader選舉的正常進行,它所能容忍的失敗的follower數比較少,如果要容忍1個follower挂掉,那麼至少要3個以上的副本,如果要容忍2個follower挂掉,必須要有5個以上的副本。也就是說,在生産環境下為了保證較高的容錯率,必須要有大量的副本,而大量的副本又會在大資料量下導緻性能的急劇下降。這種算法更多用在Zookeeper這種共享叢集配置的系統中而很少在需要大量資料的系統中使用的原因。HDFS的HA功能也是基于“少數服從多數”的方式,但是其資料存儲并不是采用這樣的方式。

實際上,leader選舉的算法非常多,比如Zookeeper的Zab、Raft以及Viewstamped Replication。而Kafka所使用的leader選舉算法更像是微軟的PacificA算法。

Kafka在Zookeeper中為每一個partition動态的維護了一個ISR,這個ISR裡的所有replica都跟上了leader,隻有ISR裡的成員才能有被選為leader的可能(unclean.leader.election.enable=false)。在這種模式下,對于f+1個副本,一個Kafka topic能在保證不丢失已經commit消息的前提下容忍f個副本的失敗,在大多數使用場景下,這種模式是十分有利的。事實上,為了容忍f個副本的失敗,“少數服從多數”的方式和ISR在commit前需要等待的副本的數量是一樣的,但是ISR需要的總的副本的個數幾乎是“少數服從多數”的方式的一半。

上文提到,在ISR中至少有一個follower時,Kafka可以確定已經commit的資料不丢失,但如果某一個partition的所有replica都挂了,就無法保證資料不丢失了。這種情況下有兩種可行的方案:

  1. 等待ISR中任意一個replica“活”過來,并且選它作為leader
  2. 選擇第一個“活”過來的replica(并不一定是在ISR中)作為leader

這就需要在可用性和一緻性當中作出一個簡單的抉擇。如果一定要等待ISR中的replica“活”過來,那不可用的時間就可能會相對較長。而且如果ISR中所有的replica都無法“活”過來了,或者資料丢失了,這個partition将永遠不可用。選擇第一個“活”過來的replica作為leader,而這個replica不是ISR中的replica,那即使它并不保障已經包含了所有已commit的消息,它也會成為leader而作為consumer的資料源。預設情況下,Kafka采用第二種政策,即unclean.leader.election.enable=true,也可以将此參數設定為false來啟用第一種政策。

unclean.leader.election.enable這個參數對于leader的選舉、系統的可用性以及資料的可靠性都有至關重要的影響。下面我們來分析下幾種典型的場景。

如果上圖所示,假設某個partition中的副本數為3,replica-0, replica-1, replica-2分别存放在broker0, broker1和broker2中。AR=(0,1,2),ISR=(0,1)。設定request.required.acks=-1, min.insync.replicas=2,unclean.leader.election.enable=false。這裡講broker0中的副本也稱之為broker0起初broker0為leader,broker1為follower。

  • 當ISR中的replica-0出現crash的情況時,broker1選舉為新的leader[ISR=(1)],因為受min.insync.replicas=2影響,write不能服務,但是read能繼續正常服務。此種情況恢複方案:
    1. 嘗試恢複(重新開機)replica-0,如果能起來,系統正常;
    2. 如果replica-0不能恢複,需要将min.insync.replicas設定為1,恢複write功能。
  • 當ISR中的replica-0出現crash,緊接着replica-1也出現了crash, 此時[ISR=(1),leader=-1],不能對外提供服務,此種情況恢複方案:
    1. 嘗試恢複replica-0和replica-1,如果都能起來,則系統恢複正常;
    2. 如果replica-0起來,而replica-1不能起來,這時候仍然不能選出leader,因為當設定unclean.leader.election.enable=false時,leader隻能從ISR中選舉,當ISR中所有副本都失效之後,需要ISR中最後失效的那個副本能恢複之後才能選舉leader, 即replica-0先失效,replica-1後失效,需要replica-1恢複後才能選舉leader。保守的方案建議把unclean.leader.election.enable設定為true,但是這樣會有丢失資料的情況發生,這樣可以恢複read服務。同樣需要将min.insync.replicas設定為1,恢複write功能;
    3. replica-1恢複,replica-0不能恢複,這個情況上面遇到過,read服務可用,需要将min.insync.replicas設定為1,恢複write功能;
    4. replica-0和replica-1都不能恢複,這種情況可以參考情形2.
  • 當ISR中的replica-0, replica-1同時當機,此時[ISR=(0,1)],不能對外提供服務,此種情況恢複方案:嘗試恢複replica-0和replica-1,當其中任意一個副本恢複正常時,對外可以提供read服務。直到2個副本恢複正常,write功能才能恢複,或者将将min.insync.replicas設定為1。

3.7 Kafka的發送模式

Kafka的發送模式由producer端的配置參數producer.type來設定,這個參數指定了在背景線程中消息的發送方式是同步的還是異步的,預設是同步的方式,即producer.type=sync。如果設定成異步的模式,即producer.type=async,可以是producer以batch的形式push資料,這樣會極大的提高broker的性能,但是這樣會增加丢失資料的風險。如果需要確定消息的可靠性,必須要将producer.type設定為sync。

對于異步模式,還有4個配套的參數,如下:

以batch的方式推送資料可以極大的提高處理效率,kafka producer可以将消息在記憶體中累計到一定數量後作為一個batch發送請求。batch的數量大小可以通過producer的參數(batch.num.messages)控制。通過增加batch的大小,可以減少網絡請求和磁盤IO的次數,當然具體參數設定需要在效率和時效性方面做一個權衡。在比較新的版本中還有batch.size這個參數。

4 高可靠性使用分析

4.1 消息傳輸保障

前面已經介紹了Kafka如何進行有效的存儲,以及了解了producer和consumer如何工作。接下來讨論的是Kafka如何確定消息在producer和consumer之間傳輸。有以下三種可能的傳輸保障(delivery guarantee):

  • At most once: 消息可能會丢,但絕不會重複傳輸
  • At least once:消息絕不會丢,但可能會重複傳輸
  • Exactly once:每條消息肯定會被傳輸一次且僅傳輸一次

Kafka的消息傳輸保障機制非常直覺。當producer向broker發送消息時,一旦這條消息被commit,由于副本機制(replication)的存在,它就不會丢失。但是如果producer發送資料給broker後,遇到的網絡問題而造成通信中斷,那producer就無法判斷該條消息是否已經送出(commit)。雖然Kafka無法确定網絡故障期間發生了什麼,但是producer可以retry多次,確定消息已經正确傳輸到broker中,是以目前Kafka實作的是at least once。

consumer從broker中讀取消息後,可以選擇commit,該操作會在Zookeeper中存下該consumer在該partition下讀取的消息的offset。該consumer下一次再讀該partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit之後的開始位置相同。當然也可以将consumer設定為autocommit,即consumer一旦讀取到資料立即自動commit。如果隻讨論這一讀取消息的過程,那Kafka是確定了exactly once, 但是如果由于前面producer與broker之間的某種原因導緻消息的重複,那麼這裡就是at least once。

考慮這樣一種情況,當consumer讀完消息之後先commit再處理消息,在這種模式下,如果consumer在commit後還沒來得及處理消息就crash了,下次重新開始工作後就無法讀到剛剛已送出而未處理的消息,這就對應于at most once了。

讀完消息先處理再commit。這種模式下,如果處理完了消息在commit之前consumer crash了,下次重新開始工作時還會處理剛剛未commit的消息,實際上該消息已經被處理過了,這就對應于at least once。

要做到exactly once就需要引入消息去重機制。

4.2 消息去重

如上一節所述,Kafka在producer端和consumer端都會出現消息的重複,這就需要去重處理。

Kafka文檔中提及GUID(Globally Unique Identifier)的概念,通過用戶端生成算法得到每個消息的unique id,同時可映射至broker上存儲的位址,即通過GUID便可查詢提取消息内容,也便于發送方的幂等性保證,需要在broker上提供此去重處理子產品,目前版本尚不支援。

針對GUID, 如果從用戶端的角度去重,那麼需要引入集中式緩存,必然會增加依賴複雜度,另外緩存的大小難以界定。

不隻是Kafka, 類似RabbitMQ以及RocketMQ這類商業級中間件也隻保障at least once, 且也無法從自身去進行消息去重。是以我們建議業務方根據自身的業務特點進行去重,比如業務消息本身具備幂等性,或者借助Redis等其他産品進行去重處理。

4.3 高可靠性配置

Kafka提供了很高的資料備援彈性,對于需要資料高可靠性的場景,我們可以增加資料備援備份數(replication.factor),調高最小寫入副本數的個數(min.insync.replicas)等等,但是這樣會影響性能。反之,性能提高而可靠性則降低,使用者需要自身業務特性在彼此之間做一些權衡性選擇。

要保證資料寫入到Kafka是安全的,高可靠的,需要如下的配置:

  • topic的配置:replication.factor>=3,即副本數至少是3個;2<=min.insync.replicas<=replication.factor
  • broker的配置:leader的選舉條件unclean.leader.election.enable=false
  • producer的配置:request.required.acks=-1(all),producer.type=sync

5 BenchMark

Kafka在唯品會有着很深的曆史淵源,根據唯品會消息中間件團隊(VMS團隊)所掌握的資料顯示,在VMS團隊運轉的Kafka叢集中所支撐的topic數已接近2000,每天的請求量也已達百億級。這裡就以Kafka的高可靠性為基準點來探究幾種不同場景下的行為表現,以此來加深對Kafka的認知,為大家在以後高效的使用Kafka時提供一份依據。

5.1 測試環境

Kafka broker用到了4台機器,分别為broker[0/1/2/3]配置如下:

  • CPU: 24core/2.6GHZ
  • Memory: 62G
  • Network: 4000Mb
  • OS/kernel: CentOs release 6.6 (Final)
  • Disk: 1089G
  • Kafka版本:0.10.1.0

broker端JVM參數設定:-Xmx8G -Xms8G -server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/apps/service/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999

用戶端機器配置:

  • Memory: 3G
  • Network: 1000Mb
  • OS/kernel: CentOs release 6.3 (Final)
  • Disk: 240G

5.2 不同場景測試

場景1:測試不同的副本數、min.insync.replicas政策以及request.required.acks政策(以下簡稱acks政策)對于發送速度(TPS)的影響。

具體配置:一個producer;發送方式為sync;消息體大小為1kB;partition數為12。副本數為:1/2/4;min.insync.replicas分别為1/2/4;acks分别為-1(all)/1/0。

具體測試資料如下表(min.insync.replicas隻在acks=-1時有效):

測試結果分析:

  • 用戶端的acks政策對發送的TPS有較大的影響,TPS:acks_0 > acks_1 > ack_-1;
  • 副本數越高,TPS越低;副本數一緻時,min.insync.replicas不影響TPS;
  • acks=0/1時,TPS與min.insync.replicas參數以及副本數無關,僅受acks政策的影響。

下面将partition的個數設定為1,來進一步确認下不同的acks政策、不同的min.insync.replicas政策以及不同的副本數對于發送速度的影響,詳細請看情景2和情景3。

場景2:在partition個數固定為1,測試不同的副本數和min.insync.replicas政策對發送速度的影響。

具體配置:一個producer;發送方式為sync;消息體大小為1kB;producer端acks=-1(all)。變換副本數:2/3/4; min.insync.replicas設定為:1/2/4。

測試結果如下:

測試結果分析:副本數越高,TPS越低(這點與場景1的測試結論吻合),但是當partition數為1時差距甚微。min.insync.replicas不影響TPS。

場景3:在partition個數固定為1,測試不同的acks政策和副本數對發送速度的影響。

具體配置:一個producer;發送方式為sync;消息體大小為1kB;min.insync.replicas=1。topic副本數為:1/2/4;acks: 0/1/-1。

測試結果分析(與情景1一緻):

  • 副本數越多,TPS越低;
  • 用戶端的acks政策對發送的TPS有較大的影響,TPS:acks_0 > acks_1 > ack_-1。

場景4:測試不同partition數對發送速率的影響

具體配置:一個producer;消息體大小為1KB;發送方式為sync;topic副本數為2;min.insync.replicas=2;acks=-1。partition數量設定為1/2/4/8/12。

測試結果:

測試結果分析:partition的不同會影響TPS,随着partition的個數的增長TPS會有所增長,但并不是一直成正比關系,到達一定臨界值時,partition數量的增加反而會使TPS略微降低。

場景5:通過将叢集中部分broker設定成不可服務狀态,測試對用戶端以及消息落盤的影響。

具體配置:一個producer;消息體大小1KB;發送方式為sync;topic副本數為4;min.insync.replicas設定為2;acks=-1;retries=0/100000000;partition數為12。

具體測試資料如下表:

出錯資訊:

  • 錯誤1:用戶端傳回異常,部分資料可落盤,部分失敗:org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
  • 錯誤2:[WARN]internals.Sender - Got error produce response with correlation id 19369 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left). Error: NETWORK_EXCEPTION
  • 錯誤3: [WARN]internals.Sender - Got error produce response with correlation id 77890 on topic-partition default_channel_replicas_4_1-8, retrying (999999859 attempts left). Error: NOT_ENOUGH_REPLICAS
  • 錯誤4: [WARN]internals.Sender - Got error produce response with correlation id 77705 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left). Error: NOT_ENOUGH_REPLICAS_AFTER_APPEND
  • kill兩台broker後,用戶端可以繼續發送。broker減少後,partition的leader分布在剩餘的兩台broker上,造成了TPS的減小;
  • kill三台broker後,用戶端無法繼續發送。Kafka的自動重試功能開始起作用,當大于等于min.insync.replicas數量的broker恢複後,可以繼續發送;
  • 當retries不為0時,消息有重複落盤;用戶端成功傳回的消息都成功落盤,異常時部分消息可以落盤。

場景6:測試單個producer的發送延遲,以及端到端的延遲。

具體配置::一個producer;消息體大小1KB;發送方式為sync;topic副本數為4;min.insync.replicas設定為2;acks=-1;partition數為12。

測試資料及結果(機關為ms):

各場景測試總結:

  • 當acks=-1時,Kafka發送端的TPS受限于topic的副本數量(ISR中),副本越多TPS越低;
  • acks=0時,TPS最高,其次為1,最差為-1,即TPS:acks_0 > acks_1 > ack_-1;
  • min.insync.replicas參數不影響TPS;
  • partition的不同會影響TPS,随着partition的個數的增長TPS會有所增長,但并不是一直成正比關系,到達一定臨界值時,partition數量的增加反而會使TPS略微降低;
  • Kafka在acks=-1,min.insync.replicas>=1時,具有高可靠性,所有成功傳回的消息都可以落盤。