天天看點

Kafka:用于日志處理的分布式消息系統摘要常用術語關鍵詞1. 簡介2. 相關工作3. Kafka架構和設計原則4. Kafka在LinkedIn中的使用5. 實驗結果6. 結論與未來工作7. 參考文獻

文章目錄

  • 摘要
  • 常用術語
  • 關鍵詞
  • 1. 簡介
  • 2. 相關工作
  • 3. Kafka架構和設計原則
    • 3.1 單分區的效率
      • 3.1.1 簡單的存儲
      • 3.1.2 高效的傳輸
      • 3.1.3 無狀态代理
    • 3.2 分布式協調
    • 3.3 傳遞保證
  • 4. Kafka在LinkedIn中的使用
  • 5. 實驗結果
    • 5.1 生産者測試
    • 5.2 消費者測試
  • 6. 結論與未來工作
  • 7. 參考文獻

摘要

日志處理已成為消費者網際網路公司資料管道中的重要組成部分。我們介紹Kafka,這是一個分布式消息系統,我們開發該系統是為了以低延遲收集和發送大量日志資料。我們的系統結合了現有日志聚合器和消息傳遞系統的思想,同時适用于離線和線上消息消費使用。我們在Kafka中做出了許多非正常但實用的設計選擇,以使我們的系統高效且可擴充。我們的實驗結果表明,與兩種流行的消息傳遞系統相比,Kafka具有卓越的性能。我們已經在生産中使用Kafka有一段時間了,它每天要處理數百GB的新資料。

常用術語

管理,性能,設計,實驗。

關鍵詞

消息傳遞,分布式,日志處理,吞吐量,線上。

1. 簡介

在任何規模較大的網際網路公司中都會生成大量“日志”資料。這類資料通常包括(1) 與登入、浏覽量、點選、“喜歡”、分享、評論和搜尋查詢相對應的使用者活動事件;(2) 操作名額,例如服務調用堆棧、調用延遲、錯誤,以及系統名額,例如每台計算機上的CPU、記憶體、網絡或磁盤使用率。日志資料一直是用于跟蹤使用者參與度、系統使用率和其他名額的分析的組成部分。但是,網絡應用的最新趨勢已使生産資料管道的一部分的活動資料被直接用于站點功能。這些用途包括(1) 搜尋相關性,(2) 可能由活動流中的項目受歡迎程度或并發所驅動的推薦,(3) 廣告定位和報表,(4) 防止濫用行為(例如垃圾郵件或未經授權的資料抓取)的安全應用,以及(5) 新聞摘要功能,這些功能彙總了使用者狀态更新或以供其“朋友”或“連接配接”讀取的操作。

日志資料的這種實時生産使用給資料系統帶來了新的挑戰,因為其資料量比“實際”資料大幾個數量級。例如,搜尋、推薦和廣告通常需要計算細粒度的點選率,這不僅會為每個使用者點選生成日志記錄,還會為每個頁面上數十個未被點選的項目生成日志記錄。每天,中國移動收集5–8 TB的電話記錄[11],而Facebook收集近6 TB的各種使用者活動事件[12]。

許多用于處理此類資料的早期系統都依靠實體地将日志檔案從生産伺服器上抓取下來進行分析。近年來,已經建立了幾種專門的分布式日志聚合器,包括Facebook的Scribe[6],Yahoo的Data Highway[4]和Cloudera的Flume[3]。這些系統主要用于收集日志資料并将其加載到資料倉庫或Hadoop[8]中以供離線使用。在LinkedIn(一個社交網站),我們發現,除了傳統的離線分析之外,我們還需要以不超過幾秒鐘的時延來支援上述大多數實時應用。

我們已經建立了一種用于日志處理的新型消息傳遞系統,叫做Kafka[18],該系統結合了傳統日志聚合器和消息傳遞系統的優勢。一方面,Kafka是分布式且可擴充的,并提供高吞吐量。另一方面,Kafka提供類似于消息傳遞系統的API,并允許應用實時使用日志事件。Kafka已開源,并已在LinkedIn中成功用于生産中超過6個月。因為我們可以利用單個軟體來同時線上和離線使用所有類型的日志資料,是以它極大地簡化了我們的基礎架構。本文的其餘部分安排如下。我們将在第2節中回顧傳統的消息傳遞系統和日志聚合器。在第3節中,我們描述Kafka的體系架構及其關鍵設計原則。我們将在第4節中介紹在LinkedIn中Kafka的部署情況,并在第5節中介紹Kafka的性能結果。在第6節中,我們讨論未來的工作并得出結論。

