天天看點

Flink性能調優小小總結

點選上方藍色字型,選擇“設為星标”

回複”資源“擷取更多資源

Flink性能調優小小總結

1 配置記憶體

操作場景

Flink是依賴記憶體計算,計算過程中記憶體不夠對Flink的執行效率影響很大。可以通過監控GC(Garbage Collection),評估記憶體使用及剩餘情況來判斷記憶體是否變成性能瓶頸,并根據情況優化。

監控節點程序的YARN的Container GC日志,如果頻繁出現Full GC,需要優化GC。

GC的配置:在用戶端的"conf/flink-conf.yaml"配置檔案中,在“env.java.opts”配置項中添加參數:

-Xloggc:<LOG_DIR>/gc.log 
-XX:+PrintGCDetails 
-XX:-OmitStackTraceInFastThrow 
-XX:+PrintGCTimeStamps 
-XX:+PrintGCDateStamps 
-XX:+UseGCLogFileRotation 
-XX:NumberOfGCLogFiles=20 
-XX:GCLogFileSize=20M

           

此處預設已經添加GC日志。

操作步驟

  • 優化GC

調整老年代和新生代的比值。在用戶端的“conf/flink-conf.yaml”配置檔案中,在“env.java.opts”配置項中添加參數:“-XX:NewRatio”。如“ -XX:NewRatio=2”,則表示老年代與新生代的比值為2:1,新生代占整個堆空間的1/3,老年代占2/3。

  • 開發Flink應用程式時,優化DataStream的資料分區或分組操作。

當分區導緻資料傾斜時,需要考慮優化分區。避免非并行度操作,有些對DataStream的操作會導緻無法并行,例如WindowAll。keyBy盡量不要使用String。

2 設定并行度

并行度控制任務的數量,影響操作後資料被切分成的塊數。調整并行度讓任務的數量和每個任務處理的資料與機器的處理能力達到最優。檢視CPU使用情況和記憶體占用情況,當任務和資料不是平均分布在各節點,而是集中在個别節點時,可以增大并行度使任務和資料更均勻的分布在各個節點。增加任務的并行度,充分利用叢集機器的計算能力,一般并行度設定為叢集CPU核數總和的2-3倍。

任務的并行度可以通過以下四種層次(按優先級從高到低排列)指定,使用者可以根據實際的記憶體、CPU、資料以及應用程式邏輯的情況調整并行度參數。

  • 算子層次

一個算子、資料源和sink的并行度可以通過調用setParallelism()方法來指定,例如

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");

           
  • 執行環境層次

Flink程式運作在執行環境中。執行環境為所有執行的算子、資料源、data sink定義了一個預設的并行度。

執行環境的預設并行度可以通過調用setParallelism()方法指定。例如:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);
    DataStream<String> text = [...]
    DataStream<Tuple2<String, Integer>> wordCounts = [...]
    wordCounts.print();
    env.execute("Word Count Example");

           
  • 用戶端層次

并行度可以在用戶端将job送出到Flink時設定。對于CLI用戶端,可以通過“-p”參數指定并行度。例如:./bin/flink run -p 10 ../examples/WordCount-java.jar

  • 系統層次

在系統級可以通過修改Flink用戶端conf目錄下的“flink-conf.yaml”檔案中的“parallelism.default”配置選項來指定所有執行環境的預設并行度。

3.配置程序參數

Flink on YARN模式下,有JobManager和TaskManager兩種程序。在任務排程和運作的過程中,JobManager和TaskManager承擔了很大的責任。

因而JobManager和TaskManager的參數配置對Flink應用的執行有着很大的影響意義。使用者可通過如下操作對Flink叢集性能做優化。

1.配置JobManager記憶體

JobManager負責任務的排程,以及TaskManager、RM之間的消息通信。當任務數變多,任務平行度增大時,JobManager記憶體都需要相應增大。您可以根據實際任務數量的多少,為JobManager設定一個合适的記憶體。

  • 在使用yarn-session指令時,添加“-jm MEM”參數設定記憶體。
  • 在使用yarn-cluster指令時,添加“-yjm MEM”參數設定記憶體。

2.配置TaskManager個數

每個TaskManager每個核同時能跑一個task,是以增加了TaskManager的個數相當于增大了任務的并發度。在資源充足的情況下,可以相應增加TaskManager的個數,以提高運作效率。

  • 在使用yarn-session指令時,添加“-n NUM”參數設定TaskManager個數。
  • 在使用yarn-cluster指令時,添加“-yn NUM”參數設定TaskManager個數。

3.配置TaskManager Slot數

每個TaskManager多個核同時能跑多個task,相當于增大了任務的并發度。但是由于所有核共用TaskManager的記憶體,是以要在記憶體和核數之間做好平衡。

  • 在使用yarn-session指令時,添加“-s NUM”參數設定SLOT數。
  • 在使用yarn-cluster指令時,添加“-ys NUM”參數設定SLOT數。

