天天看點

【譯】使用Apache Kafka建構流式資料平台(1)何為流式資料平台?

這篇指南讨論我們關于實時資料流的工程經驗:如何在你的公司内部搭建實時資料平台、如何使用這些資料建構應用程式,所有這些都是基于實際經驗——我們在linkdin花了五年時間建構apache kafka,将linkdin轉換為流式資料架構,并幫助矽谷的很多技術公司完成了同樣的工作。

這份指南的第一部分是關于流式資料平台(steam data platform)的概覽:什麼是流式資料平台,為什麼要建構流式資料平台;第二部分将深入細節,給出一些操作規範和最佳實踐。

流式資料平台:簡潔、輕量的事件處理

我們在linkein建構apache kafka的目的是讓它作為資料流的中央倉庫工作,但是為什麼要做這個工作,有下面兩個原因:

資料整合:資料如何在各個系統之間流轉和傳輸;

流式處理:通常在資料倉庫或者hadoop叢集中需要做豐富的資料分析,同時實作低延時。

接下來介紹下上述兩個理論的提出過程。起初我們并沒有意識到這些問題之間有聯系,我們采取了臨時方案:隻要需要,就在系統和應用程式之間建造資料通道或者給web服務發送異步請求。随着時間推移,系統越來越複雜,我們在幾乎所有子系統之間都建立了不同的資料通道:

【譯】使用Apache Kafka建構流式資料平台(1)何為流式資料平台?

data-flow-ugly.png

每個資料通道都有自己的問題:日志資料的規模很大但是資料有缺失,并且資料傳輸的延遲很高;oracle資料庫執行個體之間的資料傳輸速度快、準确而且實時性好,但是其他系統不能及時快速得獲得這些資料;oracle資料庫的資料到hadoop叢集的資料通道吞吐量很高,但是隻能進行批次操作;搜尋系統資料通道的延遲低,不過資料規模小,并且是直接連接配接資料庫;消息系統資料通道的延遲低,但是不可靠且規模小。

随着我們在全球各地添加資料中心,我們也要為這些資料流添加對應的副本;随着系統規模的增長,對應的資料通道規模也應該相應得增長,整個系統面臨的壓力越來越大。我認為我的團隊與其說是由分布式系統工程師組成,還不如說是由一些管道工組成。

更糟的是,複雜性過高導緻資料不可靠。由于資料的索引和存儲存在問題,導緻我們的報告可信度降低。員工需要花費大量時間處理各種類型的髒資料,記得有在處理一起故障中,我們在兩個系統中發現一些非常類似但存在微小差異的資料,我們費了很大力氣檢查這兩個資料哪個是争取額的,最後發現兩個都不對。

與此同時,我們除了要做資料遷移,還想對資料進行進一步的處理和分析。hadoop平台提供了批處理、資料打包和專案(ad hoc)處理能力,但是我們還需要一個實時性更好的資料處理平台。我們的很多系統——特别是監控系統、搜尋索引的資料通道、資料分析應用以及安全分析應用,都需要秒級的響應速度,但是這類型的應用在上圖的系統架構中表現很差。

2010年左右,我們開始建構一個系統:專注于實時擷取流式資料(stream data),并規定各個系統之間的資料互動機制也以流式資料為承載,同時還允許對這些流式資料進行實時處理。這就是apache kafka的原型。

我們對整個系統的構想如下所示:

【譯】使用Apache Kafka建構流式資料平台(1)何為流式資料平台?

stream_data_platform.png

很長一段時間内我們都沒有為我們所建構的這個系統取名字,僅僅稱之為“kafka stuff”或者“global commit log thingy”,随着時間推移,我們開始将這個系統中的資料稱之為流式資料(steam data),而負責處理這種類型的資料的平台稱之為流式資料平台(steam data platform)。

最終我們的系統從前文描述的跟“意大利面條”一樣雜亂進化為清晰的以流式資料平台為中心的系統:

【譯】使用Apache Kafka建構流式資料平台(1)何為流式資料平台?

a modern strea-centric data architecture built around apache kafka

在這個系統中kafka的角色是通用資料管道。每個子系統都可以很容易得接入到這個中央資料管道上;流式處理應用可以接入到該資料管道上,并對外提供經過處理後的流式資料。這種固定格式的資料類型成為各個子系統、應用和資料中心之間的通用語言。舉個例子說明:如果一個使用者更新了他的個人資訊,這個更新資訊會流入我們的系統處理層,在系統處理層會對該使用者的公司資訊、地理位置和其他屬性進行标準化處理;然後這個資料流會流入搜尋引擎和社群地圖用于查詢和檢索、這個資料也會流入推薦系統進行工作比對;所有的這些動作隻需要毫秒量級的時間,最後這些資料會流入hadoop資料倉庫。

linkedin内部在大量使用這套系統,每天為數百個資料中心處理超過5000億事件請求,該系統已經成為其他系統的資料背景、成為hadoop叢集的資料管道,以及流式處理的hub。

接下來我們将論述流式資料平台的一些細節:該平台的工作原理、該平台解決了什麼重要問題。

這種概念對于習慣于将資料想象為資料庫中的一行的同學可能有點陌生,接下來我們看一點關于事件流資料的實際例子。

資料庫中存放的是資料的目前狀态,目前狀态是過去的某些動作(action)的結果,這些動作就是事件。庫存表儲存購買和交易事件産生的結果,銀行結餘存放信貸和借記事件的結果;web server的延時圖是一系列http請求的聚合。

當談論大資料時,很多人更青睐于記錄上述提到的這些事件流,并在此基礎上進行分析、優化和決策。某種層度上來說,這些事件流是傳統的資料庫沒有反應出來的一面:它們表示業務邏輯。

事件流資料在金融行業已經廣泛使用:股票發行、市場預測、股票交易等資料都可以當作是事件流,但是技術屆使得搜集和使用這些資料的現代技術開始流行。google将廣告點選流和廣告效果轉化為幾十億美金的收入。在web開發屆,這些事件資料又被稱為日志資料,由于缺乏針對日志處理的子產品,這些日志事件就存放在日志檔案中。hadoop之類的系統經常用于日志處理,但是根據實際情況,稱之為“批量事件存儲和處理(batch event storage and processing)”更合适。

網絡公司應該是最早開始記錄事件流的公司,搜集網站上的事件資料非常容易:在某些特定節點加一些代碼即可記錄和跟蹤每個使用者在改網站上的行為。即使是一個單頁面或者是某個流行網站上的移動視窗也能記錄很多類似的行為資料用于分析和監控。

你可能聽說過“機器産生的資料”這個概念,其實跟事件資料表示相同的含義。某種程度上所有的資料都是機器産生的,因為這些資料來自計算機系統。

事件流資料很适合描述日志資料或諸如訂單、交易、點選和貿易這些具備明顯事件特征的資料。和大多數開發人員相同,你可能将自己系統的大部分資料儲存在各種資料庫中:關系型資料庫(oracle、mysql和postgres)或者新興的分布式資料庫(mongodb、cassandra和couchbase),這些資料可能不容易了解為事件或者事件流。

但實際上,資料庫中存儲的資料也可了解為一種事件流(event steam),簡單來說,資料庫可以了解為建立資料備份或者建立備庫的過程。做資料備份的主要方法是周期性得導出資料庫内容,然後将這些資料導入到備庫中。如果我很少進行資料備份,或者是我的資料量不大,那麼可以進行全量備份。實際上,随着備份頻率的提高,全量備份不再可行:如果兩天做一次全量備份,将會耗費兩倍的系統資源、如果每個小時做一次全量備份,則會耗費24倍的系統資源。在大規模資料的備份中,顯然增量備份更加有效:隻增加新建立的、更新的資料和删除對應的資料。利用增量備份,如過我們将備份頻率提高為原來的1倍,則每次備份的數量将減少幾乎一半,消耗的系統資源也差不多。

那麼為什麼我們不盡可能提高增量備份的頻率呢?我們可以做到,但是最後隻會得到一系列單行資料改變的記錄——這種事件流稱之為變更記錄,很多資料庫系統都有負責這個工作的子產品(oracle資料庫系統中的xstreams和goldengate、mysql有binlog replication、postgres有logical log steaming replication)。

綜上,資料庫的變更過程也可以作為事件流的一部分。你可以通過這些事件流同步hadoop叢集、同步備庫或者搜尋索引;你還可以将這些事件流接入到特定的應用或者流式處理應用中,進而發掘或者分析出新的結論。

流式資料平台有兩個主要應用:

資料整合:流式資料平台搜集事件流或者資料變更資訊,并将這些變更輸送到其他資料系統,例如關系型資料庫、key-value存儲系統、hadoop或者其他資料倉庫。

流式處理:對流式資料進行持續、實時的處理和轉化,并将結果在整個系統内開放。