2. 相關工作

傳統的企業消息傳遞系統[1][7][15][17]已經存在了很長時間,并且經常作為處理異步資料流的事件總線發揮關鍵作用。但是,由于某些原因,它們往往不适合日志處理。首先,企業系統提供的功能不比對。這些系統通常專注于提供豐富的傳遞保證。例如,IBM Websphere MQ[7]具有事務支援,允許應用原子地将消息插入多個隊列。JMS[14]規範允許在消費後确認每個單獨的消息(可能會導緻亂序)。這樣的傳遞保證對于收集日志資料通常是過份行為。例如,偶爾丢失一些綜合浏覽量事件肯定不是世界末日。這些不需要的功能往往會增加API和這些系統的基礎實作的複雜性。其次,許多系統沒有像其主要設計限制那樣專注于吞吐量。例如,JMS沒有API允許生産者将多個消息顯式批處理為單個請求。這意味着每個消息都需要一個完整的TCP/IP往返,這對于我們域的吞吐量要求是不可行的。第三,這些系統在分布式支援方面薄弱。沒有簡單的方法可以在多台計算機上分區和存儲消息。最後,許多消息發送系統假定消息将立即消費掉,是以未消費的消息隊列總是很小。如果允許累積消息,則它們的性能将大大降低,對于離線使用方(例如,資料倉庫應用,它們會定期進行大負載地而不是連續地使用),情況就是如此。

在過去的幾年中,已經建立了許多專業的日志聚合器。Facebook使用名為Scribe的系統。每個前端計算機都可以通過套接字将日志資料發送到一組Scribe機器。每台Scribe機器都會彙總日志條目,并定期将它們轉儲到HDFS[9]或NFS裝置。雅虎的資料高速公路項目具有類似的資料流。一組計算機聚集來自用戶端的事件并推出“分鐘”檔案,然後将其添加到HDFS。Flume是由Cloudera開發的相對較新的日志聚合器。它支援可擴充的“管道”和“接收器”,并使流式日志資料非常靈活。它還具有更多內建的分布式支援。但是,大多數這些系統是為離線使用日志資料而建構的,并且經常不必要地向消費者暴露實作細節(例如“分鐘檔案”)。此外,它們中的大多數使用“推送”模式,其代理将資料轉發給消費者。在LinkedIn,我們發現“拉取”模式更适合我們的應用,因為每個消費者都可以以其可以維持的最大速率檢索消息,并避免被以超出其處理能力的推送速度的消息淹沒。拉取模式還可以輕松地回退消費者,我們将在第3.2節的末尾讨論這種好處。

最近,Yahoo!Research開發了一種新的分布式釋出/訂閱系統,稱為HedWig[13]。HedWig具有高度的可擴充性和可用性,并提供強大的持久性保證。但是,它主要用于儲存資料存儲的送出日志。

3. Kafka架構和設計原則

由于現有系統的限制,我們開發了一種新的基于消息的日志聚合器Kafka。我們首先介紹Kafka中的基本概念。由主題定義的特定類型的消息流。生産者可以将消息釋出到主題。 然後,已釋出的消息将存儲在一組稱為代理的伺服器上。消費者可以訂閱來自代理的一個或多個主題,并通過從代理拉取資料來消費訂閱的消息。

消息從概念上講很簡單,我們試圖使Kafka API同樣簡單以反映這一點。我們沒有展示确切的API,而是提供了一些示例代碼來說明如何使用API。生産者的示例代碼如下。一條消息被定義為僅包含位元組的有效負載。使用者可以選擇她喜歡的序列化方法來編碼消息。為了提高效率,生産者可以在單個釋出請求中發送一組消息。

生産者代碼示例:

producer = new Producer(…);
message = new Message(“test message str”.getBytes());
set = new MessageSet(message);
producer.send(“topic1”, set);
           

