天天看點

[Kafka設計解析]--(六)Kafka高性能架構之道

本文轉發自​​技術世界​​​

摘要

上一篇文章《​​Kafka設計解析(五)- Kafka性能測試方法及Benchmark報告​​》從測試角度說明了Kafka的性能。本文從宏觀架構層面和具體實作層面分析了Kafka如何實作高性能。

宏觀架構層面

利用Partition實作并行處理

Partition提供并行處理的能力

Kafka是一個Pub-Sub的消息系統,無論是釋出還是訂閱,都須指定Topic。如《​​Kafka設計解析(一)- Kafka背景及架構介紹​​》一文所述,Topic隻是一個邏輯的概念。每個Topic都包含一個或多個Partition,不同Partition可位于不同節點。同時Partition在實體上對應一個本地檔案夾,每個Partition包含一個或多個Segment,每個Segment包含一個資料檔案和一個與之對應的索引檔案。在邏輯上,可以把一個Partition當作一個非常長的數組,可通過這個“數組”的索引(offset)去通路其資料。

一方面,由于不同Partition可位于不同機器,是以可以充分利用叢集優勢,實作機器間的并行處理。另一方面,由于Partition在實體上對應一個檔案夾,即使多個Partition位于同一個節點,也可通過配置讓同一節點上的不同Partition置于不同的disk drive上,進而實作磁盤間的并行處理,充分發揮多磁盤的優勢。

利用多磁盤的具體方法是,将不同磁盤mount到不同目錄,然後在server.properties中,将​

​log.dirs​

​設定為多目錄(用逗号分隔)。Kafka會自動将所有Partition盡可能均勻配置設定到不同目錄也即不同目錄(也即不同disk)上。

注:雖然實體上最小機關是Segment,但Kafka并不提供同一Partition内不同Segment間的并行處理。因為對于寫而言,每次隻會寫Partition内的一個Segment,而對于讀而言,也隻會順序讀取同一Partition内的不同Segment。

Partition是最小并發粒度

如同《​​Kafka設計解析(四)- Kafka Consumer設計解析​​》一文所述,多Consumer消費同一個Topic時,同一條消息隻會被同一Consumer Group内的一個Consumer所消費。而資料并非按消息為機關配置設定,而是以Partition為機關配置設定,也即同一個Partition的資料隻會被一個Consumer所消費(在不考慮Rebalance的前提下)。

如果Consumer的個數多于Partition的個數,那麼會有部分Consumer無法消費該Topic的任何資料,也即當Consumer個數超過Partition後,增加Consumer并不能增加并行度。

簡而言之,Partition個數決定了可能的最大并行度。如下圖所示,由于Topic 2隻包含3個Partition,故group2中的Consumer 3、Consumer 4、Consumer 5 可分别消費1個Partition的資料,而Consumer 6消費不到Topic 2的任何資料。

​​

[Kafka設計解析]--(六)Kafka高性能架構之道

​​

以Spark消費Kafka資料為例,如果所消費的Topic的Partition數為N,則有效的Spark最大并行度也為N。即使将Spark的Executor數設定為N+M,最多也隻有N個Executor可同時處理該Topic的資料。

ISR實作可用性與資料一緻性的動态平衡

CAP理論

CAP理論是指,分布式系統中,一緻性、可用性和分區容忍性最多隻能同時滿足兩個。

一緻性

  • 通過某個節點的寫操作結果對後面通過其它節點的讀操作可見
  • 如果更新資料後,并發通路情況下後續讀操作可立即感覺該更新,稱為強一緻性
  • 如果允許之後部分或者全部感覺不到該更新,稱為弱一緻性
  • 若在之後的一段時間(通常該時間不固定)後,一定可以感覺到該更新,稱為最終一緻性

可用性

  • 任何一個沒有發生故障的節點必須在有限的時間内傳回合理的結果

分區容忍性

  • 部分節點當機或者無法與其它節點通信時,各分區間還可保持分布式系統的功能

一般而言,都要求保證分區容忍性。是以在CAP理論下,更多的是需要在可用性和一緻性之間做權衡。

常用資料複制及一緻性方案

Master-Slave

  • RDBMS的讀寫分離即為典型的Master-Slave方案
  • 同步複制可保證強一緻性但會影響可用性
  • 異步複制可提供高可用性但會降低一緻性

WNR

  • 主要用于去中心化的分布式系統中。DynamoDB與Cassandra即采用此方案或其變種
  • N代表總副本數,W代表每次寫操作要保證的最少寫成功的副本數,R代表每次讀至少要讀取的副本數
  • 當W+R>N時,可保證每次讀取的資料至少有一個副本擁有最新的資料
  • 多個寫操作的順序難以保證,可能導緻多副本間的寫操作順序不一緻。Dynamo通過向量時鐘保證最終一緻性

Paxos及其變種

  • Google的Chubby,Zookeeper的原子廣播協定(Zab),RAFT等

基于ISR的資料複制方案

如《​​​ Kafka High Availability(上)​​》一文所述,Kafka的資料複制是以Partition為機關的。而多個備份間的資料複制,通過Follower向Leader拉取資料完成。從一這點來講,Kafka的資料複制方案接近于上文所講的Master-Slave方案。不同的是,Kafka既不是完全的同步複制,也不是完全的異步複制,而是基于ISR的動态複制方案。

ISR,也即In-sync Replica。每個Partition的Leader都會維護這樣一個清單,該清單中,包含了所有與之同步的Replica(包含Leader自己)。每次資料寫入時,隻有ISR中的所有Replica都複制完,Leader才會将其置為Commit,它才能被Consumer所消費。

這種方案,與同步複制非常接近。但不同的是,這個ISR是由Leader動态維護的。如果Follower不能緊“跟上”Leader,它将被Leader從ISR中移除,待它又重新“跟上”Leader後,會被Leader再次加加ISR中。每次改變ISR後,Leader都會将最新的ISR持久化到Zookeeper中。

至于如何判斷某個Follower是否“跟上”Leader,不同版本的Kafka的政策稍微有些差別。

  • 對于0.8.*版本,如果Follower在

​replica.lag.time.max.ms​

  • 時間内未向Leader發送Fetch請求(也即資料複制請求),則Leader會将其從ISR中移除。如果某Follower持續向Leader發送Fetch請求,但是它與Leader的資料差距在

​replica.lag.max.messages​

  • 以上,也會被Leader從ISR中移除。
  • 從0.9.0.0版本開始,

​replica.lag.max.messages​

  • 被移除,故Leader不再考慮Follower落後的消息條數。另外,Leader不僅會判斷Follower是否在

​replica.lag.time.max.ms​

  • 時間内向其發送Fetch請求,同時還會考慮Follower是否在該時間内與之保持同步。
  • 0.10.* 版本的政策與0.9.*版一緻

對于0.8.*版本的​

​replica.lag.max.messages​

​​參數,很多讀者曾留言提問,既然隻有ISR中的所有Replica複制完後的消息才被認為Commit,那為何會出現Follower與Leader差距過大的情況。原因在于,Leader并不需要等到前一條消息被Commit才接收後一條消息。事實上,Leader可以按順序接收大量消息,最新的一條消息的Offset被記為High Wartermark。而隻有被ISR中所有Follower都複制過去的消息才會被Commit,Consumer隻能消費被Commit的消息。由于Follower的複制是嚴格按順序的,是以被Commit的消息之前的消息肯定也已經被Commit過。換句話說,High Watermark标記的是Leader所儲存的最新消息的offset,而Commit Offset标記的是最新的可被消費的(已同步到ISR中的Follower)消息。而Leader對資料的接收與Follower對資料的複制是異步進行的,是以會出現Commit Offset與High Watermark存在一定差距的情況。0.8.*版本中​

​replica.lag.max.messages​

​限定了Leader允許的該差距的最大值。

Kafka基于ISR的資料複制方案原理如下圖所示。

​​

[Kafka設計解析]--(六)Kafka高性能架構之道

​​

如上圖所示,在第一步中,Leader A總共收到3條消息,故其high watermark為3,但由于ISR中的Follower隻同步了第1條消息(m1),故隻有m1被Commit,也即隻有m1可被Consumer消費。此時Follower B與Leader A的差距是1,而Follower C與Leader A的差距是2,均未超過預設的​

​replica.lag.max.messages​

​​,故得以保留在ISR中。在第二步中,由于舊的Leader A當機,新的Leader B在​

​replica.lag.time.max.ms​

​時間内未收到來自A的Fetch請求,故将A從ISR中移除,此時ISR={B,C}。同時,由于此時新的Leader B中隻有2條消息,并未包含m3(m3從未被任何Leader所Commit),是以m3無法被Consumer消費。第四步中,Follower A恢複正常,它先将當機前未Commit的所有消息全部删除,然後從最後Commit過的消息的下一條消息開始追趕新的Leader B,直到它“趕上”新的Leader,才被重新加入新的ISR中。

使用ISR方案的原因

  • 由于Leader可移除不能及時與之同步的Follower,故與同步複制相比可避免最慢的Follower拖慢整體速度,也即ISR提高了系統可用性。
  • ISR中的所有Follower都包含了所有Commit過的消息,而隻有Commit過的消息才會被Consumer消費,故從Consumer的角度而言,ISR中的所有Replica都始終處于同步狀态,進而與異步複制方案相比提高了資料一緻性。
  • ISR可動态調整,極限情況下,可以隻包含Leader,極大提高了可容忍的當機的Follower的數量。與

​Majority Quorum​

  • 方案相比,容忍相同個數的節點失敗,所要求的總節點數少了近一半。

ISR相關配置說明

  • Broker的

​min.insync.replicas​

  • 參數指定了Broker所要求的ISR最小長度,預設值為1。也即極限情況下ISR可以隻包含Leader。但此時如果Leader當機,則該Partition不可用,可用性得不到保證。
  • 隻有被ISR中所有Replica同步的消息才被Commit,但Producer釋出資料時,Leader并不需要ISR中的所有Replica同步該資料才确認收到資料。Producer可以通過

​acks​

  • 參數指定最少需要多少個Replica确認收到該消息才視為該消息發送成功。

​acks​

  • 的預設值是1,即Leader收到該消息後立即告訴Producer收到該消息,此時如果在ISR中的消息複制完該消息前Leader當機,那該條消息會丢失。而如果将該值設定為0,則Producer發送完資料後,立即認為該資料發送成功,不作任何等待,而實際上該資料可能發送失敗,并且Producer的Retry機制将不生效。更推薦的做法是,将

​acks​

  • 設定為

​all​

  • 或者

​-1​

  • ,此時隻有ISR中的所有Replica都收到該資料(也即該消息被Commit),Leader才會告訴Producer該消息發送成功,進而保證不會有未知的資料丢失。

具體實作層面

高效使用磁盤

順序寫磁盤

根據《​​一些場景下順序寫磁盤快于随機寫記憶體​​》所述,将寫磁盤的過程變為順序寫,可極大提高對磁盤的使用率。

Kafka的整個設計中,Partition相當于一個非常長的數組,而Broker接收到的所有消息順序寫入這個大數組中。同時Consumer通過Offset順序消費這些資料,并且不删除已經消費的資料,進而避免了随機寫磁盤的過程。

由于磁盤有限,不可能儲存所有資料,實際上作為消息系統Kafka也沒必要儲存所有資料,需要删除舊的資料。而這個删除過程,并非通過使用“讀-寫”模式去修改檔案,而是将Partition分為多個Segment,每個Segment對應一個實體檔案,通過删除整個檔案的方式去删除Partition内的資料。這種方式清除舊資料的方式,也避免了對檔案的随機寫操作。

通過如下代碼可知,Kafka删除Segment的方式,是直接删除Segment對應的整個log檔案和整個index檔案而非删除檔案中的部分内容。

/**
 * Delete this log segment from the filesystem.
 *
 * @throws KafkaStorageException if the delete fails.
 */
def delete() {
  val deletedLog = log.delete()
  val deletedIndex = index.delete()
  val deletedTimeIndex = timeIndex.delete()
  if(!deletedLog && log.file.exists)
    throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
  if(!deletedIndex && index.file.exists)
    throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
  if(!deletedTimeIndex && timeIndex.file.exists)
    throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.")
}      

充分利用Page Cache

使用Page Cache的好處如下

  • I/O Scheduler會将連續的小塊寫組裝成大塊的實體寫進而提高性能
  • I/O Scheduler會嘗試将一些寫操作重新按順序排好,進而減少磁盤頭的移動時間
  • 充分利用所有空閑記憶體(非JVM記憶體)。如果使用應用層Cache(即JVM堆記憶體),會增加GC負擔
  • 讀操作可直接在Page Cache内進行。如果消費和生産速度相當,甚至不需要通過實體磁盤(直接通過Page Cache)交換資料
  • 如果程序重新開機,JVM内的Cache會失效,但Page Cache仍然可用

Broker收到資料後,寫磁盤時隻是将資料寫入Page Cache,并不保證資料一定完全寫入磁盤。從這一點看,可能會造成機器當機時,Page Cache内的資料未寫入磁盤進而造成資料丢失。但是這種丢失隻發生在機器斷電等造成作業系統不工作的場景,而這種場景完全可以由Kafka層面的Replication機制去解決。如果為了保證這種情況下資料不丢失而強制将Page Cache中的資料Flush到磁盤,反而會降低性能。也正因如此,Kafka雖然提供了​

​flush.messages​

​​和​

​flush.ms​

​兩個參數将Page Cache中的資料強制Flush到磁盤,但是Kafka并不建議使用。

如果資料消費速度與生産速度相當,甚至不需要通過實體磁盤交換資料,而是直接通過Page Cache交換資料。同時,Follower從Leader Fetch資料時,也可通過Page Cache完成。下圖為某Partition的Leader節點的網絡/磁盤讀寫資訊。

​​

[Kafka設計解析]--(六)Kafka高性能架構之道

​​

從上圖可以看到,該Broker每秒通過網絡從Producer接收約35MB資料,雖然有Follower從該Broker Fetch資料,但是該Broker基本無讀磁盤。這是因為該Broker直接從Page Cache中将資料取出傳回給了Follower。

支援多Disk Drive

Broker的​

​log.dirs​

​​配置項,允許配置多個檔案夾。如果機器上有多個Disk Drive,可将不同的Disk挂載到不同的目錄,然後将這些目錄都配置到​

​log.dirs​

​裡。Kafka會盡可能将不同的Partition配置設定到不同的目錄,也即不同的Disk上,進而充分利用了多Disk的優勢。

零拷貝

Kafka中存在大量的網絡資料持久化到磁盤(Producer到Broker)和磁盤檔案通過網絡發送(Broker到Consumer)的過程。這一過程的性能直接影響Kafka的整體吞吐量。

傳統模式下的四次拷貝與四次上下文切換

以将磁盤檔案通過網絡發送為例。傳統模式下,一般使用如下僞代碼所示的方法先将檔案資料讀入記憶體,然後通過Socket将記憶體中的資料發送出去。

buffer = File.read
Socket.send(buffer)      

這一過程實際上發生了四次資料拷貝。首先通過系統調用将檔案資料讀入到核心态Buffer(DMA拷貝),然後應用程式将記憶體态Buffer資料讀入到使用者态Buffer(CPU拷貝),接着使用者程式通過Socket發送資料時将使用者态Buffer資料拷貝到核心态Buffer(CPU拷貝),最後通過DMA拷貝将資料拷貝到NIC Buffer。同時,還伴随着四次上下文切換,如下圖所示。

​​

[Kafka設計解析]--(六)Kafka高性能架構之道

​​

sendfile和transferTo實作零拷貝

Linux 2.4+核心通過​

​sendfile​

​​系統調用,提供了零拷貝。資料通過DMA拷貝到核心态Buffer後,直接通過DMA拷貝到NIC Buffer,無需CPU拷貝。這也是零拷貝這一說法的來源。除了減少資料拷貝外,因為整個讀檔案-網絡發送由一個​

​sendfile​

​調用完成,整個過程隻有兩次上下文切換,是以大大提高了性能。零拷貝過程如下圖所示。

​​

[Kafka設計解析]--(六)Kafka高性能架構之道

​​

從具體實作來看,Kafka的資料傳輸通過TransportLayer來完成,其子類​

​PlaintextTransportLayer​

​​通過​​​Java NIO​​​的FileChannel的​

​transferTo​

​​和​

​transferFrom​

​方法實作零拷貝,如下所示。

@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
    return fileChannel.transferTo(position, count, socketChannel);
}      

注: ​

​transferTo​

​​和​

​transferFrom​

​​并不保證一定能使用零拷貝。實際上是否能使用零拷貝與作業系統相關,如果作業系統提供​

​sendfile​

​這樣的零拷貝系統調用,則這兩個方法會通過這樣的系統調用充分利用零拷貝的優勢,否則并不能通過這兩個方法本身實作零拷貝。

減少網絡開銷

批處理

批處理是一種常用的用于提高I/O性能的方式。對Kafka而言,批處理既減少了網絡傳輸的Overhead,又提高了寫磁盤的效率。

Kafka 0.8.1及以前的Producer區分同步Producer和異步Producer。同步Producer的send方法主要分兩種形式。一種是接受一個KeyedMessage作為參數,一次發送一條消息。另一種是接受一批KeyedMessage作為參數,一次性發送多條消息。而對于異步發送而言,無論是使用哪個send方法,實作上都不會立即将消息發送給Broker,而是先存到内部的隊列中,直到消息條數達到門檻值或者達到指定的Timeout才真正的将消息發送出去,進而實作了消息的批量發送。

Kafka 0.8.2開始支援新的Producer API,将同步Producer和異步Producer結合。雖然從send接口來看,一次隻能發送一個ProducerRecord,而不能像之前版本的send方法一樣接受消息清單,但是send方法并非立即将消息發送出去,而是通過​

​batch.size​

​​和​

​linger.ms​

​控制實際發送頻率,進而實作批量發送。

由于每次網絡傳輸,除了傳輸消息本身以外,還要傳輸非常多的網絡協定本身的一些内容(稱為Overhead),是以将多條消息合并到一起傳輸,可有效減少網絡傳輸的Overhead,進而提高了傳輸效率。

從​​零拷貝章節的圖​​中可以看到,雖然Broker持續從網絡接收資料,但是寫磁盤并非每秒都在發生,而是間隔一段時間寫一次磁盤,并且每次寫磁盤的資料量都非常大(最高達到718MB/S)。

資料壓縮降低網絡負載

Kafka從0.7開始,即支援将資料壓縮後再傳輸給Broker。除了可以将每條消息單獨壓縮然後傳輸外,Kafka還支援在批量發送時,将整個Batch的消息一起壓縮後傳輸。資料壓縮的一個基本原理是,重複資料越多壓縮效果越好。是以将整個Batch的資料一起壓縮能更大幅度減小資料量,進而更大程度提高網絡傳輸效率。

高效的序列化方式