在角色1中,流式資料平台就像資料流的中央集線器。與之互動的應用程式不需要考慮資料源的細節,所有的資料流都以同一種資料格式表示;流式資料平台還可以作為其他子系統之間的緩沖區(buffer)——資料的提供者不需要關心最終消費和處理這些資料的其他系統。這意味着資料的消費者與資料源可以完全解耦合。

如果你需要部署一個新的系統,你隻需要将新系統接入到流式資料平台,而不需要為每個特定的需求選擇(并管理)各自的資料庫和應用程式。不論資料最初來自日志檔案、資料庫、hadoop叢集或者流式處理系統,這些資料流都使用相同的格式。在流式資料平台上部署新系統非常容易,新系統隻需要跟流式資料平台互動,而不需要跟各種具體的資料源互動。

hadoop叢集的設計目标是管理公司的全量資料,直接從hdfs中擷取資料是非常耗費時間的方案,而且直接擷取的資料不能直接用于實時處理和同步。但是,這個問題可以反過來看:hadoop等資料倉庫可以主動将結果以流式資料的格式推送給其他子系統中。

流式資料平台的角色2包含資料聚合用例,系統搜集各類資料形成資料流,然後存入hadoop叢集歸檔,這個過程就是一個持續的流式資料處理。流式處理的輸出還是資料流,同樣可以加載到其他資料系統中。

流式處理可以使用通過簡單的應用代碼實作,這些處理代碼處理事件流并産生新的事件流,這類工作可以通過一些流行的流式處理架構完成——storm、samza或spark streaming,這些架構提供了豐富的api接口。這些架構發展得都不錯,同時它們跟apache kafka的互動都很好。

在上文中我提到了一些不同的用例,每個用例都有對應的事件流,但是每個事件流的需求又有所不同——有些事件流要求快速響應、有些事件流要求高吞吐量、有些事件流要求可擴充性等等。如果我們想讓一個平台滿足這些不同的需求,這個平台應該提供什麼能力?

我認為對于一個流式資料平台,應該滿足下列關鍵需求:

它必須足夠可靠,以便于處理嚴苛的更新,例如将某個資料庫的更新日志變更為搜尋索引的存儲,能夠順序傳輸資料并保證不丢失資料;

它必須具備足夠大的吞吐量,用于處理大規模日志或者事件資料;

它必須具備緩沖或者持久化資料的能力,用于與hadoop這類批處理系統互動。

它必須能夠為實時處理程式實時提供資料,即延時要足夠低;

它必須具備良好的擴充性,可以應付整個公司的滿負載運作,并能夠內建成百上千個不同團隊的應用程式,這些應用以插件的形式與流式資料平台整合。

它必須能和實時處理架構良好得互動

流式資料平台是整個公司的核心系統,用于管理各種類型的資料流,如果該系統不能提供良好的可靠性以及可擴充性,系統會随着資料量的增長而再次遭遇瓶頸;如果該系統不支援批處理和實時處理,那麼就不能與hadoop或者storm這類系統整合。

apache kafka是專門處理流式資料的分布式系統,它具備良好的容錯性、高吞吐量、支援橫向擴充,并允許地理位置分布的流式資料處理。

kafka常常被歸類于消息處理系統,它确實扮演了類似的角色,但同時也提供了其他的抽象接口。在kafka中最關鍵的抽象資料結構是用于記錄更新的commit log:

【譯】使用Apache Kafka建構流式資料平台(1)何為流式資料平台?

commit_log-copy.png

資料生産者向commit log隊列中發送記錄流,其他消費者可以像水流一樣在毫秒級延時處理這些日志的最新資訊。每個資料消費者在commit log中有一個自己的位置(指針),并獨立移動,這使得可靠、順序更新能夠分布式得發送給每個消費者。

這個commit log的作用非常關鍵:可以多個生産者和消費者共享,并覆寫一個叢集中的多台機器,每台機器都可用作容錯保障;可以提供一個并行模型,其具備的順序消費的特點使得kafka可以用于記錄資料庫的變更。

kafka是一個現代的分布式系統,存儲在一個叢集的資料(副本和分片存儲)可以水準擴張和縮小,同時上層應用對此毫無感覺。資料消費者的機器數量可以随資料規模的增長而水準增加,同時可以自動應對資料處理過程中發生的錯誤。

kafka的一個關鍵設計是對持久化的處理相當好,kafka的消息代理(broker)可以存儲tb量級的資料,這使得kafka能夠完成一些傳統資料庫無法勝任的任務:

接入kafka的hadoop叢集或者其他離線系統可以放心得停機維護,間隔幾小時或者幾天後再平滑接入,因為在它停機期間到達的流式資料被存儲在kafka的上行叢集。

在首次執行同步資料庫的任務時可以執行全量備份,以便讓下行消費者通路全量資料。

上述這些特性使得kafka能夠提供比傳統的消息系統更廣的應用範圍。

自從我們将kafka開源後,我們有很多機會與其他想做類似的事情的公司交流和合作:研究如何kafka系統的部署以及kafka在該公司内部技術架構的角色如何随着時間演進和改變。

初次部署常常用于單個的大規模應用:日志資料處理,并接入hadoop叢集;也可能是其他資料流,該資料流的規模太大以至于超出了該公司原有的消息系統的處理能力。

從這些用例延伸開來,在接入hadoop叢集後,很快就需要提供實時資料處理的能力,現存的應用需要擴充和重構,利用現有的實時處理架構更高效得處理流式資料。以linkedin為例,我們最開始是利用kafka處理job資訊流,并将job資訊存入hadoop叢集,然後很多etl-centric的應用需求開始出現,這些job資訊流開始用于其他子系統,如下圖所示:

【譯】使用Apache Kafka建構流式資料平台(1)何為流式資料平台?

job-view.png

在這張圖中,job的定義不需要一些定制就可以與其他子系統互動,當上遊應用(移動應用)上出現新的工作資訊時,就會通過kafka發送一個全局事件,下遊的資料處理應用隻需要響應這個事件即可。

我們簡單講下流式資料平台與現存的類似系統的關系。

流式資料平台類似于企業消息系統——它接收消息事件,并把它們釋出到對應事件的訂閱者。不過,二者有三個重要的不同:

消息系統通常是作為某個應用中的一個元件來部署,不同的應用中有不同的消息系統,而流式資料平台希望成為整個企業的資料流hub。

消息系統與批處理系統(資料倉庫或者hadoop叢集)的互動性很差,因為消息系統的資料存儲容量有限;

消息系統并未提供與實時處理架構整合的api接口。

換句話說,流式資料平台可以看作在公司級别(消息系統的級别是項目)設計的消息系統。

為了便于跟其他系統整合,流式資料平台做了很多工作。它的角色跟informatica這類工具不同,流式資料平台是可以讓任何系統接入,并可以圍繞該平台建構不同的應用。

流式資料平台與資料聚合工具有一點重合的實踐:使用一個統一的資料流抽象,保證資料格式相同,這樣可以避免很多資料清洗任務。我會在這個系列文章的第二篇仔細論述這個主題。

我認為流式資料平台借鑒了很多企業服務總線的設計思想,不過提供了更好的實作方案。企業服務總線面臨的挑戰就是自身的資料傳輸效率很低;企業服務總線在部署時也面臨一些挑戰:不适合多租戶使用(ps,此處需要看下原文,歡迎指導)。

流式資料平台的優勢在于資料的傳輸與系統本身解耦合,資料的傳輸由各個應用自身完成,這樣就能避免平台自身成為瓶頸。

正常的資料庫系統都有類似的日志機制,例如golden gate,然而這個日志記錄機制僅限于資料庫使用,并不能作為通用的事件記錄平台。這些資料庫自帶的日志記錄機制主要用于同類型資料庫(eg:oracle-to-oracle)之前的互相備份。

流式資料平台并不能替代資料倉庫,恰恰相反,它為資料倉庫提供資料源。它的身份是一個資料管道,将資料傳輸到資料倉庫,用于長期轉化、資料分析和批處理。這個資料管道也為資料倉庫提供對外輸出結果資料的功能。

常用的流式處理架構,例如storm、samza或spark streaming可以很容易得跟流式資料平台整合。這些流式資料處理架構提供了豐富的api接口,可以簡化資料轉化和處理。

我們不隻是提出了一個很好的想法,我們面臨的需求很适合将自己的想法落地。過去五年我們都在建構kafka系統,幫助其他公司落地流式資料平台。今天,在矽谷有很多公司在實踐這套設計思路,每個使用者的行為都被實時記錄并處理。

我們一直在思考如何使用公司掌握的資料,是以建構了confluent平台,該平台上有一些工具用來幫助其他公司部署和使用apache kafka。如果你希望在自己的公司部署流式資料處理平台,那麼confluent平台對你絕對有用。

還有一些用的資源: