天天看點

從開發到生産上線,如何确定叢集規劃大小?

翻譯|毛家琦

校對|秦江傑

在 Flink 社群中,最常被問到的問題之一是:在從開發到生産上線的過程中如何确定叢集的大小。這個問題的标準答案顯然是“視情況而定”,但這并非一個有用的答案。本文概述了一系列的相關問題,通過回答這些問題,或許你能得出一些數字作為指導和參考。

計算并建立一個基線

第一步是仔細考慮應用程式的運維名額,以達到所需資源的基線。

需要考慮的關鍵名額是:

  • 每秒記錄數和每條記錄的大小
  • 已有的不同鍵(key)的數量和每個鍵對應的狀态大小
  • 狀态更新的次數和狀态後端的通路模式

最後,一個更實際的問題是與客戶之間圍繞停機時間、延遲和最大吞吐量的服務級别協定(sla),因為這些直接影響容量規劃。

接下來,根據預算,看看有什麼可用的資源。例如:

  • 網絡容量,同時把使用網絡的外部服務也納入考慮,如 Kafka、HDFS 等。
  • 磁盤帶寬,如果您依賴于基于磁盤的狀态後端,如 RocksDB(并考慮其他磁盤使用,如 Kafka 或 HDFS)
  • 可用的機器數量、CPU 和記憶體

基于所有這些因素,現在可以為正常運作建構一個基線,外加一個資源緩沖量用于恢複追趕或處理負載尖峰。建議您在建立基線時也考慮檢查點期間(checkpointing)使用的資源情況。

示例:資料說明

目前在假設的叢集上計劃作業部署,将建立資源使用基線的過程可視化。這些數字是粗略的值,它們并不全面——在文章的最後将進一步說明在進行計算過程中遺漏的部分。

Flink 流計算作業和硬體示例

從開發到生産上線,如何确定叢集規劃大小?

Flink 流計算作業拓撲示例

在本案例中,我将部署一個典型的 Flink 流處理作業,該作業使用 Flink 的 Kafka 資料消費者從 Kafka 消息源中讀取資料。然後使用帶鍵的總計視窗運算符(window operator)進行轉換運算。視窗運算符在時間視窗 5 分鐘執行聚合。由于總是有新的資料,故将把視窗配置為 1 分鐘的滑動視窗( sliding window )。

這意味着将在每分鐘更新過去 5 分鐘的聚合量。流計算作業為每個使用者 id 建立一個合計量。從 Kafka 消息源消費的每條消息大小(平均)為 2 kb。

假設吞吐量為每秒 100 萬條消息。要了解視窗運算符(window operator)的狀态大小,需要知道不同鍵的數目。在本例中,鍵(keys)是使用者 id 的數量,即 500000000 個不同的使用者。對于每個使用者,需要計算四個數字,存儲為長整形(8位元組)。

總結一下工作的關鍵名額:

  • 消息大小:2 KB
  • 吞吐量:1000000 msg/秒
  • 不同鍵數量:500000000(視窗聚合:每個鍵 4 個長整形)
  • Checkpointing:每分鐘一次。
從開發到生産上線,如何确定叢集規劃大小?

假定的硬體設定

如上圖所示,共有五台機器在運作作業,每台機器運作一個 Flink 任務管理器(Flink 的工作節點)。磁盤是通過網絡互相連接配接的(這在雲設定中很常見),從主交換機到運作 TaskManager 的每台計算機都由一個 10 千兆位以太網連接配接。Kafka 緩存代理(brokers)在不同的機器上分開運作。

每台機器有 16 個 CPU 核。為了簡化處理,不考慮 CPU 和記憶體需求。但實際情況中,根據應用程式邏輯和正在使用的狀态後端,我們需要注意記憶體。這個例子使用了一個基于 RocksDB 的狀态後端,它穩定并且記憶體需求很低。

從單獨的一台機器的視角

要了解整個作業部署的資源需求,最容易的方法是先關注一台計算機和一個 TaskManager 中的操作。然後,可以使用一台計算機的數字來計算總體資源需求量。

預設情況下(如果所有運算符具有相同的并行度并且沒有特殊的排程限制),流作業的所有運算符都在每一台計算機上運作。

在這種情況下,Kafka 源(或消息消費者)、視窗運算符和 Kafka 發送端(或消息生産者)都在這五台機器上運作。

從開發到生産上線,如何确定叢集規劃大小?

機器視角圖-TaskManager n

從上圖來看,keyBy 是一個單獨運算符,是以計算資源需求更容易。實際上,keyBy 是一個 API 構造,并轉換為 Kafka source 和視窗運算符(window operator)之間連接配接的配置屬性。

以下将自上而下地分析(上圖)這些運算符,了解他們的網絡資源需求。

The Kafka source

要計算單個 Kafka 源(source)接收的資料量,我們首先計算 Kafka 的合計輸入。這些 source 每秒接收 1000000 條消息,每條消息大小為 2 KB。

2 KB x 1,000,000/s = 2 GB/s

将 2 GB/s 除以機器數(5)得到以下結果:

2 GB/s ÷ 5 台機器 = 400 MB/s

群集中運作的 5 個 Kafka 源中的每一個都接收平均吞吐量為 400 MB/s 的資料結果。

從開發到生産上線,如何确定叢集規劃大小?

Kafka source 的計算過程

The Shuffle / keyBy

接下來,需要確定具有相同鍵(在本例中為使用者 id)的所有事件都在同一台計算機上結束。正在讀取的 Kafka 消息源的資料(在 Kafka 中)可能會根據不同的分區方案進行分區。

Shuffle 過程将具有相同鍵的所有資料發送到一台計算機,是以需要将來自 Kafka 的 400 MB/s 資料流拆分為一個 user id 分區流:

400 MB/s ÷ 5 台機器 = 80 MB/s

平均而言,我們必須向每台計算機發送 80 MB/s 的資料。此分析是從一台機器的角度進行的,這意味着某些資料已經在指定的目标機器運作了,是以減去 80 MB/s 即可:

400 MB/s - 80 MB = 320 MB/s

可以得到結果:每台機器以 320 MB/s 的速率接收和發送使用者資料。

從開發到生産上線,如何确定叢集規劃大小?

The shuffle 的計算過程

  • Window 視窗輸出和 Kafka 發送

下一個要問的問題是視窗運算符發出多少資料并發送到 Kafka 接收器。答案是 67 MB/s,我們來解釋一下我們是怎麼得到這個數字的。

視窗運算符為每個鍵(key)保留 4 個數字(表示為長整形)的聚合值。運算符每分鐘發出一次目前聚合總值。每個鍵從聚合中發出 2 個整形(user_id, window_ts)和 4 個長整形:

(2 x 4 位元組)+(4 x 8 位元組)=每個鍵 40 位元組

然後将鍵的總數(500000000 除以機器數量)計算在内:

100000000 個 keys x 40 個位元組 = 4 GB (從每台機器來看)

然後計算每秒大小:

4 GB/分鐘 ÷ 60 = 67 MB/秒 (由每個任務管理器發出)

這意味着每個任務管理器平均從視窗運算符發出 67 MB/s 的使用者資料。由于每個任務管理器上都有一個 Kafka 發送端(和視窗運算符在同一個任務管理器中),并且沒有進一步的重新分區,是以這得到的是 Flink 向 Kafka 發送的資料量。

從開發到生産上線,如何确定叢集規劃大小?

使用者資料:從 Kafka,分發到視窗運算符并傳回到 Kafka

視窗運算器的資料發射預計将是“突發”的,因為它們每分鐘發送一次資料。實際上,運算符不會以 67 mb/s 的恒定速率給客戶發送資料,而是每分鐘内将可用帶寬最大化幾秒鐘。

這些總計為:

  • 資料輸入:每台機器 720 MB/s(400+320)
  • 資料輸出:每台機器 387 MB/s(320+67)
從開發到生産上線,如何确定叢集規劃大小?
  • 狀态通路和檢查點

這不是全部的(内容)。到目前為止,我隻檢視了 Flink 正在處理的使用者資料。在實際情況中需要計入從磁盤通路的開銷,包括到 RocksDB 的存儲狀态和檢查點。要了解磁盤通路成本,請檢視視窗運算符(window operator)如何通路狀态。Kafka 源也保持一定的狀态,但與視窗運算符相比,它可以忽略不計。