為了訂閱某主題,消費者首先為該主題建立一個或多個消息流。釋出給該主題的消息将平均配置設定到這些子流中。有關Kafka如何分發消息的詳細資訊,将在第3.2節中介紹。每個消息流在正在生成的連續消息流上提供疊代器接口。然後,消費者周遊流中的每個消息并處理消息的有效負載。與傳統的疊代器不同,消息流疊代器永遠不會終止。如果目前沒有更多消息可消費,則疊代器将阻塞,直到有新消息釋出到該主題為止。我們既支援多個消費者共同使用一個主題中所有消息的單個副本的點對點傳遞模型,也支援多個消費者各自拉取其自己的主題副本的釋出/訂閱模型。

消費者代碼示例:

streams[] = Consumer.createMessageStreams(“topic1”, 1)
for (message : streams[0]) {
  bytes = message.payload();
  // do something with the bytes
}
           

Kafka的總體架構如圖1所示。由于Kafka實際上是分布式的,是以Kafka叢集通常由多個代理組成。為了平衡負載,一個主題分為多個分區,每個代理存儲一個或多個這些分區。多個生産者和消費者可以同時釋出和拉取消息。在3.1節中,我們描述了代理上單個分區的布局以及我們選擇的一些設計選擇,以使通路分區更加有效。在3.2節中,我們描述了生産者和消費者如何在分布式環境中與多個代理進行互動。我們将在第3.3節中讨論Kafka的傳遞保證。

Kafka:用于日志處理的分布式消息系統摘要常用術語關鍵詞1. 簡介2. 相關工作3. Kafka架構和設計原則4. Kafka在LinkedIn中的使用5. 實驗結果6. 結論與未來工作7. 參考文獻

3.1 單分區的效率

我們在Kafka中做出了一些選擇,以提高系統效率。

3.1.1 簡單的存儲

Kafka的存儲布局非常簡單。主題的每個分區都對應一個邏輯日志。在實體上,日志是由一組大小近似相同(例如1GB)的段檔案實作的。生産者每次将消息釋出到分區時,代理都将消息簡單地附加到最後一個段檔案。為了獲得更好的性能,我們僅在釋出了可配置數量的消息或經過一定時間後才将分段檔案重新整理到磁盤。消息僅在重新整理後才暴露給消費者。

與典型的消息系統不同,存儲在Kafka中的消息沒有明确的消息ID。相反,每個消息都通過其在日志中的邏輯偏移量來尋址。這樣避免了維護将消息ID映射到實際消息位置的輔助的、查找密集型随機通路索引結構的開銷。請注意,我們的消息ID正在增加,但不是連續的。要計算下一條消息的ID,我們必須将目前消息的長度添加到其ID中。從現在開始,我們将交替使用消息ID和偏移量。

消費者始終按順序使用來自特定分區的消息。如果消費者确認特定的消息偏移量,則表示消費者已經接收了在分區中的該偏移量之前的所有消息。在背景,消費者正在向代理發出異步請求,以使資料緩沖區可供應用消費。每個拉取請求都包含從中開始消費的消息的偏移量和要提取的可接受的位元組數。每個代理在記憶體中儲存一個偏移量的排序清單,包括每個段檔案中第一條消息的偏移量。代理通過搜尋偏移量清單來定位所請求消息所位于的段檔案,并将資料發送回消費者。消費者接收到一條消息後,它将計算下一條要消費的消息的偏移量,并在下一個拉取請求中使用它。Kafka日志和記憶體索引的布局如圖2所示。每個框顯示一條消息的偏移量。

Kafka:用于日志處理的分布式消息系統摘要常用術語關鍵詞1. 簡介2. 相關工作3. Kafka架構和設計原則4. Kafka在LinkedIn中的使用5. 實驗結果6. 結論與未來工作7. 參考文獻

3.1.2 高效的傳輸

我們非常關注将資料傳入和傳出Kafka。之前,我們已經展示了生産者可以在單個發送請求中送出一組消息。盡管最終消費者API一次疊代一條消息,但在背景,來自消費者的每個拉取請求還可以檢索到一定大小(通常為數百KB)的多條消息。

我們做出的另一個非正常選擇是避免在Kafka層上将消息顯式緩存在記憶體中。相反,我們依賴于底層檔案系統頁面緩存。這具有避免雙重緩沖的主要好處 - 消息僅被緩存在頁面高速緩存中。這樣做還有一個好處,即使重新啟動代理程序,也可以保留熱緩存。由于Kafka根本不緩存程序中的消息,是以在垃圾回收記憶體方面幾乎沒有開銷,進而可以使用基于VM的語言進行高效實作。最後,由于生産者和消費者都按順序通路段檔案,是以消費者經常落後于生産者一小部分,是以正常的作業系統緩存試探法非常有效(特别是直寫式緩存和預讀)。我們已經發現,生産和消耗都具有與資料大小成線性關系的一緻性能,最大可達數TB的資料。

此外,我們為消費者優化了網絡通路。Kafka是一個多使用者系統,單個消息可能會被不同的消費者應用多次使用。從本地檔案向遠端套接字發送位元組的典型方法包括以下步驟:(1) 從存儲媒體讀取資料到OS中的頁面緩存,(2) 将頁面緩存中的資料複制到應用程式緩沖區,(3) 将應用程式緩沖區複制到另一個核心緩沖區,(4) 從核心緩沖區發送到套接字。這包括4個資料複制和2個系統調用。在Linux和其他Unix作業系統上,存在一個sendfile API[5],它可以直接将位元組從檔案通道傳輸到套接字通道。這樣通常可以避免步驟2和步驟3中引入的2個複制和1個系統調用。Kafka利用sendfile API有效地将日志段檔案中的位元組從代理傳遞到消費者。

3.1.3 無狀态代理

與大多數其他消息傳遞系統不同,在Kafka中,有關每個消費者已消費多少的資訊不是由代理維護的,而是由消費者自己維護的。這樣的設計減少了代理的很多複雜性和開銷。但是,由于代理不知道是否所有訂閱者都已消費該消息,是以删除消息變得很棘手。Kafka通過将簡單的基于時間的SLA用于保留政策來解決此問題。如果消息在代理中的保留時間超過一定時間(通常為7天),則會自動删除該消息。該解決方案在實踐中效果很好。大多數消費者(包括離線消費者)每天、每小時或實時完成消費。Kafka的性能不會随着資料量的增加而降低,這一事實使得這種長期保留成為可能。

此設計有一個重要的附帶好處。消費者可以有意地回退到舊的偏移量并重新使用資料。這違反了隊列的通用協定,但是事實證明這是許多消費者的基本功能。例如,當消費者中的應用程式邏輯出現錯誤時,錯誤修複後,應用程式可以重播某些消息。這對于将ETL資料加載到我們的資料倉庫或Hadoop系統中特别重要。作為另一示例,所消費的資料可以僅周期性地被重新整理到持久性存儲(例如,全文本索引器)。如果消費者崩潰,則未重新整理的資料将丢失。在這種情況下,消費者可以将未重新整理消息的最小偏移量設定檢查點,并在重新啟動後從該偏移量中重新消費。我們注意到,在拉取模式中支援回退消費者要比推送模型容易得多。

3.2 分布式協調

現在我們描述生産者和消費者在分布式環境中的行為。每個生産者可以将消息釋出到随機選擇的分區或由分區鍵和分區函數語義确定的分區。我們将關注于消費者與代理的互動方式。

Kafka具有消費者組的概念。每個消費者組由一個或多個共同消費一組訂閱主題的消費者組成,即,每個消息僅傳遞給該組中的一個消費者。不同的消費者組各自獨立地消費整個訂閱消息集,并且不需要跨消費者組進行協調。同一組中的消費者可以處于不同的程序中,也可以位于不同的機器上。我們的目标是在消費者之間平均配置設定存儲在代理中的消息,而不會引入過多的協調開銷。

我們的第一個決定是使主題内的分區成為并行度的最小機關。這意味着在任何給定時間,來自一個分區的所有消息僅由每個消費者組中的單個消費者消費。如果我們允許多個消費者同時消費一個分區,那麼他們将必須協調誰消費哪些消息,這需要加鎖和狀态維護開銷。相反,在我們的設計中,消費程序僅在消費者重新平衡負載時才需要協調(這種情況很少發生)。為了使負載真正達到平衡,我們在一個主題中需要的分區要比每組中的消費者多得多。我們可以通過對主題進行過度分區來輕松實作這一目标。