4.配置TaskManager記憶體

TaskManager的記憶體主要用于任務執行、通信等。當一個任務很大的時候,可能需要較多資源,因而記憶體也可以做相應的增加。

  • 将在使用yarn-sesion指令時,添加“-tm MEM”參數設定記憶體。
  • 将在使用yarn-cluster指令時,添加“-ytm MEM”參數設定記憶體。

設計分區方法

合理的設計分區依據,可以優化task的切分。在程式編寫過程中要盡量分區均勻,這樣可以實作每個task資料不傾斜,防止由于某個task的執行時間過長導緻整個任務執行緩慢。

以下是幾種分區方法

  • 随機分區:将元素随機地進行分區。dataStream.shuffle();
  • Rebalancing (Round-robin partitioning):基于round-robin對元素進行分區,使得每個分區負責均衡。對于存在資料傾斜的性能優化是很有用的。dataStream.rebalance();
  • Rescaling:以round-robin的形式将元素分區到下遊操作的子集中。如果你想要将資料從一個源的每個并行執行個體中散發到一些mappers的子集中,用來分散負載,但是又不想要完全rebalance 介入(引入rebalance()),這會非常有用。dataStream.rescale();
  • 廣播:廣播每個元素到所有分區。dataStream.broadcast();
  • 自定義分區:使用一個使用者自定義的Partitioner對每一個元素選擇目标task,由于使用者對自己的資料更加熟悉,可以按照某個特征進行分區,進而優化任務執行。簡單示例如下所示:
// fromElements構造簡單的Tuple2流
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("hello",1), Tuple2.of("test",2), Tuple2.of("world",100));
// 定義用于分區的key值,傳回即屬于哪個partition的,該值加1就是對應的子任務的id号
Partitioner<Tuple2<String, Integer>> strPartitioner = new Partitioner<Tuple2<String, Integer>>() {
    @Override
    public int partition(Tuple2<String, Integer> key, int numPartitions) {
        return (key.f0.length() + key.f1) % numPartitions;
    }
};
// 使用Tuple2進行分區的key值
dataStream.partitionCustom(strPartitioner, new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
        return value;
    }
}).print();

           

配置netty網絡通信

Flink通信主要依賴netty網絡,是以在Flink應用執行過程中,netty的設定尤為重要,網絡通信的好壞直接決定着資料交換的速度以及任務執行的效率。

以下配置均可在用戶端的“conf/flink-conf.yaml”配置檔案中進行修改适配,預設已經是相對較優解,請謹慎修改,防止性能下降。

•“taskmanager.network.netty.num-arenas”:預設是“taskmanager.numberOfTaskSlots”,表示netty的域的數量。 

•“taskmanager.network.netty.server.numThreads”和“taskmanager.network.netty.client.numThreads”:預設是“taskmanager.numberOfTaskSlots”,表示netty的用戶端和服務端的線程數目設定。 

•“taskmanager.network.netty.client.connectTimeoutSec”:預設是120s,表示taskmanager的用戶端連接配接逾時的時間。

  • “taskmanager.network.netty.sendReceiveBufferSize”:預設是系統緩沖區大小(cat /proc/sys/net/ipv4/tcp _ [rw]mem) ,一般為4MB,表示netty的發送和接收的緩沖區大小。
  • “taskmanager.network.netty.transport”:預設為“nio”方式,表示netty的傳輸方式,有“nio”和“epoll”兩種方式。

解決資料傾斜

當資料發生傾斜(某一部分資料量特别大),雖然沒有GC(Gabage Collection,垃圾回收),但是task執行時間嚴重不一緻。

  • 需要重新設計key,以更小粒度的key使得task大小合理化。
  • 修改并行度。
  • 調用rebalance操作,使資料分區均勻。

緩沖區逾時設定

  • 由于task在執行過程中存在資料通過網絡進行交換,資料在不同伺服器之間傳遞的緩沖區逾時時間可以通過setBufferTimeout進行設定。
  • 當設定“setBufferTimeout(-1)”,會等待緩沖區滿之後才會重新整理,使其達到最大吞吐量;當設定“setBufferTimeout(0)”時,可以最小化延遲,資料一旦接收到就會重新整理;當設定“setBufferTimeout”大于0時,緩沖區會在該時間之後逾時,然後進行緩沖區的重新整理。示例可以參考如下:env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

Checkpoint 調優

1.什麼是 checkpoint 簡單地說就是 Flink 為了達到容錯和 exactly-once 語義的功能,定期把 state 持久化下來,而這一持久化的過程就叫做 checkpoint ,它是 Flink Job 在某一時刻全局狀态的快照。