要了解視窗運算符(window operator)的狀态大小,需要從不同的角度進行檢視。Flink 正在用 1 分鐘的滑動視窗計算 5 分鐘的視窗量。Flink 通過維護五個視窗來實作滑動視窗,每次滑動都對應一個 1 分鐘的視窗。如前所述,當使用視窗實作即時聚合時,将為每個視窗中的每個鍵(key)維護 40 位元組的狀态。對于每個傳入事件,首先需要從磁盤檢索目前聚合值(讀取 40 位元組),更新聚合值,然後将新值寫回(寫入 40 位元組)。

從開發到生産上線,如何确定叢集規劃大小?

視窗狀态

這意味着:

40 位元組狀态 x 5 個視窗 x 每台計算機 200000 msg/s = 40 MB/s

即需要的每台計算機的讀或寫磁盤通路權限。如前所述,磁盤是網絡互相連接配接的,是以需要将這些數字添加到總吞吐量計算中。

現在總數是:

  • 資料輸入:760 MB/s(400 MB/s 資料輸入 + 320 MB/s 随機播放 + 40 MB/s 狀态)
  • 資料輸出:427 MB/s(320 MB/s 随機播放 + 67 MB/s 資料輸出 + 40 MB/s 狀态)
從開發到生産上線,如何确定叢集規劃大小?

上述考慮是針對狀态通路的,當新事件到達視窗運算符時,狀态通路會持續進行,還需要容錯啟用檢查點。如果機器或其他部分出現故障,需要恢複視窗内容并繼續處理。

檢查點設定為每分鐘一個檢查點,每個檢查點将作業的整個狀态複制到網絡連接配接的檔案系統中。

讓我們一起來看看每台計算機上的整個狀态有多大:

40 位元組狀态 x 5 個視窗 x 100000000 個 keys = 20 GB

并且,要獲得每秒的值:

20 GB ÷ 60 = 333 MB/秒

與視窗運算類似,檢查點是突發的,每分鐘一次,它都試圖将資料全速發送到外部存儲器。Checkpointing 引發對 RocksDB 的額外狀态通路(在本案例中,RocksDB 位于網絡連接配接的磁盤上)。自 Flink 1.3 版本以來,RocksDB 狀态後端支援增量 checkpoint,概念上通過僅發送自上一個 checkpoint 以來的變化量,減少了每個 checkpoint 上所需的網絡傳輸,但本例中不使用此功能。

這會将總數更新為:

  • 資料輸入:760 MB/s(400+320+40)
  • 資料輸出:760 MB/s(320+67+40+333)
從開發到生産上線,如何确定叢集規劃大小?

這意味着整個網絡流量為:

760+760 x 5 + 400 + 2335 = 10335 MB/秒

400 是 5 台機器上 80 MB狀态通路(讀寫)程序的總和,2335 是叢集上 Kafka 輸入和輸出程序的總和。

這大概是上圖所示硬體設定中可用網絡容量的一半以上。

從開發到生産上線,如何确定叢集規劃大小?

聯網要求

補充一點,這些計算都不包括協定開銷,例如來自 Flink、Kafka 或檔案系統的 TCP、Ethernet 和 RPC 調用。但這仍然是一個很好的出發點,可以幫助您了解工作所需的硬體類型,以及性能名額。

擴充方法

基于以上分析,這個例子,在一個 5 節點叢集的典型運作中,每台機器都需要處理 760 個 Mb/s 的資料,無論是輸入還是輸出,從 1250 Mb/s 的總容量來看,它保留了大約 40% 的網絡容量因為部分被主觀所簡化的複雜因素,例如網絡協定開銷、從檢查點恢複事件重放期間的重載,以及由資料歪斜引起的跨叢集的負載不平衡。

對于 40% 的淨空是否合适,沒有一個一刀切的答案,但是這個算法應該是一個很好的起點。嘗試上面的計算,更換機器數量、鍵(keys)的數量或每秒的消息數,選擇要考慮的運維名額,然後将其與您的預算和運維因素相平衡。

作者:Robert Metzger

原文連結:

https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines

繼續閱讀