大資料浪潮已經爆紅了十幾年,但是流處理領域似乎一直不溫不火,直到近兩年:
- Confluent(Kafka 背後的商業化公司)上市;
- Snowflake、Databricks 紛紛投資 Streaming;
- Decodable、Immerok 等 start-up 公司的湧現;
- 2023年6月,SIGMOD Systems Award 頒發給了 Apache Flink;
SIGMOD 是資料管理、資料處理和資料存儲領域最具影響力的國際性學術會議之一
這讓人不免有些興奮——流計算的春天終于到來了嗎?
前言
今天我們從技術的角度聊聊主流的流計算系統。盡管概念上有許多共通之處,例如:時間視窗、水位(Watermark)等,但在實作層面上,各個系統幾乎都有獨特的設計,這種系統設計的多樣性也正呼應了流計算應用場景的多樣性,不能從單一次元上去判定孰好孰壞。
主流系統對比
本文從内部實作的角度,深入對比了市面上常見的流計算系統,包括 Apache Flink、RisingWave、Spark Streaming、ksqlDB 等,希望這篇文章能在你技術選型時有所幫助。
01
Apache Flink
Flink 誕生之初就提出“流批一體”的構想,即将流計算和批處理使用同一套 Runtime 解決。具體來說,它将批處理看作是流處理的一個特例,二者無非是有界和無界資料流的差別。
現在看來,盡管流批一體的設想還沒有那麼深入人心,但是 Flink 的确憑借它的出色設計,成為了最流行的開源流計算架構。
和衆多大資料架構一樣,Flink 計算運作在 JVM 之上。Flink 的程式設計接口叫作 DataStream API,相對地,還有一套批處理接口稱為 DataSet API,在這兩個程式設計接口之上,還提供了友善處理關系型資料的 Table API 以及 Flink SQL。上述接口底層共享 Runtime、排程、資料傳輸層等實作。
http://ericfu.me/images/2023/07/flink-arch.png
Runtime 部分基本上和常見的 MPP 系統一緻:算子以 DAG 方式組織在一起,并通過本地和網絡 channel 交換資料,分片并行處理。下文中很多系統也是類似,對于這些共同之處,我們不再贅述。
不同于很多批處理系統标配了列式結構,Flink 記憶體中的表示是行式結構,即每個 event(或 message)作為一個單元進行計算以及傳輸時的序列化。
為了加速執行,Flink SQL 中使用了 codegen 技術即時生成和編譯算子代碼,讓每行的計算盡可能高效。DataStream API 則隻能依賴 JVM 自身的 JIT 來優化使用者代碼。
狀态管理:
Flink 是首個引入狀态的流計算系統,它将 stateful operator 看作一等公民。今天我們已經很清楚,Streaming 中常用的 Join、聚合等算子都需要狀态。狀态管理是 Streaming 中不可或缺的一環,它直接決定了故障恢複的設計、一緻性語義等等。
Flink 的算子狀态儲存在算子本地的 RocksDB 執行個體中(這裡僅讨論開源版 Flink 的實作)。RocksDB 的 LSM-Tree 結構使得它能很容易獲得一個增量的快照,這是因為目前版本中的大部分 SST 檔案和上個版本是重合的,是以拷貝最新快照時隻需要拷貝變化的部分即可。Flink 利用了這一特性對本地狀态進行 checkpoint,最後将全局 checkpoint 儲存在持久化存儲上(例如 HDFS 或 S3)。
Flink 1.15 中引入了 Generalized incremental checkpoints 脫離 RocksDB 自行實作了增量 checkpoint,有興趣的讀者可以閱讀官方部落格。
正确進行 checkpoint 的關鍵如何獲得全局一緻的 checkpoint,這一點上 Flink 采用了 Chandy-Lamport 算法,我認為這是 Flink 最大的設計亮點。具體來說,我們從資料流的源頭(source)注入一些特殊的消息,稱為 Barrier。Barrier 将随着資料流中的其他消息一同流過整個 DAG,每經過一個 stateful operator 就會觸發相應相應的算子的 checkpoint 操作。而當 Barrier 流完整個 DAG 時,之前所有這些 checkpoint 就構成了一次一緻的全局 checkpoint。
http://ericfu.me/images/2023/07/chandy-lamport.png
Barrier 在遇到多輸入或多輸出的算子時會進行對齊(align),這也是它能保證全局一緻的關鍵所在,同時也是它引入的唯一 overhead。考慮到即便沒有 Barrier,大多數流計算任務也需要免不了對齊(例如視窗的計算),這個代價并不大。總體來看,Flink 以比較優雅的方式解決了一緻性 checkpoint。
Barrier 的對齊:收集到所有 fan-in 的 barrier 後,再向所有 fan-out 發射 barrier
基于上述的 checkpoint 機制,at-least once 以及 exactly-once delivery 都很容易實作。例如,對于 replayable source(例如 Kafka)和 idempotent sink(例如 Redis),唯一需要做的事情就是将 Kafka 目前消費 offset 作為狀态的一部分記錄在 checkpoint 中,就輕松實作了 exactly-once delivery。對于一些更複雜的情形,一些 Sink 也允許通過兩階段送出(2PC)和外部系統配合實作 exactly-once。
02
RisingWave
RisingWave 是一個年輕的流計算開源産品,也是我本人現在正在開發的項目。它對自身的定位是流資料庫(Streaming Database)而非通用的流計算架構,允許使用者使用 SQL 以物化視圖的形式定義流計算任務,其設計目标是為了讓流計算盡可能簡單易上手。它不提供程式設計 API,如有必要使用者可以通過 UDF 引入自定義的代碼邏輯。
RisingWave 使用 Rust 語言編寫。除了衆所周知的記憶體以及并發安全上的優勢,Rust 語言内置的 async 支援以及豐富的第三方庫也極大地幫助了我們高效應對流計算這樣的 IO 密集場景。
RisingWave 的流計算任務由許多個獨立的 Actor 構成,Actor 可以看作一個協程,由使用者态 Runtime(tokio)進行高效的排程。同時,這也使得算子内部的實作能夠采用高效的單線程記憶體資料結構,例如 Hash Join 所用的哈希表。
http://ericfu.me/images/2023/07/risingwave-compute.png
除了流計算,RisingWave 也能像資料庫那樣直接提供查詢能力,而且提供 snapshot read 的正确性保證。具體來說,隻要在一個事務中,直接查詢物化視圖的結果一定與執行其定義 SQL 的結果一緻。這很大程度上簡化了使用者驗證 Streaming 任務的正确性。
狀态管理:
上述的讀一緻性保證和其内部的 checkpoint 機制密不可分。RisingWave 采用與 Flink 類似的基于 Barrier 的全局一緻 checkpoint 機制,但是頻率要高得多,預設為 1s 一次(Flink 預設為 30min)。
是以使用者的讀請求作用于這些 checkpoint 上,總是能獲得一緻的結果。
存儲方面,RisingWave 并沒有直接使用 RocksDB 之類的開源元件,而是從頭打造了一套基于 LSM-Tree 和共享存儲的存儲引擎。這樣做的原因有許多,其中最主要的是為了計算節點能更加輕量地 scale out/in,而不需要像 Flink 那樣需要将 RocksDB 的狀态檔案拷貝到新的節點上。同時,我們也希望能夠更好地利用雲對象存儲的優勢,例如 S3 的低成本以及高可靠性。
RisingWave 内置存儲引擎,并基于此實作了類似資料庫的 serving 查詢的能力,是它相比其他系統的一大不同。
需要說明的是,Flink 後來引入的 Table Store (Paimon) 存儲彌補了 Flink 沒有内置表存儲的遺憾,但是 Table Store 的主要存儲為列式結構,更适合分析型查詢。而 RisingWave 的存儲引擎為行式,更适合點查這樣的 OLTP 查詢。
03
Spark Streaming
Apache Spark 原本被設計為一個批處理引擎。得益于 RDD 的設計,Spark 擁有比 Hadoop MapReduce 更優秀的性能。有興趣的讀者可以看我之前寫的《一文讀懂 Apache Spark》。
Spark Streaming 使用的技術稱為 D-Stream(Discretized Streams)。不同于其他流計算架構會長期運作算子的執行個體,Spark Streaming 将流資料切分成一個個批處理任務(micro-batch),用一系列的短暫、無狀态、确定性的批處理實作流處理。
Spark 2.x 中還引入了一個全新的類似 Continuous Processing Mode,但似乎不太流行,我們這裡不去讨論。
下面兩張圖描述了 Spark 如何通過 RDD 來實作 micro-batch 的流計算。對于無狀态的計算(例如 map),那其實和批計算中沒有任何不同。對于有狀态的計算(例如聚合),狀态的變遷可以視作是 RDD 的疊代,就像右圖中最右側的 counts RDD 那樣,它的祖先(lineage)除了計算的上遊,還有自己的前一個版本的 RDD。
D-Straem 處理模型:左-對每個時間間隔,生成相應基于 RDD 的計算圖;右-對有狀态算子,其祖先含上一時刻的 RDD
Spark Streaming 非常巧妙地将流計算轉換成了基于 RDD 的批處理,也自然地複用了 RDD 的錯誤容忍機制:隻要将失敗節點上丢失的 RDD Partition 重算即可。不過,很顯然這裡有個問題是 DStream RDD 的祖先會不斷延長,導緻恢複代價變得越來越高,更别說 replayable source 往往是有 retention 限制的。Spark Streaming 通過每隔一段時間調用 DStream RDD 的 checkpoint() 函數将其持久化,以截斷祖先鍊。
事實證明,上述 micro-batch 方案可以達到秒級至分鐘級的延遲。Streaming Systems 一書的作者也承認,大多數情況下,這樣的延遲已經能滿足需求了,“充其量是一個小小的抱怨”。但是也要承認,DStream 畢竟隻是對 stateful operator 的一種拙劣模仿,在保持設計簡潔性的同時,也需要付出更高的代價才能達到相同的計算性能。
04
Google Dataflow (WindMill)
Google Dataflow,或者它的開源版本 Apache Beam,其實僅僅是一個統一的程式設計接口,背後支援多種不同的後端 Runtime,包括 Apache Flink、Spark 等。我們這裡僅僅探讨 Google 自家的 WindMill 引擎。它更為人熟知的名字是 MillWheel,我對它了解也主要來自于 VLDB'13 的論文 [7]。
MillWheel 的計算和狀态管理是完全解藕的。
使用者編寫的算子通過 State API 讀寫以 Key-Value 模型儲存的持久化狀态(論文上為 BigTable)。MillWheel 沒有全局 checkpoint 的機制,每個算子在向下遊發射出資料之前,需要先将狀态寫入持久化存儲,類似資料庫的 WAL。
這樣做的好處是,算子本身保持了無狀态的優良特性,可以非常友善地進行故障恢複、排程等,但它的代價是高昂的,所有狀态的讀寫都需要通過 RPC 完成。
MillWheel 的使用者代碼隻需實作 ProcessRecord 接口,并可以通過 State API 接口儲存狀态
沒有全局一緻性的 checkpoint 也給實作 exactly-once delivery 帶來了挑戰。除非算子邏輯具有幂等性,否則算子需要對輸入進行去重,防止當機恢複時有重複消息被處理多次,為此又需要在外部存儲上儲存一段時間内的 message log。總體來說,該方案消耗了很多無謂的 RPC 代價。
05
Apache Kafka (ksqlDB)
Kafka 無疑是 Streaming 市場中最大的玩家,它首次将持久性(durability)引入中間件領域,奠定了整個流計算尤其是 exactly-once delivery 的基石。但是之是以放在這裡才講,是因為它的角色主要仍然是 Message Broker,而在計算方面乏善可陳。
ksqlDB (原名 KSQL)是一個建構在 Kafka 上的流處理引擎,由 Confluent 研發。ksqlDB 将流-表對偶性的概念發揚光大,也引入了物化視圖這樣的概念,允許使用者通過 SQL 定義流計算任務。盡管看起來很美好,ksqlDB 設計上有着諸多的限制和妥協,這可能和它輕量級插件的定位有關,但這也讓許多使用者場景不得不尋求其他的解決方案。
ksqlDB 對于狀态的處理就是一個妥協的例子。ksqlDB 利用 Kafka topic 儲存狀态的 changelog,并借助 RocksDB 将這些 changelog 物化成表,以便算子進行高效地查詢(看!一個流-表對偶性的實踐)。這樣迂回的方式導緻 ksqlDB 需要為相同數量的狀态消耗了數倍的資源,一不小心還可能引起這樣的資料不一緻的 bug。
另外,由于 ksqlDB 的任務總是運作在單個 Kafka 節點上(不支援 MPP 那樣的 shuffle),無論聚合還是 join 都需要使用者小心地確定資料已經按正确的方式分區。必要時,需要建立額外的 repartition 的 topic 才能讓跑起來。這也限制了 ksqlDB 對複雜 SQL 的處理能力。
06
其他系統
以下這些系統大多已經不再流行,但是它們的設計思路以及取舍仍然值得我們學習。
- Flume/FlumeJava
最初由 Google 研發,可能更已知的最早的流計算系統,誕生于 2007 年,最初定位于一套友善開發流計算的程式設計架構,後來也被用于實作 MillWheel。它的核心是一個叫做 PCollection 的資料模型,它是一個不可變的、有序的、可重複的資料集合,類似于 Spark 的 RDD,而 PTransform 定義了如果對 PCollection 進行轉換。Flume 沒有内置狀态管理,使用者需要自己借助外部資料庫等方式實作。
- Apache Storm
由 Twitter 開源,是另一個早期的流計算系統,它的核心是一個叫做 Tuple 的資料模型,類似 PCollection。相比于其他系統在 exactly-once delivery 上的努力,Storm 選擇了追求更快的性能而放棄一緻性保證,它僅支援 at-least once 的語義,這讓它的實作變得相對簡單高效。不令人意外,Storm 也沒有内置狀态管理,使用者需要自己借助外部資料庫等方式實作。
- Materialize
可能是最早提出 Streaming Database 這一概念的産品。和 RisingWave 類似,它僅提供 SQL 接口,允許使用者定義表、物化視圖等。Materialize 基于名叫 Differential Dataflow 的 Rust 流計算架構開發,它支援對 Collection 進行各種變換以定義資料流。算子狀态儲存在記憶體中的 Arrangement 結構中,這一設計導緻它事實上成為了一個單節點的記憶體資料庫,限制了它能處理的資料規模。它也不具備 checkpoint 功能,需要通過重放恢複狀态。
References:
- State management in Apache Flink: consistent stateful distributed stream processing
- Apache flink: Stream and batch processing in a single engine
- GitHub - risingwavelabs/risingwave
- Discretized streams: Fault-tolerant streaming computation at scale
- Structured streaming: A declarative api for real-time applications in apache spark
- Dataflow Under the Hood: Understanding Dataflow techniques - Sam McVeety, Ryan Lippert
- Millwheel: Fault-tolerant stream processing at internet scale.
- ksqlDB Performance Guidelines - ksqlDB Documentation
- Streaming Systems - Reuven Lax, Slava Chernyak, and Tyler Akidau
- FlumeJava: Easy, Efficient Data-Parallel Pipelines
- GitHub - MaterializeInc/materialize
關于 RisingWave
RisingWave 是一款分布式 SQL 流處理資料庫,旨在幫助使用者降低實時應用的的開發成本。作為專為雲上分布式流處理而設計的系統,RisingWave 為使用者提供了與 PostgreSQL 類似的使用體驗,并且具備比 Flink 高出 10 倍的性能以及更低的成本。
本文作者:Eric Fu RisingWave Labs 核心開發工程師