當我們要對分布式系統實作一個全局狀态保留的功能時,傳統方案會引入一個統一時鐘,通過分布式系統中的 master 節點廣播出去給每一個 slaves 節點,當節點接收到這個統一時鐘時,它們就記錄下自己目前的狀态即可。

Flink性能調優小小總結

但是統一時鐘的方式也存在一定的問題,某一個 node 進行的 GC 時間比較長,或者 master 與 slaves 的網絡在當時存在波動而造成時鐘的發送延遲或者發送失敗,都會造成此 slave 和其它的機器出現資料不一緻而最終導緻腦裂的情況。如果我們想要解決這個問題,就需要對 master 和 slaves 做一個 HA(High Availability)。但是,一個系統越是複雜,就越不穩定且維護成本越高。

Flink 是将 checkpoint 都放進了一個名為 Barrier 的流。

Flink性能調優小小總結

上圖中就是一個 Barrier 的例子,從上遊的第一個 Task 到下遊的最後一個 Task,每次當 Task 經過圖中藍色的栅欄時,就會觸發 save snapshot(快照)的功能。我們用一個例子來簡單說明。

2.執行個體分析

Flink性能調優小小總結

這是一個簡單的 ETL 過程,首先我們把資料從 Kafka 中拿過來進行一個 trans 的轉換操作,然後再發送到一個下遊的 Kafka。

此時這個例子中沒有進行 chaining 的調優。是以此時采用的是 forward strategy ,也就是 “一個 task 的輸出隻發送給一個 task 作為輸入”,這樣的方式,這樣做也有一個好處就是如果兩個 task 都在一個 JVM 中的話,那麼就可以避免不必要的網絡開銷。

設定 Parallism 為 2,此時的 DAG 圖如下:

Flink性能調優小小總結

■ CK的分析過程

Flink性能調優小小總結

5 CK 的分析過程

Flink性能調優小小總結

每一個 Flink 作業都會有一個 JobManager ,JobManager 裡面又會有一個 checkpoint coordinator 來管理整個 checkpoint 的過程,我們可以設定一個時間間隔讓 checkpoint coordinator 将一個 checkpoint 的事件發送給每一個 Container 中的 source task,也就是第一個任務(對應并行圖中的 task1,task2)。

當某個 Source 算子收到一個 Barrier 時,它會暫停自身的資料處理,然後将自己的目前 state 制作成 snapshot(快照),并儲存到指定的持久化存儲中,最後向 CheckpointCoordinator 異步發送一個 ack(Acknowledge character --- 确認字元),同時向自身所有下遊算子廣播該 Barrier 後恢複自身的資料處理。

每個算子按照上面不斷制作 snapshot 并向下遊廣播,直到最後 Barrier 傳遞到 sink 算子,此時快照便制作完成。這時候需要注意的是,上遊算子可能是多個資料源,對應多個 Barrier 需要全部到齊才一次性觸發 checkpoint ,是以在遇到 checkpoint 時間較長的情況時,有可能是因為資料對齊需要耗費的時間比較長所造成的。

■ Snapshot & Recover

Flink性能調優小小總結

如圖,這是我們的Container容器初始化的階段,e1 和 e2 是剛從 Kafka 消費過來的資料,與此同時,CheckpointCoordinator 也往它發送了 Barrier。

此時 Task1 完成了它的 checkpoint 過程,效果就是記錄下 offset 為2(e1,e2),然後把 Barrier 往下遊的算子廣播,Task3 的輸入為 Task1 的輸出,現在假設我的這個程式的功能是統計資料的條數,此時 Task3 的 checkpoint 效果就是就記錄資料數為2(因為從 Task1 過來的資料就是 e1 和 e2 兩條),之後再将 Barrier 往下廣播,當此 Barrier 傳遞到 sink 算子,snapshot 就算是制作完成了。

此時 source 中還會源源不斷的産生資料,并産生新的 checkpoint ,但是此時如果 Container 當機重新開機就需要進行資料的恢複了。剛剛完成的 checkpoint 中 offset為2,count為2,那我們就按照這個 state 進行恢複。此時 Task1 會從 e3 開始消費,這就是 Recover 操作。

■ checkpoint 的注意事項

Flink性能調優小小總結

下面列舉的3個注意要點都會影響到系統的吞吐,在實際開發過程中需要注意:

一下内容為Flink中文社群的總結,供大家參考:

Flink 作業的問題定位

1.問題定位口訣

“一壓二查三名額,延遲吞吐是核心。時刻關注資源量 , 排查首先看GC。”

