01 背景
随着資料時效性對企業的精細化營運越來越重要,“實時即未來”、“實時數倉”、“資料湖” 成為了近幾年炙手可熱的詞。流計算領域的格局也在這幾年發生了巨大的變化,Apache Flink 在流批一體的方向上不斷深耕,Apache Spark 的近實時處理有着一定的閱聽人,Apache Kafka 也有了 ksqlDB 高調地進軍流計算,而 Apache Storm 卻開始逐漸地退出曆史的舞台。
每一種引擎有其優勢的地方,如何選擇适合自己業務的流計算引擎成了一個由來已久的話題。除了比較各個引擎提供的不同的功能矩陣之外,性能是一個無法繞開的評估因素。基準測試(benchmark)就是用來評估系統性能的一個重要和常見的過程。
本文将探讨流計算基準測試設計上的難點,以及我們是如何設計 Nexmark 這個流計算基準測試架構的,以及将來的規劃。最後會回顧審視我們對基準測試的一些看法。
02 現有流計算基準測試的問題
目前在流計算領域中,還沒有一個行業标準的基準測試。目前業界較為人知的流計算 benchmark 是五年前雅虎 Storm 團隊釋出的 Yahoo Streaming Benchmarks:
https://github.com/yahoo/streaming-benchmarks。雅虎的原意是因為業界缺少反映真實場景的 benchmark,模拟了一個簡單的廣告場景來比較各個流計算架構,後來被廣泛引用。具體場景是從 Kafka 消費的廣告的點選流,關聯 Redis 中的廣告所屬的 campaign 資訊,然後做時間視窗聚合計數。
然而,正是因為雅虎團隊太過于追求還原真實的生産環境,導緻這些外部系統服務(Kafka, Redis)成為了作業的瓶頸。Ververica 曾在
這篇文章中做過一個擴充實驗,将資料源從 Kafka 替換成了一個内置的 datagen source,性能提升了 37 倍! 由此可見,引入的 Kafka 元件導緻了無法準确反映引擎真實的性能。更重要的一個問題是,Yahoo Benchmark 隻包含一個非常簡單的,類似 “Word Count” 的作業,它無法全面地反映當今複雜的流計算系統和業務。試想,誰會用一個簡單的 “Word Count” 去衡量比較各個資料庫之間的性能差異呢?正是這些原因使得 Yahoo Benchmark 無法成為 一個行業标準的基準測試。這也正是我們想要解決的問題。
是以,我們認為一個行業标準的基準測試應該具備以下幾個特點:
1. 可複現性。
可複現性是使得 benchmark 被信任的一個重要條件。許多 benchmark 的結果是難以重制的。有的是因為隻擺了個 benchmark 結果圖,用于生成這些結果的代碼并沒有公開。有的是因為用于 benchmark 的硬體不容易被别人擷取到。有的是因為 benchmark 依賴的服務太多,緻使測試結果不穩定。
2. 能調整作業的負載(資料量、資料分布)
例如資料庫領域非常著名的 TPC-H、TPC-DS 涵蓋了大量的 query 集合,來捕獲查詢引擎之間細微的差别。而且這些 query 集合都立于真實業務場景之上(商品零售行業),資料規模大,是以也很受一些大資料系統的青睐。
3. 能調整作業的負載。即資料量、資料分布。
在大資料領域,不同的資料規模對于引擎來說可能會是完全不同的事情。例如 Yahoo Benchmark 中使用的 campaign id 隻有 100 個,使得狀态非常小,記憶體都可以裝的下。這樣使得同步 IO 和 checkpoint 等的影響可以忽略不計。而真實的場景往往要面對大狀态,面臨的挑戰要複雜困難的多。像 TPC-DS 的資料生成工具會提供 scalar factor 的參數來控制資料量。其次在資料分布上最好也能貼近真實世界的資料,如有資料傾斜,及調整傾斜比例。進而能全面、綜合地反映業務場景和引擎之間地差異。
4. 有統一的性能衡量名額和采集彙總工具。
基準測試的性能名額的定義需要清晰、一緻,且能适用于各種計算引擎。然而流計算的性能名額要比傳統批處理的更難定義、更難采集。是流計算 benchmark 最具挑戰性的一個問題,這也會在下文展開描述。
我們也研究了很多其他的流計算相關的基準測試,包括:
StreamBench、
HiBench BigDataBench,但是它們都在上述幾個基本面有所欠缺。基準測試的行業标杆無疑是 TPC 釋出的一系列 benchmark,如 TPC-H,TPC-DS。然而這些 benchmark 是面向傳統資料庫、傳統數倉而設計的,并不适用于今天的流計算系統。例如 benchmark 中沒有考慮事件時間、資料的亂序、視窗等流計算中常見的場景。是以我們不得不考慮重新設計并開源一個流計算基準測試架構,Nexmark:
https://github.com/nexmark/nexmark。
03 Nexmark 基準測試架構的設計
為了提供一個滿足以上幾個基本面的流計算基準測試,我們設計和開發了 Nexmark 基準測試架構,并努力讓其成為流計算領域的标準 benchmark 。
Nexmark 基準測試架構來源于
NEXMark 研究論文,以及
Apache Beam Nexmark Suite,并在其之上進行了擴充和完善。Nexmark 基準測試架構不依賴任何第三方服務,隻需要部署好引擎和 Nexmark,通過腳本
nexmark/bin/run_query.sh all
即可等待并獲得所有 query 下的 benchmark 結果。下面我們将探讨 Nexmark 基準測試在設計上的一些決策。
移除外部 source、sink 依賴
如上所述,Yahoo Benchmark 使用了 Kafka 資料源,卻使得最終結果無法準确反映引擎的真實性能。此外,我們還發現,在 benchmark 快慢流雙流 JOIN 的場景時,如果使用了 Kafka 資料源,慢流會超前消費(快流易被反壓),導緻 JOIN 節點的狀态會緩存大量超前的資料。這其實不能反映真實的場景,因為在真實的場景下,慢流是無法被超前消費的(資料還未産生)。是以我們在 Nexmark 中使用了 datagen source,資料直接在記憶體中生成,資料不落地,直接向下遊節點發送。多個事件流都由單一的資料生成器生成,是以當快流被反壓時,也能抑制慢流的生成,較好地反映了真實場景。
與之類似的,我們也移除了外部 sink 的依賴,不再輸出到 Kafka/Redis,而是輸出到一個空 sink 中,即 sink 會丢棄收到的所有資料。
通過這種方式,我們保證了瓶頸隻會在引擎自身,進而能精确地測量出引擎之間細微的差異。
Metrics
批處理系統 benchmark 的 metric 通常采用總體耗時來衡量。然而流計算系統處理的資料是源源不斷的,無法統計 query 耗時。是以,我們提出三個主要的 metric:吞吐、延遲、CPU。Nexmark 測試架構會自動幫我們采集 metric,并做彙總,不需要部署任何第三方的 metric 服務。
吞吐:
吞吐(throughput)也常被稱作 TPS,描述流計算系統每秒能處理多少條資料。由于我們有多個事件流,所有事件流都由一個資料生成器生成,為了統一觀測角度,我們采用資料生成器的 TPS,而非單一事件流的 TPS。我們将一個 query 能達到的最大吞吐,作為其吞吐名額。例如,針對 Flink 引擎,我們通過 Flink REST API 暴露的
<source_operator_name>.numRecordsOutPerSecond
metric 來擷取目前吞吐量。
延遲:
延遲(Latency)描述了從資料進入流計算系統,到它的結果被輸出的時間間隔。對于視窗聚合,Yahoo Benchmark 中使用
output_system_time - window_end
作為延遲名額,這其實并沒有考慮資料在視窗輸出前的等待時間,這種計算結果也會極大地受到反壓的影響,是以其計算結果是不準确的。一種更準确的計算方式應為
output_system_time - max(ingest_time)
。 然而在非視窗聚合,或雙流 JOIN 中,延遲又會有不同的計算方式。
是以延遲的定義和采集在流計算系統中有很多現實存在的問題,需要根據具體 query 具體分析,這在[參考文獻[2]](
https://arxiv.org/pdf/1802.08496.pdf)中有詳細的讨論,這也是我們目前還未在 Nexmark 中實作延遲 metric 的原因。
CPU:
資源使用率是很多流計算 benchmark 中忽視的一個名額。由于在真實生産環境,我們并不會限制流計算引擎所能使用的核數,進而給系統更大的彈性。是以我們引入了 CPU 使用率,作為輔助名額,即作業一共消耗了多少核。通過
吞吐/cores
,可以計算出平均每個核對于吞吐的貢獻。對于程序的 CPU 使用率的采集,我們沒有使用 JVM CPU load,而是借鑒了 YARN 中的實作,通過采樣
/proc/<pid>/stat
并計算獲得,該方式可以獲得較為真實的程序 CPU 使用率。是以我們的 Nexmark 測試架構需要在測試開始前,先在每台機器上部署 CPU 采集程序。
Query 與 Schema
Nexmark 的業務模型基于一個真實的線上拍賣系統。所有的 query 都基于相同的三個資料流,三個資料流會有一個資料生成器生成,來控制他們之間的比例、資料偏斜、關聯關系等等。這三個資料流分别是:
- 使用者(Person):代表一個送出拍賣,或參與競标的使用者。
- 拍賣(Auction):代表一個拍賣品。
- 競标(Bid): 代表一個對拍賣品的出價。
我們一共定義了 16 個 query,所有的 query 都使用 ANSI SQL 标準文法。基于 SQL ,我們可以更容易地擴充 query 測試集,支援更多的引擎。然而,由于 Spark 在流計算功能上的限制,大部分的 query 都無法通過 Structured Streaming 來實作。是以我們目前隻支援測試 Flink SQL 引擎。
Query | 标題 | 簡介 | Flink |
---|---|---|---|
q0 | Pass Through | 測量空跑時的開銷,包括監控和資料生成器的開銷。 | ✅ |
q1 | Currency Conversion | 将每個競标價格從美元轉換為歐元。 | |
q2 | Selection | 過濾出滿足條件的競标記錄。 | |
q3 | Local Item Suggestion | 來自指定城市的使用者的拍賣品。展示了雙流 JOIN。 | |
q4 | Average Price for a Category | 求出在每個分類下,獲勝競标的平均價格。 | |
q5 | Hot Items | 在過去一段時間,哪些拍賣品收到了最多的競标? | |
q6 | Average Selling Price by Seller | 每個賣家過去10個成功售出的拍賣品的平均價格是多少? | FLINK-19059 |
q7 | Highest Bid | 過去一段時間出價最高的競标,及其競标價格。 | |
q8 | Monitor New Users | 過去一段時間新進入系統并建立拍賣的使用者。 | |
q9 | Winning Bids | 計算每個拍賣品的獲勝競标記錄。 | |
q10 | Log to File System | 将所有事件記錄到檔案系統。展示了将資料流按視窗寫入分區檔案。 | |
q11 | User Sessions | 每個使用者在每個活躍周期中進行了多少次出價?展示了 session window。 | |
q12 | Processing Time Windows | 每個使用者在固定的處理時間視窗中進行了多少次出價?展示了 processing time window。 | |
q13 | Bounded Side Input Join | 競标流與一個靜态白名單關聯,展示了基礎的維表關聯。 | |
q14 | Calculation | 為競标流轉化和生成更多的字段。展示了更複雜的映射、過濾、UDF 的使用。 | |
q15 | Bidding Statistics Report | 每天有多少不同的使用者參與了不同等級的拍賣中?展示了多 count distinct 的應用。 |
作業負載的配置化
我們也支援配置調整作業的負載,包括資料生成器的吞吐量以及吞吐曲線、各個資料流之間的資料量比例、每個資料流的資料平均大小以及資料傾斜比例等等。具體的可以參考
Source DDL 參數04 實驗結果
我們在阿裡雲的三台機器上進行了 Nexmark 針對 Flink 的基準測試。每台機器均為 ecs.i2g.2xlarge 規格,配有 Xeon 2.5 GHz CPU (8 vCores) 以及 32 GB 記憶體,800 GB SSD 本地磁盤。機器之間的帶寬為 2 Gbps。
測試了 flink-1.11 版本,我們在這 3 台機器上部署了 Flink standalone 叢集,由 1 個 JobManager,8 個 TaskManager (每個隻有 1 slot)組成,都是 4 GB記憶體。叢集預設并行度為 8。開啟 checkpoint 以及 exactly once 模式,checkpoint 間隔 3 分鐘。使用 RocksDB 狀态後端。測試發現,對于有狀态的 query,每次 checkpoint 的大小在 GB 級以上,是以有效地測試的大狀态的場景。
Datagen source 保持 1000 萬每秒的速率生成資料,三個資料流的資料比例分别是 Bid: 92%,Auction: 6%,Person: 2%。每個 query 都先運作 3 分鐘熱身,之後 3 分鐘采集性能名額。
運作
nexmark/bin/run_query.sh all
後,列印測試結果如下:
+-------------------+-------------------+-------------------+-------------------+
| Nexmark Query | Throughput (r/s) | Cores | Throughput/Cores |
+-------------------+-------------------+-------------------+-------------------+
|q0 |1.9 M |8.17 |235 K |
|q1 |1.8 M |8.17 |228 K |
|q2 |2.1 M |8.16 |258 K |
|q3 |1.9 M |9.66 |198 K |
|q4 |305 K |11.55 |26 K |
|q5 |311 K |11.71 |26 K |
|q7 |153 K |12.14 |12 K |
|q8 |1.8 M |13.65 |135 K |
|q9 |170 K |11.86 |14 K |
|q10 |633 K |8.23 |76 K |
|q11 |428 K |10.5 |40 K |
|q12 |937 K |12.35 |75 K |
|q13 |1.4 M |8.26 |179 K |
|q14 |1.8 M |8.28 |228 K |
|q15 |729 K |9.06 |80 K |
+-------------------+-------------------+-------------------+-------------------+
05 總結
我們開發和設計 Nexmark 的初衷是為了推出一套标準的流計算 benchmark 測試集,以及測試流程。雖然目前僅支援了 Flink 引擎,但在目前也具有一定的意義,例如:
- 推動流計算 benchmark 的發展和标準化。
- 作為 Flink 引擎版本疊代之間的性能測試工具,甚至是日常回歸工具,及時發現性能回退的問題。
- 在開發 Flink 性能優化的功能時,可以用來驗證性能優化的效果。
- 部分公司可能會有 Flink 的内部版本,可以用作内部版本與開源版本之間的性能對比工具。
當然,我們也計劃持續改進和完善 Nexmark 測試架構,例如支援 Latency metric,支援更多的引擎,如 Spark Structured Streaming, Spark Streaming, ksqlDB, Flink DataStream 等等。也歡迎有志之士一起加入貢獻和擴充。
06 參考文獻
[1] Pete Tucker and Kristin Tufte. "NEXMark – A Benchmark for Queries over Data Streams". June 2010.
[2] Jeyhun Karimov and Tilmann Rabl. "Benchmarking Distributed Stream Data Processing Systems". arXiv:1802.08496v2 [cs.DB] Jun 2019
[3] Yangjun Wang. "Stream Processing Systems Benchmark: StreamBench". May 2016.