天天看點

kafka高性能吞吐原因

1. 簡單回顧

  Kafka作為時下最流行的開源消息系統,被廣泛地應用在資料緩沖、異步通信、彙集日志、系統解耦等方面。相比較于RocketMQ等其他常見消息系統,Kafka在保障了大部分功能特性的同時,還提供了超一流的讀寫性能。

  本文将針對Kafka性能方面進行簡單分析,首先簡單介紹一下Kafka的架構和涉及到的名詞:

1.Topic:用于劃分Message的邏輯概念,一個Topic可以分布在多個Broker上。

2.Partition:是Kafka中橫向擴充和一切并行化的基礎,每個Topic都至少被切分為1個Partition。

3.Offset:消息在Partition中的編号,編号順序不跨Partition。

4.Consumer:用于從Broker中取出/消費Message。

5.Producer:用于往Broker中發送/生産Message。

6.Replication:Kafka支援以Partition為機關對Message進行備援備份,每個Partition都可以配置至少1個Replication(當僅1個Replication時即僅該Partition本身)。

7.Leader:每個Replication集合中的Partition都會選出一個唯一的Leader,所有的讀寫請求都由Leader處理。其他Replicas從Leader處把資料更新同步到本地,過程類似大家熟悉的MySQL中的Binlog同步。

8.Broker:Kafka中使用Broker來接受Producer和Consumer的請求,并把Message持久化到本地磁盤。每個Cluster當中會選舉出一個Broker來擔任Controller,負責處理Partition的Leader選舉,協調Partition遷移等工作。

9.ISR(In-Sync Replica):是Replicas的一個子集,表示目前Alive且與Leader能夠“Catch-up”的Replicas集合。由于讀寫都是首先落到Leader上,是以一般來說通過同步機制從Leader上拉取資料的Replica都會和Leader有一些延遲(包括了延遲時間和延遲條數兩個次元),任意一個超過門檻值都會把該Replica踢出ISR。每個Partition都有它自己獨立的ISR。

  以上幾乎是我們在使用Kafka的過程中可能遇到的所有名詞,同時也無一不是最核心的概念或元件,感覺到從設計本身來說,Kafka還是足夠簡潔的。這次本文圍繞Kafka優異的吞吐性能,逐個介紹一下其設計與實作當中所使用的各項“黑科技”。

2. Broker

  不同于Redis和MemcacheQ等記憶體消息隊列,Kafka的設計是把所有的Message都要寫入速度低容量大的硬碟,以此來換取更強的存儲能力。實際上,Kafka使用硬碟并沒有帶來過多的性能損失,“規規矩矩”的抄了一條“近道”。

  首先,說“規規矩矩”是因為Kafka在磁盤上隻做Sequence I/O,由于消息系統讀寫的特殊性,這并不存在什麼問題。關于磁盤I/O的性能,引用一組Kafka官方給出的測試資料(Raid-5,7200rpm):

Sequence I/O: 600MB/s
Random I/O: 100KB/s      

  是以通過隻做Sequence I/O的限制,規避了磁盤通路速度低下對性能可能造成的影響。

  接下來我們再聊一聊Kafka是如何“抄近道的”。

  首先,Kafka重度依賴底層作業系統提供的PageCache功能。當上層有寫操作時,作業系統隻是将資料寫入PageCache,同時标記Page屬性為Dirty。當讀操作發生時,先從PageCache中查找,如果發生缺頁才進行磁盤排程,最終傳回需要的資料。實際上PageCache是把盡可能多的空閑記憶體都當做了磁盤緩存來使用。同時如果有其他程序申請記憶體,回收PageCache的代價又很小,是以現代的OS都支援PageCache。

  使用PageCache功能同時可以避免在JVM内部緩存資料,JVM為我們提供了強大的GC能力,同時也引入了一些問題不适用與Kafka的設計。

  • 如果在Heap内管理緩存,JVM的GC線程會頻繁掃描Heap空間,帶來不必要的開銷。如果Heap過大,執行一次Full GC對系統的可用性來說将是極大的挑戰。
  • 所有在在JVM内的對象都不免帶有一個Object Overhead(千萬不可小視),記憶體的有效空間使用率會是以降低。
  • 所有的In-Process Cache在OS中都有一份同樣的PageCache。是以通過将緩存隻放在PageCache,可以至少讓可用緩存空間翻倍。
  • 如果Kafka重新開機,所有的In-Process Cache都會失效,而OS管理的PageCache依然可以繼續使用。

  PageCache還隻是第一步,Kafka為了進一步的優化性能還采用了Sendfile技術。在解釋Sendfile之前,首先介紹一下傳統的網絡I/O操作流程,大體上分為以下4步。

1.OS 從硬碟把資料讀到核心區的PageCache。

2.使用者程序把資料從核心區Copy到使用者區。

3.然後使用者程序再把資料寫入到Socket,資料流入核心區的Socket Buffer上。

4.OS 再把資料從Buffer中Copy到網卡的Buffer上,這樣完成一次發送。

kafka高性能吞吐原因

  整個過程共經曆兩次Context Switch,四次System Call。同一份資料在核心Buffer與使用者Buffer之間重複拷貝,效率低下。其中2、3兩步沒有必要,完全可以直接在核心區完成資料拷貝。這也正是Sendfile所解決的問題,經過Sendfile優化後,整個I/O過程就變成了下面這個樣子。

kafka高性能吞吐原因

  通過以上的介紹不難看出,Kafka的設計初衷是盡一切努力在記憶體中完成資料交換,無論是對外作為一整個消息系統,或是内部同底層作業系統的互動。如果Producer和Consumer之間生産和消費進度上配合得當,完全可以實作資料交換零I/O。這也就是我為什麼說Kafka使用“硬碟”并沒有帶來過多性能損失的原因。下面是我在生産環境中采到的一些名額。

  (20 Brokers, 75 Partitions per Broker, 110k msg/s)

kafka高性能吞吐原因

  此時的叢集隻有寫,沒有讀操作。10M/s左右的Send的流量是Partition之間進行Replicate而産生的。從recv和writ的速率比較可以看出,寫盤是使用Asynchronous+Batch的方式,底層OS可能還會進行磁盤寫順序優化。而在有Read Request進來的時候分為兩種情況,第一種是記憶體中完成資料交換。

kafka高性能吞吐原因

  Send流量從平均10M/s增加到了到平均60M/s,而磁盤Read隻有不超過50KB/s。PageCache降低磁盤I/O效果非常明顯。

  接下來是讀一些收到了一段時間,已經從記憶體中被換出刷寫到磁盤上的老資料。

kafka高性能吞吐原因

其他名額還是老樣子,而磁盤Read已經飚高到40+MB/s。此時全部的資料都已經是走硬碟了(對硬碟的順序讀取OS層會進行Prefill PageCache的優化)。依然沒有任何性能問題。

  Tips

1.Kafka官方并不建議通過Broker端的log.flush.interval.messages和log.flush.interval.ms來強制寫盤,認為資料的可靠性應該通過Replica來保證,而強制Flush資料到磁盤會對整體性能産生影響。

2.可以通過調整/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio來調優性能。

3.髒頁率超過第一個名額會啟動pdflush開始Flush Dirty PageCache。

4.髒頁率超過第二個名額會阻塞所有的寫操作來進行Flush。

5.根據不同的業務需求可以适當的降低dirty_background_ratio和提高dirty_ratio。

3. Partition

  Partition是Kafka可以很好的橫向擴充和提供高并發處理以及實作Replication的基礎。

  擴充性方面。首先,Kafka允許Partition在叢集内的Broker之間任意移動,以此來均衡可能存在的資料傾斜問題。其次,Partition支援自定義的分區算法,例如可以将同一個Key的所有消息都路由到同一個Partition上去。 同時Leader也可以在In-Sync的Replica中遷移。由于針對某一個Partition的所有讀寫請求都是隻由Leader來處理,是以Kafka會盡量把Leader均勻的分散到叢集的各個節點上,以免造成網絡流量過于集中。

  并發方面。任意Partition在某一個時刻隻能被一個Consumer Group内的一個Consumer消費(反過來一個Consumer則可以同時消費多個Partition),Kafka非常簡潔的Offset機制最小化了Broker和Consumer之間的互動,這使Kafka并不會像同類其他消息隊列一樣,随着下遊Consumer數目的增加而成比例的降低性能。此外,如果多個Consumer恰巧都是消費時間序上很相近的資料,可以達到很高的PageCache命中率,因而Kafka可以非常高效的支援高并發讀操作,實踐中基本可以達到單機網卡上限。

  不過,Partition的數量并不是越多越好,Partition的數量越多,平均到每一個Broker上的數量也就越多。考慮到Broker當機(Network Failure, Full GC)的情況下,需要由Controller來為所有當機的Broker上的所有Partition重新選舉Leader,假設每個Partition的選舉消耗10ms,如果Broker上有500個Partition,那麼在進行選舉的5s的時間裡,對上述Partition的讀寫操作都會觸發LeaderNotAvailableException。

  再進一步,如果挂掉的Broker是整個叢集的Controller,那麼首先要進行的是重新任命一個Broker作為Controller。新任命的Controller要從Zookeeper上擷取所有Partition的Meta資訊,擷取每個資訊大概3-5ms,那麼如果有10000個Partition這個時間就會達到30s-50s。而且不要忘記這隻是重新啟動一個Controller花費的時間,在這基礎上還要再加上前面說的選舉Leader的時間 -_-!!!!!!

  此外,在Broker端,對Producer和Consumer都使用了Buffer機制。其中Buffer的大小是統一配置的,數量則與Partition個數相同。如果Partition個數過多,會導緻Producer和Consumer的Buffer記憶體占用過大。

  Tips

1.Partition的數量盡量提前預配置設定,雖然可以在後期動态增加Partition,但是會冒着可能破壞Message Key和Partition之間對應關系的風險。

2.Replica的數量不要過多,如果條件允許盡量把Replica集合内的Partition分别調整到不同的Rack。

3.盡一切努力保證每次停Broker時都可以Clean Shutdown,否則問題就不僅僅是恢複服務所需時間長,還可能出現資料損壞或其他很詭異的問題。

4. Producer

  Kafka的研發團隊表示在0.8版本裡用Java重寫了整個Producer,據說性能有了很大提升。我還沒有親自對比試用過,這裡就不做資料對比了。本文結尾的擴充閱讀裡提到了一套我認為比較好的對照組,有興趣的同學可以嘗試一下。

  其實在Producer端的優化大部分消息系統采取的方式都比較單一,無非也就化零為整、同步變異步這麼幾種。

  Kafka系統預設支援MessageSet,把多條Message自動地打成一個Group後發送出去,均攤後拉低了每次通信的RTT。而且在組織MessageSet的同時,還可以把資料重新排序,從爆發流式的随機寫入優化成較為平穩的線性寫入。

  此外,還要着重介紹的一點是,Producer支援End-to-End的壓縮。資料在本地壓縮後放到網絡上傳輸,在Broker一般不解壓(除非指定要Deep-Iteration),直至消息被Consume之後在用戶端解壓。

  當然使用者也可以選擇自己在應用層上做壓縮和解壓的工作(畢竟Kafka目前支援的壓縮算法有限,隻有GZIP和Snappy),不過這樣做反而會意外的降低效率!!!! Kafka的End-to-End壓縮與MessageSet配合在一起工作效果最佳,上面的做法直接割裂了兩者間聯系。至于道理其實很簡單,壓縮算法中一條基本的原理“重複的資料量越多,壓縮比越高”。無關于消息體的内容,無關于消息體的數量,大多數情況下輸入資料量大一些會取得更好的壓縮比。

  不過Kafka采用MessageSet也導緻在可用性上一定程度的妥協。每次發送資料時,Producer都是send()之後就認為已經發送出去了,但其實大多數情況下消息還在記憶體的MessageSet當中,尚未發送到網絡,這時候如果Producer挂掉,那就會出現丢資料的情況。

  為了解決這個問題,Kafka在0.8版本的設計借鑒了網絡當中的ack機制。如果對性能要求較高,又能在一定程度上允許Message的丢失,那就可以設定request.required.acks=0 來關閉ack,以全速發送。如果需要對發送的消息進行确認,就需要設定request.required.acks為1或-1,那麼1和-1又有什麼差別呢?這裡又要提到前面聊的有關Replica數量問題。如果配置為1,表示消息隻需要被Leader接收并确認即可,其他的Replica可以進行異步拉取無需立即進行确認,在保證可靠性的同時又不會把效率拉得很低。如果設定為-1,表示消息要Commit到該Partition的ISR集合中的所有Replica後,才可以傳回ack,消息的發送會更安全,而整個過程的延遲會随着Replica的數量正比增長,這裡就需要根據不同的需求做相應的優化。

  Tips

1.Producer的線程不要配置過多,尤其是在Mirror或者Migration中使用的時候,會加劇目标叢集Partition消息亂序的情況(如果你的應用場景對消息順序很敏感的話)。

2.0.8版本的request.required.acks預設是0(同0.7)。

5. Consumer

  Consumer端的設計大體上還算是比較正常的。

• 通過Consumer Group,可以支援生産者消費者和隊列通路兩種模式。

• Consumer API分為High level和Low level兩種。前一種重度依賴Zookeeper,是以性能差一些且不自由,但是超省心。第二種不依賴Zookeeper服務,無論從自由度和性能上都有更好的表現,但是所有的異常(Leader遷移、Offset越界、Broker當機等)和Offset的維護都需要自行處理。

• 大家可以關注下不日釋出的0.9 Release。開發人員又用Java重寫了一套Consumer。把兩套API合并在一起,同時去掉了對Zookeeper的依賴。據說性能有大幅度提升哦~~

  Tips

  強烈推薦使用Low level API,雖然繁瑣一些,但是目前隻有這個API可以對Error資料進行自定義處理,尤其是處理Broker異常或由于Unclean Shutdown導緻的Corrupted Data時,否則無法Skip隻能等着“壞消息”在Broker上被Rotate掉,在此期間該Replica将會一直處于不可用狀态。

擴充閱讀:

Sendfile: ​​https://www.ibm.com/developerworks/cn/java/j-zerocopy/​​

So what’s wrong with 1975 programming: ​​https://www.varnish-cache.org/trac/wiki/ArchitectNotes​​

Benchmarking: ​​https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines​​

作者:小家電維修

轉世燕還故榻,為你銜來二月的花。