一壓是指背壓,遇到問題先看背壓的情況,二查就是指 checkpoint ,對齊資料的時間是否很長,state 是否很大,這些都是和系統吞吐密切相關的,三名額就是指 Flink UI 那塊的一些展示,我們的主要關注點其實就是延遲和吞吐,系統資源,還有就是 GC logs。

  • 看反壓 :通常最後一個被壓高的 subTask 的下遊就是 job 的瓶頸之一。
  • 看 Checkpoint 時長 :Checkpoint 時長能在一定程度影響 job 的整體吞吐。
  • 看核心名額 :名額是對一個任務性能精準判斷的依據,延遲名額和吞吐則是其中最為關鍵的名額。
  • 資源的使用率:提高資源的使用率是最終的目的。

■ 常見的性能問題

Flink性能調優小小總結

簡單解釋一下:

  1. 在關注背壓的時候大家往往忽略了資料的序列化和反序列化,過程所造成的性能問題。
  2. 一些資料結構 ,比如 HashMap 和 HashSet 這種 key 需要經過 hash 計算的資料結構,在資料量大的時候使用 keyby 進行操作, 造成的性能影響是非常大的。
  3. 資料傾斜 是我們的經典問題,後面再進行展開。
  4. 如果我們的下遊是 MySQL,HBase這種,我們都會進行一個批處理的操作,就是讓資料存儲到一個 buffer 裡面,在達到某些條件的時候再進行發送,這樣做的目的就是減少和外部5. 系統的互動,降低 網絡開銷 的成本。
  5. 頻繁GC ,無論是 CMS 也好,G1也好,在進行 GC 的時候,都會停止整個作業的運作,GC 時間較長還會導緻 JobManager 和 TaskManager 沒有辦法準時發送心跳,此時 JobManager 就會認為此 TaskManager 失聯,它就會另外開啟一個新的 TaskManager
  6. 視窗是一種可以把無限資料切割為有限資料塊的手段。比如我們知道,使用滑動視窗的時候資料的重疊問題,size = 5min 雖然不屬于大視窗的範疇,可是 step = 1s 代表1秒就要進行一次資料的處理,這樣就會造成資料的重疊很高,資料量很大的問題。

2.Flink 作業調優

Flink性能調優小小總結
Flink性能調優小小總結

我們可以通過一些資料結構,比如 Set 或者 Map 來結合 Flink state 進行去重。但是這些去重方案會随着資料量不斷增大,進而導緻性能的急劇下降,比如剛剛我們分析過的 hash 沖突帶來的寫入性能問題,記憶體過大導緻的 GC 問題,TaskManger 的失聯問題。

Flink性能調優小小總結
Flink性能調優小小總結
Flink性能調優小小總結
Flink性能調優小小總結

方案二和方案三也都是通過一些資料結構的手段去進行去重,有興趣的同學可以自行下去了解,在這裡不再展開。

■ 資料傾斜

Flink性能調優小小總結

資料傾斜是大家都會遇到的高頻問題,解決的方案也不少。

第一種場景是當我們的并發度設定的比分區數要低時,就會造成上面所說的消費不均勻的情況。

Flink性能調優小小總結

第二種提到的就是 key 分布不均勻的情況,可以通過添加随機字首打散它們的分布,使得資料不會集中在幾個 Task 中。

在每個節點本地對相同的 key 進行一次聚合操作,類似于 MapReduce 中的本地 combiner。map-side 預聚合之後,每個節點本地就隻會有一條相同的 key,因為多條相同的 key 都被聚合起來了。其他節點在拉取所有節點上的相同 key 時,就會大大減少需要拉取的資料數量,進而也就減少了磁盤 IO 以及網絡傳輸開銷。

■ 記憶體調優

Flink性能調優小小總結

Flink 的記憶體結構剛剛我們已經提及到了,是以我們清楚,調優的方面主要是針對 非堆記憶體 Network buffer ,manager pool 和堆記憶體的調優,這些基本都是通過參數來進行控制的。

這些參數我們都需要結合自身的情況去進行調整,這裡隻給出一些建議。而且對于 ManagerBuffer 來說,Flink 的流式作業現在并沒有過多使用到這部分的記憶體,是以我們都會設定得比較小,不超過0.3。

Flink性能調優小小總結

堆記憶體的調優是關于 JVM 方面的,主要就是将預設使用的垃圾回收器改為 G1 ,因為預設使用的 Parallel Scavenge 對于老年代的 GC 存在一個串行化的問題,它的 Full GC 耗時較長,下面是關于 G1 的一些介紹,網上資料也非常多,這裡就不展開說明了。

Flink性能調優小小總結
Flink性能調優小小總結
Flink性能調優小小總結
Flink性能調優小小總結

ClickHouse存儲計算分離在騰訊雲的實踐

Spark Streaming + Canal + Kafka打造Mysql增量資料實時進行監測分析

impala + kudu | 大資料實時計算踩坑優化指南

繼續閱讀