我們做出的第二個決定是沒有中央的“主”節點,而是讓消費者以分散的方式在彼此之間進行協調。增加主伺服器會使系統複雜化,因為我們必須進一步考慮主伺服器故障。為了促進協調,我們使用了高度可用的一緻性服務Zookeeper[10]。Zookeeper有一個非常簡單的類檔案系統API。可以建立路徑、設定路徑的值、讀取路徑的值、删除路徑并列出路徑的子路徑。它做了一些更有趣的事情:(a) 可以在路徑上注冊觀察者,并在路徑的子路徑或路徑的值發生更改時得到通知;(b) 可以臨時建立一個路徑(與持久性相反),這意味着如果建立的用戶端不存在了,則該路徑将由Zookeeper伺服器自動删除;© Zookeeper将其資料複制到多個伺服器,這使資料高度可靠且可用。

Kafka使用Zookeeper執行以下任務:(1) 檢測代理和消費者的添加和删除,(2) 在上述事件發生時,在每個消費者中觸發重新平衡過程,以及(3) 維護消費關系并跟蹤每個分區的消費的偏移量。具體來說,每個代理或消費者啟動時,會将其資訊存儲在Zookeeper中的代理或消費者的系統資料庫中。代理系統資料庫包含代理的主機名和端口,以及存儲在其中的主題和分區集合。消費者系統資料庫包括消費者所屬的消費者組及其訂閱的主題集。每個消費者組都與Zookeeper中的所有權系統資料庫和偏移量系統資料庫關聯。所有權系統資料庫對每個訂閱的分區都有一個路徑,路徑值是目前從該分區消費的消費者的ID(我們使用的消費者擁有該分區的術語)。偏移量系統資料庫為每個訂閱的分區存儲該分區中最後消費的消息的偏移量。

在Zookeeper中建立的路徑對于代理系統資料庫、消費者系統資料庫和所有權系統資料庫是臨時的,對于偏移量系統資料庫是持久的。如果代理失敗,則該代理上的所有分區都會自動從代理系統資料庫中删除。消費者的故障導緻它丢失在消費者系統資料庫中的條目以及在所有者系統資料庫中擁有的所有分區。每個消費者都在代理系統資料庫和消費者系統資料庫上都注冊了Zookeeper觀察者,并且每當代理集合或消費者組發生更改時,都将收到通知。

在消費者的初始啟動過程中,或者通過觀察者通知消費者有關代理/消費者更改的通知時,消費者将啟動重新平衡過程以确定應從中消費的新分區子集。算法1中描述了該過程。通過從Zookeeper中讀取代理和消費者系統資料庫,消費者首先計算可用于每個已訂閱主題T的分區集(PT)和訂閱T的消費者集(CT)。然後它将PT範圍分區為|CT|塊并确定地選擇擁有其中一個塊。對于消費者選擇的每個分區,它都會在所有權系統資料庫中将自己寫為該分區的新所有者。最後,消費者啟動一個線程從每個擁有的分區中拉取資料(從偏移系統資料庫中存儲的偏移處開始)。随着從分區中拉取消息,消費者将定期更新偏移量系統資料庫中的最新的消費偏移量。

Algorithm 1: rebalance process for consumer Ci in group G
For each topic T that Ci subscribes to {
  remove partitions owned by Ci from the ownership registry
  read the broker and the consumer registries from Zookeeper
  compute PT = partitions available in all brokers under topic T
  compute CT = all consumers in G that subscribe to topic T
  sort PT and CT
  let j be the index position of Ci in CT and let N = |PT|/|CT|
  assign partitions from j*N to (j+1)*N - 1 in PT to consumer Ci
  for each assigned partition p {
    set the owner of p to Ci in the ownership registry
    let Op = the offset of partition p stored in the offset registry
    invoke a thread to pull data in partition p from offset Op
  }
}
           

當一個組中有多個消費者時,将通知每個代理或消費者變更。但是,在消費者處通知的到達時間可能略有不同。是以,一個消費者有可能嘗試獲得仍由另一個消費者擁有的分區的所有權。發生這種情況時,第一個消費者隻需釋放其目前擁有的所有分區,稍等片刻,然後重試重新平衡過程。實際上,重新平衡過程通常隻需重試幾次即可穩定下來。

建立新的消費者組時,偏移量系統資料庫中沒有可用的偏移量。在這種情況下,使用我們在代理上提供的API,消費者将從每個訂閱分區上可用的最小或最大偏移量(取決于配置)開始。

3.3 傳遞保證

通常,Kafka僅保證至少一次傳遞。恰好一次傳遞通常需要兩階段送出,而對于我們的應用不是必需的。在大多數情況下,一條消息恰好一次發送給每個消費者組。但是,如果消費者程序崩潰而沒有完全關閉,則接管有故障的消費者擁有的那些分區的消費者程序可能會得到一些重複的消息,這些消息是在最後一次偏移量成功送出給Zookeeper之後的。如果應用關心重複,則它必須使用我們傳回給使用者的偏移量或消息中的某些唯一鍵來添加自己的重複資料删除邏輯。與使用兩階段送出相比,這通常是一種更經濟有效的方法。

Kafka保證将來自單個分區的消息按順序傳遞給消費者。但是,不能保證來自不同分區的消息的順序。

為了避免日志損壞,Kafka将每個消息的CRC存儲在日志中。如果代理上存在任何I/O錯誤,Kafka将運作恢複過程以删除帶有不一緻CRC的消息。在消息級别使用CRC還可以使我們在産生或消費消息之後檢查網絡錯誤。

如果代理發生故障,則存儲在其上尚未消費的任何消息将變得不可用。如果代理上的存儲系統被永久損壞,則所有未使用的消息将永遠丢失。将來,我們計劃在Kafka中添加内置複制,以将每個消息備援地存儲在多個代理上。

4. Kafka在LinkedIn中的使用

在本節中,我們描述了如何在LinkedIn使用Kafka。圖3顯示了我們部署的簡化版本。我們有一個Kafka叢集與運作我們的使用者界面服務的每個資料中心共置一處。前端服務生成各種日志資料,并将其批量釋出到本地Kafka代理。我們依靠硬體負載平衡器将釋出請求平均配置設定給Kafka代理集。Kafka的線上消費者在同一資料中心内的服務中運作。

Kafka:用于日志處理的分布式消息系統摘要常用術語關鍵詞1. 簡介2. 相關工作3. Kafka架構和設計原則4. Kafka在LinkedIn中的使用5. 實驗結果6. 結論與未來工作7. 參考文獻

我們還在單獨的資料中心中部署了一個Kafka叢集,以進行離線分析,該叢集的地理位置靠近我們的Hadoop叢集和其他資料倉庫基礎設施。該Kafka執行個體運作一組嵌入式消費者,以從實時資料中心的Kafka執行個體中拉取資料。然後,我們運作資料加載作業,以将資料從Kafka的副本叢集中拉取到Hadoop和我們的資料倉庫,在此我們對資料運作各種報表作業和分析過程。我們還使用此Kafka叢集進行原型設計,并能夠針對原始事件流運作簡單腳本以進行特定查詢。無需太多調整,整個管道的端到端延遲平均約為10秒,足以滿足我們的要求。

目前,Kafka每天存儲數百GB的資料和近10億條消息,随着我們完成對舊系統的切換以利用Kafka的優勢,我們預計這一數量将顯著增長。将來會添加更多類型的消息。當操作人員啟動或停止代理進行軟體或硬體維護時,重新平衡過程能夠自動重定向消費。

我們的跟蹤還包括一個稽核系統,以驗證整個管道中沒有資料丢失。為友善起見,每條消息在生成時均帶有時間戳和伺服器名稱。我們對每個生産者進行監測,使其定期生成監視事件,該事件記錄該生産者在固定時間視窗内針對每個主題釋出的消息數。生産者在單獨的主題中将監視事件釋出到Kafka。然後,消費者可以計算他們從給定主題中收到的消息數,并使用監視事件來驗證這些計數以驗證資料的正确性。

加載到Hadoop叢集中是通過實作一種特殊的Kafka輸入格式來完成的,該格式允許MapReduce作業直接從Kafka讀取資料。MapReduce作業将加載原始資料,然後對其進行分組和壓縮,以在将來進行有效處理。消息偏移的無狀态代理和用戶端存儲再次在這裡發揮作用,允許MapReduce任務管理(允許任務失敗并重新啟動)以自然方式處理資料加載,而不會在任務重新啟動時重複或丢失消息。僅在成功完成作業時,資料和偏移量才被存儲到HDFS中。

我們選擇使用Avro[2]作為序列化協定,因為它高效且支援模式演變。對于每條消息,我們将其Avro模式的ID和序列化的位元組存儲在有效負載中。這種模式使我們可以強制協定確定資料生産者和消費者之間的相容性。我們使用輕量級的模式系統資料庫服務将模式ID映射到實際模式。消費者收到消息時,它将在模式系統資料庫中進行查詢以檢索模式,該模式用于将位元組解碼為對象(由于值是不可變的,是以每個模式隻需要執行一次該查詢)。

5. 實驗結果

我們進行了一項實驗研究,将Kafka與Apache ActiveMQ v5.4[1](一種流行的JMS開源實作)和RabbitMQ v2.4[16](一種以其性能著稱的消息系統)的性能進行了比較。我們使用了ActiveMQ的預設持久消息存儲KahaDB。盡管此處未介紹,但我們還測試了替代的AMQ消息存儲,發現其性能與KahaDB非常相似。隻要可能,我們都會嘗試在所有系統中使用可比較的設定。

我們在2台Linux機器上進行了實驗,每台機器具有8個2GHz核、16GB記憶體、6個RAID 10磁盤。這兩台機器通過1Gb網絡鍊路連接配接。其中一台機器用作代理,另一台機器用作生産者或消費者。

5.1 生産者測試

我們在所有系統中将代理配置為異步将消息重新整理到其持久性存儲。對于每個系統,我們運作一個生産者來釋出總共1000萬條消息,每條消息200個位元組。我們配置了Kafka生産者以1和50的大小批量發送消息。ActiveMQ和RabbitMQ似乎沒有簡單的方法來批量發送消息,并且我們假定它使用的批量大小為1。結果如圖4所示。x軸表示随時間推移發送給代理的資料量(以MB為機關),y軸表示生産者吞吐量(以每秒消息數為機關)。平均而言,Kafka可以以每秒50000和400000條消息的速度釋出消息,對應的批量大小分别為1和50。這些數字比ActiveMQ高幾個數量級,比RabbitMQ高至少2倍。

Kafka:用于日志處理的分布式消息系統摘要常用術語關鍵詞1. 簡介2. 相關工作3. Kafka架構和設計原則4. Kafka在LinkedIn中的使用5. 實驗結果6. 結論與未來工作7. 參考文獻

Kafka表現出色很多的原因有幾個。首先,Kafka生産者目前不等待代理的确認,而是以代理可以處理的盡可能快的速度發送消息。這大大提高了釋出者的吞吐量。當批量大小為50時,單個Kafka生産者幾乎飽和了生産者和代理之間的1Gb鍊路。對于日志聚合情況,這是一種有效的優化,因為必須異步發送資料,以避免将任何延時引入流量的實時服務中。我們注意到,在生産者沒有确認的情況下,不能保證代理實際上收到了每個已釋出的消息。對于許多類型的日志資料,希望以持久性換取吞吐量,隻要所丢棄消息的數量相對較小即可。但是,我們确實計劃在将來解決持久性問題,以擷取更多關鍵資料。

其次,Kafka具有更有效的存儲格式。平均而言,每條消息在Kafka中的開銷為9位元組,而在ActiveMQ中為144位元組。這意味着ActiveMQ使用的存儲空間比Kafka多70%,用于存儲相同的1000萬條消息。ActiveMQ的一項開銷來自JMS所需的繁重消息頭。另一個開銷是維護各種索引結構的成本。我們觀察到ActiveMQ中最繁忙的線程之一花費了大部分時間來通路B樹以維護消息中繼資料和狀态。最後,批處理通過平攤RPC開銷大大提高了吞吐量。在Kafka中,每批50條消息将吞吐量提高了近一個數量級。

5.2 消費者測試

在第二個實驗中,我們測試了消費者的性能。同樣,對于所有系統,我們僅使用一個消費者就可以檢索總共1000萬條消息。我們配置了所有系統,以便每個拉取請求會預拉取大約相同數量的資料 - 最多1000條消息或大約200KB。我們将消費者确認模式設定為自動。由于所有消息都在記憶體中,是以所有系統都從底層檔案系統的頁面緩存或某些記憶體緩沖區中提供資料。結果如圖5所示。

Kafka:用于日志處理的分布式消息系統摘要常用術語關鍵詞1. 簡介2. 相關工作3. Kafka架構和設計原則4. Kafka在LinkedIn中的使用5. 實驗結果6. 結論與未來工作7. 參考文獻

平均而言,Kafka每秒消費22000條消息,是ActiveMQ和RabbitMQ的4倍以上。我們可以想到幾個原因。首先,由于Kafka具有更有效的存儲格式,是以從代理向Kafka中的消費者傳輸的位元組數更少。其次,ActiveMQ和RabbitMQ中的代理都必須維護每個消息的發送狀态。我們觀察到,在此測試期間,ActiveMQ線程之一正在忙于将KahaDB頁面寫入磁盤。相反,Kafka的代理上沒有磁盤寫入活動。最後,通過使用sendfile API,Kafka減少了傳輸開銷。

在結束本節時,我們注意到該實驗的目的并不是要表明其他消息傳遞系統不如Kafka。畢竟,ActiveMQ和RabbitMQ都具有比Kafka更多的功能。重點是說明通過專用系統可以實作的潛在性能提升。

6. 結論與未來工作

我們提出了一種稱為Kafka的新穎系統,用于處理大量日志資料流。像消息傳遞系統一樣,Kafka使用基于拉取的消費模式,該模式允許應用以自己的速率消費資料并在需要時回退該消費。通過專注于日志處理應用,Kafka實作了比正常消息傳遞系統更高的吞吐量。它還提供內建的分布式支援,并且可以擴充。我們已經在LinkedIn成功地将Kafka用于離線和線上應用。

我們将來會探求許多方向。首先,我們計劃在多個代理之間添加消息的内置複制,進而即使在無法恢複的機器故障的情況下,也可以保證持久性和資料可用性。我們希望同時支援異步和同步複制模型,以在生産者延遲和提供的保證強度之間進行權衡。應用可以根據其對持久性、可用性和吞吐量的要求來選擇合适的備援級别。其次,我們要在Kafka中添加一些流處理功能。從Kafka檢索消息後,實時應用通常會執行類似的操作,例如基于視窗的統計以及将每個消息與輔助存儲中的記錄或另一個流中的消息關聯在一起。在底層,這可以通過在釋出過程中對消息按聯接鍵進行語義分區來支援,以便使用特定鍵發送的所有消息都到達相同的分區,進而到達單個消費者程序。這為處理消費者機器叢集中的分布式流提供了基礎。最重要的是,我們認為有用的流實用工具庫(例如不同的視窗功能或連接配接技術)将對此類應用有益。

7. 參考文獻

  1. http://activemq.apache.org/
  2. http://avro.apache.org/
  3. Cloudera’s Flume, https://github.com/cloudera/flume
  4. http://developer.yahoo.com/blogs/hadoop/posts/2010/06/enabling_hadoop_batch_processi_1/
  5. Efficient data transfer through zero copy: https://www.ibm.com/developerworks/linux/library/jzerocopy/
  6. Facebook’s Scribe, http://www.facebook.com/note.php?note_id=32008268919
  7. IBM Websphere MQ: http://www-01.ibm.com/software/integration/wmq/
  8. http://hadoop.apache.org/
  9. http://hadoop.apache.org/hdfs/
  10. http://hadoop.apache.org/zookeeper/
  11. http://www.slideshare.net/cloudera/hw09-hadoop-baseddata-mining-platform-for-the-telecom-industry
  12. http://www.slideshare.net/prasadc/hive-percona-2009
  13. https://issues.apache.org/jira/browse/ZOOKEEPER-775
  14. JAVA Message Service: http://download.oracle.com/javaee/1.3/jms/tutorial/1_3_1-fcs/doc/jms_tutorialTOC.html.
  15. Oracle Enterprise Messaging Service: http://www.oracle.com/technetwork/middleware/ias/index-093455.html
  16. http://www.rabbitmq.com/
  17. TIBCO Enterprise Message Service: http://www.tibco.com/products/soa/messaging/
  18. Kafka, http://sna-projects.com/kafka/