天天看點

有贊 Flink 實時任務資源優化探索與實踐

作者|沈磊

随着 Flink K8s 化以及實時叢集遷移完成,有贊越來越多的 Flink 實時任務運作在 K8s 叢集上,Flink K8s 化提升了實時叢集在大促時彈性擴縮容能力,更好的降低大促期間機器擴縮容的成本。同時,由于 K8s 在公司内部有專門的團隊進行維護, Flink K8s 化也能夠更好的減低公司的運維成本。

不過目前 Flink K8s 任務資源是使用者在實時平台端進行配置,使用者本身對于實時任務具體配置多少資源經驗較少,是以存在使用者資源配置較多,但實際使用不到的情形。比如一個 Flink 任務實際上 4 個并發能夠滿足業務處理需求,結果使用者配置了 16 個并發,這種情況會導緻實時計算資源的浪費,進而對于實時叢集資源水位以及底層機器成本,都有一定影響。基于這樣的背景,本文從 Flink 任務記憶體以及消息能力處理方面,對 Flink 任務資源優化進行探索與實踐。

一、Flink 計算資源類型與優化思路

1.1 Flink 計算資源類型

一個 Flink 任務的運作,所需要的資源我認為能夠分為 5 類:

  1. 記憶體資源
  2. 本地磁盤(或雲盤)存儲
  3. 依賴的外部存儲資源。比如 HDFS、S3 等(任務狀态/資料),HBase、MySQL、Redis 等(資料)
  4. CPU 資源
  5. 網卡資源
有贊 Flink 實時任務資源優化探索與實踐

目前 Flink 任務使用最主要的還是記憶體和 CPU 資源,本地磁盤、依賴的外部存儲資源以及網卡資源一般都不會是瓶頸,是以本文我們是從 Flink 任務的記憶體和 CPU 資源,兩個方面來對 Flink 實時任務資源進行優化。

1.2 Flink 實時任務資源優化思路

對于 Flink 實時任務資源分析思路,我們認為主要包含兩點:

  • 一是從任務記憶體視角,從堆記憶體方面對實時任務進行分析。
  • 另一方面則是從實時任務消息處理能力入手,保證滿足業務方資料處理需求的同時,盡可能合理使用 CPU 資源。

之後再結合實時任務記憶體分析所得相關名額、實時任務并發度的合理性,得出一個實時任務資源預設值,在和業務方充分溝通後,調整實時任務資源,最終達到實時任務資源配置合理化的目的,進而更好的降低機器使用成本。

■ 1.2.1 任務記憶體視角

那麼如何分析 Flink 任務的堆記憶體呢?這裡我們是結合 Flink 任務 GC 日志來進行分析。GC 日志包含了每次 GC 堆内不同區域記憶體的變化和使用情況。同時根據 GC 日志,也能夠擷取到一個 Taskmanager 每次 Full GC 後,老年代剩餘空間大小。可以說,擷取實時任務的 GC 日志,使我們進行實時任務記憶體分析的前提。

GC 日志内容分析,這裡我們借助開源的 GC Viewer 工具來進行具體分析,每次分析完,我們能夠擷取到 GC 相關名額,下面是通過 GC Viewer 分析一次 GC 日志的部分結果:

有贊 Flink 實時任務資源優化探索與實踐

上面通過 GC 日志分析出單個 Flink Taskmanager 堆總大小、年輕代、老年代配置設定的記憶體空間、Full GC 後老年代剩餘大小等,當然還有很多其他名額,相關名額定義可以去 Github 具體檢視。

這裡最重要的還是Full GC 後老年代剩餘大小這個名額,按照《Java 性能優化權威指南》這本書 Java 堆大小計算法則,設 Full GC 後老年代剩餘大小空間為 M,那麼堆的大小建議 3 ~ 4倍 M,新生代為 1 ~ 1.5 倍 M,老年代應為 2 ~ 3 倍 M,當然,真實對記憶體配置,你可以按照實際情況,将相應比例再調大些,用以防止流量暴漲情形。

是以通過 Flink 任務的 GC 日志,我們可以計算出實時任務推薦的堆記憶體總大小,當發現推薦的堆記憶體和實際實時任務的堆記憶體大小相差過大時,我們就認為能夠去降低業務方實時任務的記憶體配置,進而降低機器記憶體資源的使用。

■ 1.2.2 任務消息處理能力視角

對于 Flink 任務消息處理能力分析,我們主要是看實時任務消費的資料源機關時間的輸入,和實時任務各個 Operator / Task 消息處理能力是否比對。Operator 是 Flink 任務的一個算子,Task 則是一個或者多個算子 Chain 起來後,一起執行的實體載體。

資料源我們内部一般使用 Kafka,Kafka Topic 的機關時間輸入可以通過調用 Kafka Broker JMX 名額接口進行擷取,當然你也可以調用 Flink Rest Monitoring 相關 API 擷取實時任務所有 Kafka Source Task 機關時間輸入,然後相加即可。不過由于反壓可能會對 Source 端的輸入有影響,這裡我們是直接使用 Kafka Broker 名額 JMX 接口擷取 Kafka Topic 機關時間輸入。

在擷取到實時任務 Kafka Topic 機關時間輸入後,下面就是判斷實時任務的消息處理能力是否與資料源輸入比對。一個實時任務整體的消息處理能力,會受到處理最慢的 Operator / Task 的影響。打個比方,Flink 任務消費的 Kafka Topic 輸入為 20000 Record / S,但是有一個 Map 算子,其并發度為 10 ,Map 算子中業務方調用了 Dubbo,一個 Dubbo 接口從請求到傳回為 10 ms,那麼 Map 算子處理能力 1000 Record / S (1000 ms / 10 ms * 10 ),進而實時任務處理能力會下降為 1000 Record / S。

由于一條消息記錄的處理會在一個 Task 内部流轉,是以我們試圖找出一個實時任務中,處理最慢的 Task 邏輯。如果 Source 端到 Sink 端全部 Chain 起來的話,我們則是會找出處理最慢的 Operator 的邏輯。在源碼層,我們針對 Flink Task 以及 Operator 增加了單條記錄處理時間的自定義 Metric,之後該 Metric 可以通過 Flink Rest API 擷取。我們會周遊一個 Flink 任務中所有的 Task , 查詢處理最慢的 Task 所在的 JobVertex(JobGraph 的點),然後擷取到該 JobVertex 所有 Task 的總輸出,最終會和 Kafka Topic 機關時間輸入進行比對,判斷實時任務消息處理能力是否合理。

設實時任務 Kafka Topic 機關時間的輸入為 S,處理最慢的 Task 代表的 JobVertex 的并發度為 P,處理最慢的 Task 所在的 JobVertex 機關時間輸出為 O,處理最慢的 Task 的最大消息處理時間為 T,那麼通過下面邏輯進行分析:

  1. 當 O 約等于 S,且 1 second / T * P 遠大于 S 時,會考慮減小任務并發度。
  2. 當 O 約等于 S,且 1 second / T * P 約等于 S 時,不考慮調整任務并發度。
  3. 當 O 遠小于 S,且 1 second / T * P 遠小于 S 時,會考慮增加任務并發度。

目前主要是 1 這種情況在 CPU 使用方面不合理,當然,由于不同時間段,實時任務的流量不同,是以我們會有一個周期性檢測的的任務,如果檢測到某個實時任務連續多次都符合 1 這種情況時,會自動報警提示平台管理者進行資源優化調整。

下圖是從 Flink 任務的記憶體以及消息處理能力兩個視角分析資源邏輯圖:

有贊 Flink 實時任務資源優化探索與實踐

二、從記憶體視角對 Flink 分析實踐

2.1 Flink 任務垃圾回收器選擇

Flink 任務本質還是一個 Java 任務,是以也就會涉及到垃圾回收器的選擇。選擇垃圾回收器一般需要從兩個角度進行參考:

  1. 吞吐量,即機關時間内,任務執行時間 / (任務執行時間 + 垃圾回收時間),當然并不是說降低 GC 停頓時間就能提升吞吐量,因為降低 GC 停頓時間,你的 GC 次數也會上升。
  2. 延遲。如果你的 Java 程式涉及到與外部互動,延遲會影響外部的請求使用體驗。

Flink 任務我認為還是偏重吞吐量的一類 Java 任務,是以會從吞吐量角度進行更多的考量。當然并不是說完全不考慮延遲,畢竟 JobManager、TaskManager、ResourceManager 之間存在心跳,延遲過大,可能會有心跳逾時的可能性。

目前我們 JDK 版本為内部 JDK 1.8 版本,新生代垃圾回收器使用 Parallel Scavenge,那麼老年代垃圾回收器隻能從 Serial Old 或者 Parallel Old 中選擇。由于我們 Flink k8s 任務每個 Pod 的 CPU 限制為 0.6 - 1 core ,最大也隻能使用 1 個 core,是以老年代的垃圾回收器我們使用的是 Serial Old ,多線程垃圾回收在單 Core 之間,可能會有線程切換的消耗。

2.2 實時任務 GC 日志擷取

設定完垃圾回收器後,下一步就是擷取 Flink 任務的 GC 日志。Flink 任務構成一般是單個 JobManager + 多個 TaskManger ,這裡需要擷取到 TaskManager 的 GC 日志進行分析。那是不是要對所有 TaskManager 進行擷取呢。這裡我們按照 TaskManager 的 Young GC 次數,按照次數大小進行排序,取排名前 16 的 TaskManager 進行分析。YoungGC 次數可以通過 Flink Rest API 進行擷取。

Flink on Yarn 實時任務的 GC 日志,直接點開 TaskManager 的日志連結就能夠看到,然後通過 HTTP 通路,就能下載下傳到本地。Flink On k8s 任務的 GC 日志,會先寫到 Pod 所挂載的雲盤,基于 k8s hostpath volume 進行挂載。我們内部使用 Filebeat 進行日志檔案變更監聽和采集,最終輸出到下遊的 Kafka Topic。我們内部會有自定義日志服務端,它會消費 Kafka 的日志記錄,自動進行落盤和管理,同時向外提供日志下載下傳接口。通過日志下載下傳的接口,便能夠下載下傳到需要分析的 TaskManager 的 GC 日志。

2.3 基于 GC Viewer 分析 Flink 任務記憶體

GC Viewer 是一個開源的 GC 日志分析工具。使用 GC Viewer 之前,需要先把 GC Viewer 項目代碼 clone 到本地,然後進行編譯打包,就可以使用其功能。

有贊 Flink 實時任務資源優化探索與實踐

在對一個實時任務堆記憶體進行分析時,先把 Flink TaskManager 的日志下載下傳到本地,然後通過 GC Viewer 對日志進行。如果你覺得多個 Taskmanager GC 日志分析較慢時,可以使用多線程。上面所有這些操作,可以将其代碼化,自動化産出分析結果。下面是通過 GC Viewer 分析的指令行:

java -jar gcviewer-1.37-SNAPSHOT.jar gc.log summary.csv           

上面參數 gc.log 表示一個 Taskmanager 的 GC 日志檔案名稱,summary.csv 表示日志分析的結果。下面是我們平台對于某個實時任務記憶體分析的結果:

有贊 Flink 實時任務資源優化探索與實踐

下面是上面截圖中,部分參數說明:

  1. RunHours,Flink 任務運作小時數
  2. YGSize,一個 TaskManager 新生代堆記憶體最大配置設定量,機關兆
  3. YGUsePC,一個 TaskManager 新生代堆最大使用率
  4. OGSize,一個 TaskManager 老年代堆記憶體最大配置設定量,機關兆
  5. OGUsePC,一個 TaskManager 老生代堆最大使用率
  6. YGCoun,一個 TaskMnager Young GC 次數
  7. YGPerTime,一個 TaskMnager Young GC 每次停頓時間,機關秒
  8. FGCount,一個 TaskMnager Full GC 次數
  9. FGAllTime,一個 TaskMnager Full GC 總時間,機關秒
  10. Throught,Task Manager 吞吐量
  11. AVG PT(分析結果 avgPromotion 參數),平均每次 Young GC 晉升到老年代的對象大小
  12. Rec Heap,推薦的堆大小
  13. RecNewHeap,推薦的新生代堆大小
  14. RecOldHeap,推薦的老年代堆大小

上述大部分記憶體分析結果,通過 GC Viewer 分析都能得到,不過推薦堆大小、推薦新生代堆大小、推薦老年代堆大小則是根據 1.2.1 小節的記憶體優化規則來設定。

三、從消息處理視角對 Flink 分析實踐

3.1 實時任務 Kafka Topic 機關時間輸入擷取

想要對 Flink 任務的消息處理能力進行分析,第一步便是擷取該實時任務的 Kafka 資料源 Topic,目前如果資料源不是 Kafka 的話,我們不會進行分析。Flink 任務總體分為兩類:Flink Jar 任務和 Flink SQL 任務。Flink SQL 任務擷取 Kafka 資料源比較簡單,直接解析 Flink SQL 代碼,然後擷取到 With 後面的參數,在過濾掉 Sink 表之後,如果 SQLCreateTable 的 Conector 類型為 Kafka,就能夠通過 SQLCreateTable with 後的參數,拿到具體 Kafka Topic。

Flink Jar 任務的 Kafka Topic 資料源擷取相對繁瑣一些,我們内部有一個實時任務血緣解析服務,通過對 Flink Jar 任務自動建構其 PackagedProgram,PackagedProgram 是 Flink 内部的一個類,然後通過 PackagedProgram ,我們可以擷取一個 Flink Jar 任務的 StreamGraph,StreamGraph 裡面有 Source 和 Sink 的所有 StreamNode,通過反射,我們可以擷取 StreamNode 裡面具體的 Source Function,如果是 Kafka Source Sunction,我們就會擷取其 Kafka Topic。下面是 StreamGraph 類截圖:

有贊 Flink 實時任務資源優化探索與實踐

擷取到 Flink 任務的 Kafka Topic 資料源之後,下一步便是擷取該 Topic 機關時間輸入的消息記錄數,這裡可以通過 Kafka Broker JMX Metric 接口擷取,我們則是通過内部 Kafka 管理平台提供的外部接口進行擷取。

3.2 自動化檢測 Flink 消息處理最慢 Task

首先,我們在源碼層增加了 Flink Task 單條記錄處理時間的 Metric,這個 Metric 可以通過 Flink Rest API 擷取。接下來就是借助 Flink Rest API,周遊要分析的 Flink 任務的所有的 Task。Flink Rest Api 有這樣一個接口:

base_flink_web_ui_url/jobs/:jobid           

這個接口能夠擷取一個任務的所有 Vertexs,一個 Vertex 可以簡單了解為 Flink 任務 JobGraph 裡面的一個 JobVertex。JobVertex 代表着實時任務中一段執行邏輯。

有贊 Flink 實時任務資源優化探索與實踐

擷取完 Flink 任務所有的 Vertex 之後,接下來就是擷取每個 Vertex 具體 Task 處理單條記錄的 metric,可以使用下面的接口:

有贊 Flink 實時任務資源優化探索與實踐

需要在上述 Rest API 連結 metrics 之後添加 ?get=(具體meitric ),比如:metrics?get=0.Filter.numRecordsOut,0 表示該 Vertex Task 的 id,Filter.numRecordsOut 則表示具體的名額名稱。我們内部使用 taskOneRecordDealTime 表示Task 處理單條記錄時間 Metric,然後用 0.taskOneRecordDealTime 去擷取某個 Task 的單條記錄處理時間的名額。上面接口支援多個名額查詢,即 get 後面使用逗号隔開即可。

最終自動化檢測 Flink 消息處理最慢 Task 整體步驟如下:

  1. 擷取一個實時任務所有的 Vertexs
  2. 周遊每個 Vertex,然後擷取這個 Vertex 所有并發度 Task 的 taskOneRecordDealTime,并且記錄其最大值
  3. 所有 Vertex 單條記錄處理 Metric 最大值進行對比,找出處理時間最慢的 Vertex。

下面是我們實時平台對于一個 Flink 實時任務分析的結果:

有贊 Flink 實時任務資源優化探索與實踐

四、有贊 Flink 實時任務資源優化實踐

既然 Flink 任務的記憶體以及消息處理能力分析的方式已經有了,那接下來就是在實時平台端進行具體實踐。我們實時平台每天會定時掃描所有正在運作的 Flink 任務,在任務記憶體方面,我們能夠結合 實時任務 GC 日志,同時根據記憶體優化規則,計算出 Flink 任務推薦的堆記憶體大小,并與實際配置設定的 Flink 任務的堆記憶體進行比較,如果兩者相差的倍數過大時,我們認為 Flink 任務的記憶體配置存在浪費的情況,接下來我們會報警提示到平台管理者進行優化。

平台管理者再收到報警提示後,同時也會判定實時任務消息能力是否合理,如果消息處理最慢的 Vertex (某段實時邏輯),其所有 Task 機關時間處理消息記錄數的總和約等于實時任務消費的 Kafka Topic 機關時間的輸入,但通過 Vertex 的并發度,以及單條消息處理 Metric ,算出該 Vertex 機關時間處理的消息記錄數遠大于 Kafka Topic 的機關輸入時,則認為 Flink 任務可以适當調小并發度。具體調整多少,會和業務方溝通之後,在進行調整。整體 Flink 任務資源優化操作流程如下:

有贊 Flink 實時任務資源優化探索與實踐

五、總結

目前有贊實時計算平台對于 Flink 任務資源優化探索已經走出第一步。通過自動化發現能夠優化的實時任務,然後平台管理者介入分析,最終判斷是否能夠調整 Flink 任務的資源。在整個實時任務資源優化的鍊路中,目前還是不夠自動化,因為在後半段還需要人為因素。未來我們計劃 Flink 任務資源的優化全部自動化,會結合實時任務曆史不同時段的資源使用情況,自動化推測和調整實時任務的資源配置,進而達到提升整個實時叢集資源使用率的目的。

同時未來也會和中繼資料平台的同學進行合作,一起從更多方面來分析實時任務是否存在資源優化的可能性,他們在原來離線任務資源方面積攢了很多優化經驗,未來也可以參考和借鑒,應用到實時任務資源的優化中。

當然,最理想化就是實時任務的資源使用能夠自己自動彈性擴縮容,之前聽到過社群同學有這方面的聲音,同時也歡迎你能夠和我一起探讨。