1.簡介
随公司業務的發展,對實時計算的需求也越來越多,目前Flink已廣泛應用于實時ETL,實時數倉、特征工程和線上資料服務等業務場景。
本文首先簡單介紹了Flink實時計算基本概念,然後引出基于Flink進行實時計算開發過程當中碰到的一些問題,通過對這些常見實時計算問題的産生原因進行深入分析,進而給出相應問題的解決方案。
2.Flink基本概念
Flink是一個分布式大資料處理引擎和架構,可對有無界和有界資料流進行有狀态的計算,能夠部署在各種叢集環境中,對各種規模大小的資料進行快速計算。
在進行Flink應用開發前,我們先了解流treams、狀态State、時間Time ,Watermark等Flink基礎語義概念以及Flink 兼顧靈活性和友善性的多層次API。
2.1 流(Stream)
Stream分為有限資料流與無限資料流,unbounded stream 是有始無終的資料流,即無限資料流;而bounded stream 是限定大小的有始有終的資料集合,即有限資料流,二者的差別在于無限資料流的資料會随時間的推演而持續增加,計算持續進行且不存在結束的狀态,相對的有限資料流資料大小固定,計算最終會完成并處于結束的狀态。
2.2 狀态(State)
狀态是計算過程中的産生的計算結果,在容錯恢複和Checkpoint中有重要的作用。流計算在本質上是Incremental Processing,是以需要将計算的中間結果儲存到狀态中,例如按天統計每個區域的GMV,是以當訂單資料到達時,需要将前面計算的該訂單所屬區域的GMV資料從狀态中讀取出來,然後再與該訂單額進行累加,累加完後将結果再次更新到狀态中;Flink正是通過将計算中間結果儲存到狀态後端中,才保證在整個分布式系統運作失敗或者挂掉的情況下做到Exactly-once,增強容災的效果。圖2-1是算子在聚合計算時,讀寫state狀态示意圖。
圖2-1算子讀寫狀态示意圖
2.3 時間語義(Time)
Flink的時間語義分為Event time、Ingestion time和Processing time。Flink的無限資料流是一個持續的過程,時間是我們判斷業務狀态是否滞後,資料處理是否及時的重要依據。
圖2-2 Flink時間語義
Event Time是在事件生産端産生該事件的時間,例如使用者下的訂單,訂單的下單時間就是事件産生的時間Event Time。
Ingestion time是事件進入Flink的時間。
Processing time是事件被處理時,目前的系統時間。
2.4 Watermark
流處理從事件産生,到流經source,再到operator算子,中間是有一個過程和時間的,雖然大部分情況下,流到operator算子的資料都是按照事件産生的時間順序來的,但是也不排除由于網絡、分布式等原因,導緻亂序的産生。
是以一旦出現亂序,如果隻根據Event Time來決定window的運作,不能保證在視窗關閉之前,所有的資料都能到達,但我們又不能無限期的等待下去,此時我們需要有一個機制來儲存在一個特定時間後,必須觸發視窗計算,這一個機制就是watermark。Watermark特點如下:
Watermark是一種衡量Event Time進展的機制。
Watermark是用于處理亂序事件的, 它與window相結合,可以正确處理亂序事件。
watermark是單調遞增的。watermark = Math.max(eventTime - dalayTime ,watermark), 一旦watermark大于等于某個視窗的end_time時,就會觸發視窗的計算,是以Watermark就是用觸發視窗計算的。圖2-3是無序資料流中的watermark示例,它的最大亂序時間為2。
圖2-3 無序資料的watermark
Watermark具有傳遞性。如果一個算子上遊有多個算子,則該算子的watermark值為取上遊所有流入該算子的watermark的最小值。圖2-4為watermark的傳遞性圖。
圖2-4 Watermark的傳遞性
2.5 API層
API 通常分為三層,由上而下可分為SQL 、Table API、DataStream API、ProcessFunction三層,API的表達能力及業務抽象能力都非常強大,但越接近SQL層,表達能力會逐漸減弱,抽象能力會增強,反之,ProcessFunction 層API的表達能力非常強,可以進行多種靈活友善的操作,但抽象能力也相對越小。圖2-5為Flink API的層結構圖。
圖2-5 Flink API層結構圖
3.Flink實時計算常見問題分析
3.1 資料亂序問題分析
資料亂序是指Flink在使用Event Time處理流式資料時,由于分布式或網絡原因,導緻資料到達處理機制進行處理時并不是按照資料産生的時間先後順序到達的。導緻資料亂序的場景如下:
1、kafka多分區導緻資料亂序
圖3-1中的數字表示資料産生的時間順序,資料生産端按資料産生時間将資料推送到kafka,因由kafka存在多個分區,在kafka的預設分發機制下,資料到流動如圖3-1所示:
圖3-1 Kafka多分區亂序圖
圖3-1所示,假如Flink的source的并行度與kafka的分區一緻,資料經過Flink的source後,再經過keyby分區開窗後,資料就會存在亂序情況,例如key為0的分區算子,視窗接受到的資料順序是:1,11,26,2。
2、由于網絡延遲導緻資料亂序
當存在多個資料生産端時,每一個生産端都将産生的資料,通過網絡傳輸将資料發送到kafka,這裡為了說明網絡延遲,将kafka的分區設定為1。由于每一個生産端到kafka端走的網絡路徑的不同,可能存在有一些存在鍊路阻塞,長短等情況,進而導緻資料到kafak的時間并不是按照資料産生的時間順序到達的。圖3-2為網絡原因導緻資料亂序的示意圖。
圖3-2 網絡延遲導緻資料亂序圖
3.2 Flink大狀态場景及問題分析
Flink大狀态形成的主是因為flink在計算過程中,存在大量的計算中間結果。那麼在那些場景下會使Flink的狀态比較大,目前在我們實踐中主要有下面幾種場景:
1、資料量大且開窗的時間比較長,例如開1天,1周甚至1個月的視窗。這種開窗時間比較長,要緩存的資料就比較大,是以狀态也會比較大。例如按天統計UV資料。
2、資料量比較大的二條流join時互相等待的時間比較長,例如訂單資料與物流的配送資料做join得到訂單的物流狀态,這種情況下,訂單資料可能要等待物流配送資料最大時間要1天左右。這也會導緻二條流的狀态比較大。
3、Group By分組的key的資料量大,計算的名額項非常多或計算的步驟多複雜度高,這種情況會導緻要儲存大量的中間計算結果,進而導緻狀态比較大。
圖3-3是一個後端曝光流與前端使用者點選流join場景下checkpoint資訊圖,在二條流的資料互相等待30分鐘的情況下,checkpoint就已經比較大了,如果等待時間更長,狀态會更大。
圖3-3 Flink checkpoint圖
在狀态比較大的場景下,可能存在以下問題:
1、Flink的堆外記憶體超額使用,導緻Yarn将Task Manager(以下簡稱TM)的容器kill掉,導緻任務Failover。這種情況主要是Flink對RocksDB的使用記憶體沒有限制,當狀态越來越大的時,堆外記憶體使用量就會超額使用,一旦TM的記憶體超過Yarn配置設定給TM所在容器的記憶體,就會導緻Yarn将容器kill掉。這種情況我們可以通過Yarn的日志分析得到原因。
2、Checkpoint狀态大,Checkpoint時間長。這種情況,我們可以首先從Sub Task checkpoint資訊入手。如圖3-4所示:
圖3-4 subTask checkpoint詳情圖
先看checkpoint Data Size大小,如果狀态很大,再看End to End Duration時間,如果時間比較大,比如做一次checkpoint要花十幾甚至幾十分鐘的話,我們接着再看具體的sub Task checkpoint的時間耗時情況。
1、Checkpoint時sub Task的Latest Acknowledgement的狀态一直為n/a
這種情況下要去分析是任務是否存在背壓的情況。我們可以通過Flink Web UI檢視算子merics的inPoolUsage和outPoolUsage的使用率。如果outPoolUsage的使用低而inPoolUsage使用率高,說明背壓是由該算子産生的。如果inPoolUsage使用率低,而outPoolUsage使用率很高則說明是下遊算子計算處理不及時導緻上遊算子背壓。繼續按上面的方法排查,最終找到産生背壓的算子
圖3-5 算子輸入輸出緩沖區使用率圖
2、關注Sync Duration和Async Duration時間
◇Sync Duration時間很長,說明磁盤IO可能成為性能瓶頸
◇Async Duration時間很長,說明是網絡IO可能成為性能瓶頸或是資料同步線程太少
3.3 資料傾斜問題
資料傾斜由于資料分布不均勻,造成資料大量的集中到一點,造成資料熱點。在這種Flink實時計算場景中,我們會發現在某個算子的處理上,一些sub task處理的資料量很大,有一些sub task處理的資料量很小,從flink的叢集界面上,我們也可以看到算子的背壓情況,在這種場景下,我們通過增加算子的并行度,并不能改善算子的背壓情況。例如PV的場景中,有一些網頁的點選量很高,而有一些網頁的點量很少,進而導緻某些sub task背壓嚴重。
圖3-6 資料傾斜示意圖
4.實時計算常見問題的解決方案
4.1 資料亂序場景的處理
Flink處理資料亂序的方式有三種:
1、Watermark
我們在設定watermark時,可以設定一個最大的亂序時間,而watermark是以事件時間減去所允許的最大亂序時間作為watermark,是以相當于多給了資料一定的時間,然後關閉視窗,觸發計算。
2、允許遲到(allowedLateness)
如果在watermark的基礎上有的資料還是可能會遲到,這時我們可以再多給資料一定的可以遲到的時間,此時當watermark到達視窗大小時觸發計算,但是不會關閉視窗,而是直到所允許的遲到時間後,才會真正關閉視窗。
3、側輸出流
當資料遲到的時間非常久,前兩種都失效時使用,相當于遲到資料歸放入一個分支流中進行單獨計算。
4.1.1 Flink SQL處理亂序
◇建立kafka動态表
在建立Kafka動态表時,我們使用watermark for orderTime as orderTime – interval ‘2’MINUTE語句來說明提取的watermark為訂單時間減2分鐘。
◇開窗聚合計算
如果想每10秒輸出一次計算結果,可以設定下面參數:
在SQL層面,目前隻支援使用watermark來處理資料亂序問題。
4.2.1 DataStream API處理亂序
從上面代碼可以看出,我們使用時間配置設定器時,使用WatermarkStrateg。
forBoundedOutOfOrderness(Duration.ofMillis(jobPrarameter.getOutOfBoundMs()))方法來設定資料的最大亂序時間。
在開視窗後,我們又使用DataStream的allowedLatenes(Time.seconds(30))來設定視窗延遲關閉時間為30秒。
如果資料在watermark大于等于視窗的結束時間+最大延遲時間之後才到達,這時,如果我們不做任何處理的話的資料就會被丢棄掉,如果當我們不想丢棄掉這些資料,我們還可以通過側輸出流來解決。解決步驟如下:
1、建立側輸出的Tag标志對象.
2、在allowedLateness之後,設定将資料輸出到outputTag标志的側輸出流。
3、從視窗計算後得到的流中通過getSideOutput(outputTag)擷取遲到資料的側輸出流。
4、得到遲到資料的側輸出流後,我們就可以根據業務需要對側輸出流的資料進行相應的處理。在下面的代碼中我們将視窗聚合計算得到的流與側輸出流進行connect連接配接後,得到一個ConnectedStreams對象,在ConnectedStreams對象上使用flatMap方法的CoFlatMapFunction将側輸出流的資料累加到聚合計算的結果流上時進行輸出,進而保證了聚合計算後的資料的準确性。
是以,與SQL相比,使用DataStream API我們可以更加靈活的處理資料,是以在處理資料亂序時,根據需要,我們可以同時使用三種處理亂序的方案來處理亂序資料。而SQL目前隻支援使用Watermark的方式來處理亂序資料。
4.2 Flink大狀态問題解決方案
在上面的Flink大狀态場景及問題分析章節,我們可以知道大狀态會導緻如下問題:
1、記憶體超額使用
2、Checkpoint時間過長
以上問題,我們可以通過以下辦法來解決:
4.2.1 減少狀态大小
如果狀态比較大,我們是否可以減少狀态呢,答案是肯定的。例如當我們狀态使用的POJO類對象時,我們可以使用ProtoStuff來減少對象的存儲空間,進而減少對象序列化後的存儲空間大小。下面是使用ProtoStuff進行序列化和反序列化的代碼示例。
注:在目前使用的ProtoStuff的版本,由于不支援時間類型,是以對時間類型可以轉換成字元串或Long類型來存儲。
4.2.2 記憶體超額使用
如果TM是因為記憶體不足被kill掉,解決方案有二種:
1、想辦法減少狀态大小(如上面的第一點)。
2、增加TM的記憶體,如原來TM為4G,可以增加到6G,甚至更多。
4.2.3 Checkpoint時間長
◇修改checkpoint政策
預設情況下,flink checkpoint 是全量備份,當狀态很大,全量備份會導緻每次的checkpoint耗時會很長,開啟checkpoint的增量配置,在可以顯著減少每次checkpoint狀态的大小,以及checkpoint的時間。修改flink-conf.yaml中的配置,将state.backend.incremental設定為true。
◇算子背壓導緻checkpoint時間過長
如果是某個算子的背壓原因導緻checkpoint時間很長,我們可以去優化該算子,找出最耗時的操作進行優化。下面是我遇到一些場景的優化方案:
1、與次元表join。這種情況下,一是加緩存并設定資料過期時間。二是使用Redis來存放次元表的資料。
2、資料傾斜。這一塊我們将在資料傾斜一節專門來講,這裡就不再說了。
算子的計算邏輯複雜。将計算過程拆分到多個算子中執行,而不是在一個算子中完成所有的邏輯計算。
◇磁盤IO導緻checkpoint時間過長
如果是因為磁盤IO問題,我們可以通過以下方案來解決:
1、檢查磁盤是否為SSD磁盤, 如果不是,則建議使用SSD磁盤。
2、挂載多塊磁盤,在flink-conf.yaml中通過配置state.backend.rocksdb.localdir來指定多個挂載目錄。不同的目錄挂載到不同的磁盤。
3、通過上面的操作後,還是存在IO問題,比如有的磁盤IO使用高,有的使用低。在這種情況下,可能是由于多個sub task共用同一塊磁盤的問題,導緻負載不均衡。解決這一個問題,通常的政策就是負載均衡政策。通用的負載均衡政策有hash, 随機以及輪循等政策。Flink預設使用的政策是随機政策,源碼如下:
如果想使用其他政策,可以修改這裡的源碼。
◇資料傳輸導緻checkpoint時間過長
Async Duration是指RocksDB将本地checkpoint的狀态備份到其他持久化存儲系統所花費的時間,其他存儲系統如HDFS檔案系統。如果是由于Async Duration時間長,先檢查網絡IO使用是否很高,如果不是很高,說明RocksDB将本地狀态資料上傳到HDFS時花費的時間太多,這時我們可以提高RocksDB的資料傳輸線程數量,因為RocksDB預設的資料傳輸線程數量為1,是以我們可以增加線程數量來減少Async Duration的時間。增加資料傳輸線程數量的代碼設定如下:
4.3 資料傾斜問題解決方案
4.3.1 Local-Global Aggregation政策
Flink對于資料傾斜的解決方案是采用Local-Global Aggregation政策。Local-Global将一個組聚合分為兩個階段,即首先在上遊進行局部聚合,然後在下遊進行全局聚合。它類似于MapReduce的Combine + Reduce模式。它的原理如下圖4-1所示。
圖4-1 Local-Global Aggregation優化示意圖
Flink sql使用Local-Global Aggregation政策非常簡單,隻要設定如下配置即可:
4.3.2 Split-Distinct Aggregation
Local-Global 優化可有效消除一般聚合的資料傾斜,例如 SUM、COUNT、MAX、MIN、AVG。但是在處理distinct 的聚合時,它的性能并不令人滿意。例如下面的SQL:
如果不同鍵(即user_id)的值稀疏,則 COUNT DISTINCT不擅長減少記錄。即使啟用了Local-Global優化,也無濟于事。因為累加器仍然包含幾乎所有的原始記錄,全局聚合将成為瓶頸。如下圖4-2左側所示。
圖4-2 Split-Distinct Aggregation優化示意圖
Split-Distinct Aggregation優化的想法是将不同的聚合(例如COUNT(DISTINCT col))分成兩個級别。
第一步是按照day和bucket鍵值進行分組聚合。bucket鍵值等于HASH_CODE(user_id) % BUCKET_NUM。BUCKET_NUM預設1024,可通過table.optimizer.distinct-agg.split.bucket-num選項配置。
第二步按原始鍵(即day)分組,SUM用于聚合來自不同存儲桶的 COUNT DISTINCT 值。因為相同的distinct key隻會在同一個bucket中計算,是以變換是等價的。
上面的sql通過split-distinct aggregation優化後,SQL相當于下面的SQL:
Flink SQL要開啟split-distinct aggregation優化,隻需要設定下面的配置即可:
5.總結
本文深入分析了Flink實時計算應用實踐中的常見問題:資料亂序,大狀态作業優化,資料傾斜,并且對相應問題提出了可行的解決方案。後續我們将探讨Flink在智能風控,實時資料入湖,實時監控等其它業務場景的應用實踐。