翻譯|毛家琦
校對|秦江傑
在 Flink 社群中,最常被問到的問題之一是:在從開發到生産上線的過程中如何确定叢集的大小。這個問題的标準答案顯然是“視情況而定”,但這并非一個有用的答案。本文概述了一系列的相關問題,通過回答這些問題,或許你能得出一些數字作為指導和參考。
計算并建立一個基線
第一步是仔細考慮應用程式的運維名額,以達到所需資源的基線。
需要考慮的關鍵名額是:
- 每秒記錄數和每條記錄的大小
- 已有的不同鍵(key)的數量和每個鍵對應的狀态大小
- 狀态更新的次數和狀态後端的通路模式
最後,一個更實際的問題是與客戶之間圍繞停機時間、延遲和最大吞吐量的服務級别協定(sla),因為這些直接影響容量規劃。
接下來,根據預算,看看有什麼可用的資源。例如:
- 網絡容量,同時把使用網絡的外部服務也納入考慮,如 Kafka、HDFS 等。
- 磁盤帶寬,如果您依賴于基于磁盤的狀态後端,如 RocksDB(并考慮其他磁盤使用,如 Kafka 或 HDFS)
- 可用的機器數量、CPU 和記憶體
基于所有這些因素,現在可以為正常運作建構一個基線,外加一個資源緩沖量用于恢複追趕或處理負載尖峰。建議您在建立基線時也考慮檢查點期間(checkpointing)使用的資源情況。
示例:資料說明
目前在假設的叢集上計劃作業部署,将建立資源使用基線的過程可視化。這些數字是粗略的值,它們并不全面——在文章的最後将進一步說明在進行計算過程中遺漏的部分。
Flink 流計算作業和硬體示例

Flink 流計算作業拓撲示例
在本案例中,我将部署一個典型的 Flink 流處理作業,該作業使用 Flink 的 Kafka 資料消費者從 Kafka 消息源中讀取資料。然後使用帶鍵的總計視窗運算符(window operator)進行轉換運算。視窗運算符在時間視窗 5 分鐘執行聚合。由于總是有新的資料,故将把視窗配置為 1 分鐘的滑動視窗( sliding window )。
這意味着将在每分鐘更新過去 5 分鐘的聚合量。流計算作業為每個使用者 id 建立一個合計量。從 Kafka 消息源消費的每條消息大小(平均)為 2 kb。
假設吞吐量為每秒 100 萬條消息。要了解視窗運算符(window operator)的狀态大小,需要知道不同鍵的數目。在本例中,鍵(keys)是使用者 id 的數量,即 500000000 個不同的使用者。對于每個使用者,需要計算四個數字,存儲為長整形(8位元組)。
總結一下工作的關鍵名額:
- 消息大小:2 KB
- 吞吐量:1000000 msg/秒
- 不同鍵數量:500000000(視窗聚合:每個鍵 4 個長整形)
- Checkpointing:每分鐘一次。
假定的硬體設定
如上圖所示,共有五台機器在運作作業,每台機器運作一個 Flink 任務管理器(Flink 的工作節點)。磁盤是通過網絡互相連接配接的(這在雲設定中很常見),從主交換機到運作 TaskManager 的每台計算機都由一個 10 千兆位以太網連接配接。Kafka 緩存代理(brokers)在不同的機器上分開運作。
每台機器有 16 個 CPU 核。為了簡化處理,不考慮 CPU 和記憶體需求。但實際情況中,根據應用程式邏輯和正在使用的狀态後端,我們需要注意記憶體。這個例子使用了一個基于 RocksDB 的狀态後端,它穩定并且記憶體需求很低。
從單獨的一台機器的視角
要了解整個作業部署的資源需求,最容易的方法是先關注一台計算機和一個 TaskManager 中的操作。然後,可以使用一台計算機的數字來計算總體資源需求量。
預設情況下(如果所有運算符具有相同的并行度并且沒有特殊的排程限制),流作業的所有運算符都在每一台計算機上運作。
在這種情況下,Kafka 源(或消息消費者)、視窗運算符和 Kafka 發送端(或消息生産者)都在這五台機器上運作。
機器視角圖-TaskManager n
從上圖來看,keyBy 是一個單獨運算符,是以計算資源需求更容易。實際上,keyBy 是一個 API 構造,并轉換為 Kafka source 和視窗運算符(window operator)之間連接配接的配置屬性。
以下将自上而下地分析(上圖)這些運算符,了解他們的網絡資源需求。
The Kafka source
要計算單個 Kafka 源(source)接收的資料量,我們首先計算 Kafka 的合計輸入。這些 source 每秒接收 1000000 條消息,每條消息大小為 2 KB。
2 KB x 1,000,000/s = 2 GB/s
将 2 GB/s 除以機器數(5)得到以下結果:
2 GB/s ÷ 5 台機器 = 400 MB/s
群集中運作的 5 個 Kafka 源中的每一個都接收平均吞吐量為 400 MB/s 的資料結果。
Kafka source 的計算過程
The Shuffle / keyBy
接下來,需要確定具有相同鍵(在本例中為使用者 id)的所有事件都在同一台計算機上結束。正在讀取的 Kafka 消息源的資料(在 Kafka 中)可能會根據不同的分區方案進行分區。
Shuffle 過程将具有相同鍵的所有資料發送到一台計算機,是以需要将來自 Kafka 的 400 MB/s 資料流拆分為一個 user id 分區流:
400 MB/s ÷ 5 台機器 = 80 MB/s
平均而言,我們必須向每台計算機發送 80 MB/s 的資料。此分析是從一台機器的角度進行的,這意味着某些資料已經在指定的目标機器運作了,是以減去 80 MB/s 即可:
400 MB/s - 80 MB = 320 MB/s
可以得到結果:每台機器以 320 MB/s 的速率接收和發送使用者資料。
The shuffle 的計算過程
- Window 視窗輸出和 Kafka 發送
下一個要問的問題是視窗運算符發出多少資料并發送到 Kafka 接收器。答案是 67 MB/s,我們來解釋一下我們是怎麼得到這個數字的。
視窗運算符為每個鍵(key)保留 4 個數字(表示為長整形)的聚合值。運算符每分鐘發出一次目前聚合總值。每個鍵從聚合中發出 2 個整形(user_id, window_ts)和 4 個長整形:
(2 x 4 位元組)+(4 x 8 位元組)=每個鍵 40 位元組
然後将鍵的總數(500000000 除以機器數量)計算在内:
100000000 個 keys x 40 個位元組 = 4 GB (從每台機器來看)
然後計算每秒大小:
4 GB/分鐘 ÷ 60 = 67 MB/秒 (由每個任務管理器發出)
這意味着每個任務管理器平均從視窗運算符發出 67 MB/s 的使用者資料。由于每個任務管理器上都有一個 Kafka 發送端(和視窗運算符在同一個任務管理器中),并且沒有進一步的重新分區,是以這得到的是 Flink 向 Kafka 發送的資料量。
使用者資料:從 Kafka,分發到視窗運算符并傳回到 Kafka
視窗運算器的資料發射預計将是“突發”的,因為它們每分鐘發送一次資料。實際上,運算符不會以 67 mb/s 的恒定速率給客戶發送資料,而是每分鐘内将可用帶寬最大化幾秒鐘。
這些總計為:
- 資料輸入:每台機器 720 MB/s(400+320)
- 資料輸出:每台機器 387 MB/s(320+67)
- 狀态通路和檢查點
這不是全部的(内容)。到目前為止,我隻檢視了 Flink 正在處理的使用者資料。在實際情況中需要計入從磁盤通路的開銷,包括到 RocksDB 的存儲狀态和檢查點。要了解磁盤通路成本,請檢視視窗運算符(window operator)如何通路狀态。Kafka 源也保持一定的狀态,但與視窗運算符相比,它可以忽略不計。
要了解視窗運算符(window operator)的狀态大小,需要從不同的角度進行檢視。Flink 正在用 1 分鐘的滑動視窗計算 5 分鐘的視窗量。Flink 通過維護五個視窗來實作滑動視窗,每次滑動都對應一個 1 分鐘的視窗。如前所述,當使用視窗實作即時聚合時,将為每個視窗中的每個鍵(key)維護 40 位元組的狀态。對于每個傳入事件,首先需要從磁盤檢索目前聚合值(讀取 40 位元組),更新聚合值,然後将新值寫回(寫入 40 位元組)。
視窗狀态
這意味着:
40 位元組狀态 x 5 個視窗 x 每台計算機 200000 msg/s = 40 MB/s
即需要的每台計算機的讀或寫磁盤通路權限。如前所述,磁盤是網絡互相連接配接的,是以需要将這些數字添加到總吞吐量計算中。
現在總數是:
- 資料輸入:760 MB/s(400 MB/s 資料輸入 + 320 MB/s 随機播放 + 40 MB/s 狀态)
- 資料輸出:427 MB/s(320 MB/s 随機播放 + 67 MB/s 資料輸出 + 40 MB/s 狀态)
上述考慮是針對狀态通路的,當新事件到達視窗運算符時,狀态通路會持續進行,還需要容錯啟用檢查點。如果機器或其他部分出現故障,需要恢複視窗内容并繼續處理。
檢查點設定為每分鐘一個檢查點,每個檢查點将作業的整個狀态複制到網絡連接配接的檔案系統中。
讓我們一起來看看每台計算機上的整個狀态有多大:
40 位元組狀态 x 5 個視窗 x 100000000 個 keys = 20 GB
并且,要獲得每秒的值:
20 GB ÷ 60 = 333 MB/秒
與視窗運算類似,檢查點是突發的,每分鐘一次,它都試圖将資料全速發送到外部存儲器。Checkpointing 引發對 RocksDB 的額外狀态通路(在本案例中,RocksDB 位于網絡連接配接的磁盤上)。自 Flink 1.3 版本以來,RocksDB 狀态後端支援增量 checkpoint,概念上通過僅發送自上一個 checkpoint 以來的變化量,減少了每個 checkpoint 上所需的網絡傳輸,但本例中不使用此功能。
這會将總數更新為:
- 資料輸入:760 MB/s(400+320+40)
- 資料輸出:760 MB/s(320+67+40+333)
這意味着整個網絡流量為:
760+760 x 5 + 400 + 2335 = 10335 MB/秒
400 是 5 台機器上 80 MB狀态通路(讀寫)程序的總和,2335 是叢集上 Kafka 輸入和輸出程序的總和。
這大概是上圖所示硬體設定中可用網絡容量的一半以上。
聯網要求
補充一點,這些計算都不包括協定開銷,例如來自 Flink、Kafka 或檔案系統的 TCP、Ethernet 和 RPC 調用。但這仍然是一個很好的出發點,可以幫助您了解工作所需的硬體類型,以及性能名額。
擴充方法
基于以上分析,這個例子,在一個 5 節點叢集的典型運作中,每台機器都需要處理 760 個 Mb/s 的資料,無論是輸入還是輸出,從 1250 Mb/s 的總容量來看,它保留了大約 40% 的網絡容量因為部分被主觀所簡化的複雜因素,例如網絡協定開銷、從檢查點恢複事件重放期間的重載,以及由資料歪斜引起的跨叢集的負載不平衡。
對于 40% 的淨空是否合适,沒有一個一刀切的答案,但是這個算法應該是一個很好的起點。嘗試上面的計算,更換機器數量、鍵(keys)的數量或每秒的消息數,選擇要考慮的運維名額,然後将其與您的預算和運維因素相平衡。
作者:Robert Metzger
原文連結:
https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines