本文整理自直播《實時計算 Flink 版 SQL 實踐-李麟(海豹)》
視訊連結:
https://developer.aliyun.com/learning/course/807/detail/13887内容簡要:
一、實時計算Flink版SQL簡介
二、實時計算Flink版SQL上手示例
三、開發常見問題和解法
實時計算Flink版SQL簡介
(一)關于實時計算Flink版SQL

選擇了SQL這種聲明式語言作為頂層API,比較穩定,也友善使用者使用。Flink SQL具備流批統一的特性,給使用者統一的開發體驗,并且語義一緻。另外,Flink SQL能夠自動優化,包括屏蔽流計算裡面State的複雜性,也提供了自動優化的Plan,并且還內建了AutoPilot自動調優的功能。Flink SQL的應用場景也比較廣泛,包括資料內建、實時報表、實時風控,還有線上機器學習等場景。
(二)基本操作
在基本操作上,可以看到SQL的文法和标準SQL非常類似。示例中包括了基本的SELECT、FILTER操作。,可以使用内置函數,如日期的格式化,也可以使用自定義函數,比如示例中的匯率轉換就是一個使用者自定義函數,在平台上注冊後就可以直接使用。
(三)維表 Lookup Join
在實際的資料處理過程中,維表的Lookup Join也是一個比較常見的例子。
這裡展示的是一個維表INNER JOIN示例。
例子中顯示的SOURCE表是一個實時變化的訂單資訊表,它通過INNER JOIN去關聯維表資訊,這裡标黃高亮的就是維表JOIN的文法,可以看到它和傳統的批處理有一個寫法上的差異,多了FOR SYSTEM_TIME AS OF這個子句來标明它是一個維表JOIN的操作。SOURCE表每來一條訂單消息,它都會觸發維表算子,去做一次對維表資訊的查詢,是以把它叫做一個Lookup Join。
(四)Window Aggregation
Window Aggregation(視窗聚合)操作也是常見的操作,Flink SQL中内置支援了幾種常用的Window類型,比如Tumble Window,Session Window,Hop Window,還有新引入的Cumulate Window。
Tumble
Tumble Window可以了解成固定大小的時間視窗,也叫滾窗,比如說5分鐘、10分鐘或者1個小時的固定間隔的視窗,視窗之間沒有重疊。
Session
Session Window(會話視窗) 定義了一個連續事件的範圍,視窗定義中的一個參數叫做Session Gap,表示兩條資料的間隔如果超過定義的時長,那麼前一個Window就結束了,同時生成了一個新的視窗。
Hop
Hop Window不同于滾動視窗的視窗不重疊,滑動視窗的視窗之間可以重疊。滑動視窗有兩個參數:size 和 slide。size 為視窗的大小,slide 為每次滑動的步長。如果slide < size,則視窗會重疊,同一條資料可能會被配置設定到多個視窗;如果 slide = size,則等同于 Tumble Window。如果 slide > size,視窗之間沒有重疊且有間隙。
Cumulate
Cumulate Window(累積視窗),是Flink社群1.13版本裡新引入的,可以對比 Hop Window來了解,差別是從Window Start開始不斷去累積。示例中Window 1、Window 2、Window 3是在不斷地增長的。它有一個最大的視窗長度,比如我們定義Window Size是一天,然後Step步長是1個小時,那麼它會在一天中的每個小時産生累積到目前小時的聚合結果。
看一個具體的Window聚合處理示例。
如上圖所示,比如說需要進行每5分鐘單個使用者的點選數統計。
源資料是使用者的點選日志,我們期望算出每5分鐘單個使用者的點選總數, SQL 中使用的是社群最新的 WindowTVF文法,先對源表開窗,再 GROUP BY 視窗對應的屬性 window_start和window_end, COUNT(*)就是點選數統計。
可以看到,當處理12:00到12:04的資料,有2個使用者産生了4次點選,分别能統計出來使用者Mary是3次,Bob是1次。在接下來一批資料裡面,又來了3條資料,對應地更新到下一個視窗中,分别是1次和2次。
(五)Group Aggregation
相對于Window Aggregation來說,Group Aggregation直接觸發計算,并不需要等到視窗結束,适用的一個場景是計算累積值。
上圖的例子是單個使用者累積到目前的點選數統計。從Query上看,寫法相對簡單一點,直接 GROUP BY user 去計算COUNT(*),就是累積計數。
可以看到,在結果上和Window的輸出是有差異的,在與Window相同的前4條輸入資料,Group Aggregation輸出的結果是Mary的點選數已更新到3次,具體的計算過程可能是從1變成2再變成3,Bob是1次,随着後面3條資料的輸入,Bob對應的點選數又會更新成2次,對結果是持續更新的過程,這和Window的計算場景是有一些差別的。
之前Window視窗裡面輸出的資料,在視窗結束後結果就不會再改變,而在Group Aggregation裡,同一個Group Key的結果是會産生持續更新的。
(六)Window Aggregation Vs Group Aggregation
更全面地對比一下Window和Group Aggregation的一些差別。
Window Aggregation在輸出模式上是按時輸出,是在定義的資料到期之後它才會輸出。比如定義5分鐘的視窗,結果是延遲輸出的,比如00:00~00:05這個時間段,它會等整個視窗資料都到齊之後,才完整輸出出來,并且結果隻輸出一次,不會再改變。
Group Aggregation是資料觸發,比如第一條資料來它就會輸出結果,同一個Key 的第二條資料來結果會更新,是以在輸出流的性質上兩者也是不一樣的。Window Aggregation一般情況下輸出的是Append Stream,而在Group Aggregation輸出的是Update Stream。
在狀态State處理上兩者的差異也比較大。Window Aggregation會自動清理過期資料,使用者就不需要額外再去關注 State的膨脹情況。Group Aggregation是基于無限的狀态去做累積,是以需要使用者根據自己的計算場景來定義State的TTL,就是State儲存多久。
比如統計一天内累計的PV和UV,不考慮資料延遲的情況,也至少要保證State的TTL要大于等于一天,這樣才能保證計算的精确性。如果State的TTL定義成半天,統計值就可能不準确了。
對輸出的存儲要求也是由輸出流的性質來決定的。在Window的輸出上,因為它是Append流,所有的類型都是可以對接輸出的。而Group Aggregatio輸出了更新流,是以要求目标存儲支援更新,可以用Hologres、MySQL或者HBase這些支援更新的存儲。
實時計算 Flink 版SQL上手示例
下面通過具體的例子來看每一種SQL操作在真實的業務場景中會怎麼使用,比如SQL基本的文法操作,包括一些常見的Aggregation的使用。
(一)示例場景說明:電商交易資料 - 實時數倉場景
這裡的例子是電商交易資料場景,模拟了實時數倉裡分層資料處理的情況。
在資料接入層,我們模拟了電商的交易訂單資料,它包括了訂單ID,商品ID,使用者ID,交易金額,商品的葉子類目,交易時間等基本資訊,這是一個簡化的表。
示例1會從接入層到資料明細層,完成一個資料清洗工作,此外還會做類目資訊的關聯,然後資料的彙總層我們會示範怎麼完成分鐘級的成交統計、小時級口徑怎麼做實時成交統計,最後會介紹下在天級累積的成交場景上,怎麼去做準實時統計。
- 示例環境:内測版
示範環境是目前内測版的實時計算Flink産品,在這個平台可以直接做一站式的作業開發,包括調試,還有線上的運維工作。
- 接入層資料
使用 SQL DataGen Connector 生成模拟電商交易資料。
接入層資料:為了友善示範,簡化了鍊路,用内置的SQL DataGen Connector來模拟電商資料的産生。
這裡面order_id是設計了一個自增序列,Connector的參數沒有完整貼出來。 DataGen Connector支援幾種生成模式,比如可以用Sequence産生自增序列,Random模式可以模拟随機值,這裡根據不同的字段業務含義,選擇了不同的生成政策。
比如order_id是自增的,商品ID是随機選取了1~10萬,使用者ID是1~1000萬,交易金額用分做機關, cate_id是葉子類目ID,這裡共模拟100個葉子類目,直接通過計算列對商品ID取餘來生成,訂單建立時間使用目前時間模拟,這樣就可以在開發平台上調試,而不需要去建立Kafka或者DataHub做接入層的模拟。
(二)示例1-1 資料清洗
- 電商交易資料-訂單過濾
這是一個資料清洗的場景,比如需要完成業務上的訂單過濾,業務方可能會對交易金額有最大最小的異常過濾,比如要大于1元,小于1萬才保留為有效資料。
交易的建立時間是選取某個時刻之後的,通過WHERE條件組合過濾,就可以完成這個邏輯。
真實的業務場景可能會複雜很多,下面來看下SQL如何運作。
這是使用調試模式,在平台上點選運作按鈕進行本地調試,可以看到金額這一列被過濾,訂單建立時間也都是大于要求的時間值。
從這個簡單的清洗場景可以看到,實時和傳統的批處理相比,在寫法上包括輸出結果差異并不大,流作業主要的差異是運作起來之後是長周期保持運作的,而不像傳統批處理,處理完資料之後就結束了。
(三)示例1-2 類目資訊關聯
接下來看一下怎麼做維表關聯。
根據剛才接入層的訂單資料,因為原始資料裡面是葉子類目資訊,在業務上需要關聯類目的次元表,次元表裡面記錄了葉子類目到一級類目的關聯關系,ID和名稱,清洗過程需要完成的目标是用原始表裡面葉子類目ID去關聯維表,補齊一級類目的ID和Name。這裡通過INNER JOIN維表的寫法,關聯之後把維表對應的字段選出來。
和批處理的寫法差異僅僅在于維表的特殊文法FOR SYSTEM_TIME AS OF。
如上所示,平台上可以上傳自己的資料用于調試,比如這裡使用了1個CSV的測試資料,把100個葉子類目映射到10個一級類目上。
對應葉子類目ID的個位數就是它一級類目的ID,會關聯到對應的一級類目資訊,傳回它的名稱。本地調試運作優點是速度比較快,可以即時看到結果。在本地調試模式中,終端收到1000條資料之後,會自動暫停,防止結果過大而影響使用。
(四)示例2-1 分鐘級成交統計
接下來我們來看一下基于Window的統計。
第一個場景是分鐘級成交統計,這是在彙總層比較常用的計算邏輯。
分鐘級統計很容易想到Tumble Window,每一分鐘都是各算各的,需要計算幾個名額,包括總訂單數、總金額、成交商品數、成交使用者數等。成交的商品數和使用者數要做去重,是以在寫法上做了一個Distinct處理。
視窗是剛剛介紹過的Tumble Window,按照訂單建立時間去劃一分鐘的視窗,然後按一級類目的次元統計每一分鐘的成交情況。
- 運作模式
上圖和剛才的調試模式有點差別,上線之後就真正送出到叢集裡去運作一個作業,它的輸出采用了調試輸出,直接Print到Log裡。展開作業拓撲,可以看到自動開啟了Local-Global的兩階段優化。
- 運作日志 - 檢視調試輸出結果
在運作一段時間之後,通過Task裡面的日志可以看到最終的輸出結果。
用的是Print Sink,會直接打到Log裡面。在真實場景的輸出上,比如寫到Hologres/MySQL,那就需要去對應存儲的資料庫上檢視。
可以看到,輸出的資料相對于資料的原始時間是存在一定滞後的。
在19:46:05的時候,輸出了19:45:00這一個視窗的資料,延遲了5秒鐘左右輸出前1分鐘的聚合結果。
這5秒鐘實際上和定義源表時WATERMARK的設定是有關系的,在聲明WATERMARK時是相對gmt_create字段加了5秒的offset。這樣起到的效果是,當到達的最早資料是 19:46:00 時,我們認為水位線是到了19:45:55,這就是5秒的延遲效果,來實作對亂序資料的寬容處理。
(五)示例2-2 小時級實時成交統計
第二個例子是做小時級實時成交統計。
如上圖所示,當要求實時統計,直接把Tumble Window開成1小時Size的Tumble Window,這樣能滿足實時性嗎?按照剛才展示的輸出結果,具有一定的延遲效果。是以開一個小時的視窗,必須等到這一個小時的資料都收到之後,在下一個小時的開始,才能輸出上一個小時的結果,延遲在小時級别的,滿足不了實時性的要求。回顧之前介紹的 Group Aggregation 是可以滿足實時要求的。
具體來看,比如需要完成小時+類目以及隻算小時的兩個口徑統計,兩個統計一起做,在傳統批進行中常用的GROUPING SETS功能,在實時Flink上也是支援的。
我們可以直接GROUP BY GROUPING SETS,第一個是小時全口徑,第二個是類目+小時的統計口徑,然後計算它的訂單數,包括總金額,去重的商品數和使用者數。
這種寫法對結果加了空值轉換處理便于檢視資料,就是對小時全口徑的統計,輸出的一級類目是空的,需要對它做一個空值轉換處理。
上方為調試模式的運作過程,可以看到Datagen生成的資料實時更新到一級類目和它對應的小時上。
這裡可以看到,兩個不同GROUP BY的結果在一起輸出,中間有一列ALL是通過空值轉換來的,這就是全口徑的統計值。本地調試相對來說比較直覺和友善,有興趣的話也可以到阿裡雲官網申請或購買進行體驗。
(六)示例2-3 天級累積成交準實時統計
第三個示例是天級累計成交統計,業務要求是準實時,比如說能夠接受分鐘級的更新延遲。
按照剛才Group Aggregation小時的實時統計,容易聯想到直接把Query改成天次元,就可以實作這個需求,而且實時性比較高,資料觸發之後可以達到秒級的更新。
回顧下之前提到的Window和Group Aggregation對于内置狀态處理上的差別,Window Aggregation可以實作State的自動清理,Group Aggregation需要使用者自己去調整 TTL。由于業務上是準實時的要求,在這裡可以有一個替代的方案,比如用新引入的Cumulate Window做累積的Window計算,天級的累積然後使用分鐘級的步長,可以實作每分鐘更新的準實時要求。
回顧一下Cumulate Window,如上所示。天級累積的話,Window的最大Size是到天,它的Window Step就是一分鐘,這樣就可以表達天級的累積統計。
具體的Query如上,這裡使用新的TVF文法,通過一個TABLE關鍵字把Windows的定義包含在中間,然後 Cumulate Window引用輸入表,接着定義它的時間屬性,步長和size 參數。GROUP BY就是普通寫法,因為它有提前輸出,是以我們把視窗的開始時間和結束時間一起列印出來。
這個例子也通過線上運作的方式去看Log輸出。
可以看到,它和之前Tumble Window運作的結構類似,也是預聚合加上全局聚合,它和Tumble Window的差別就是并不需要等到這一天資料都到齊了才輸出結果。
- 運作日志 – 觀察調試結果
從上方示例可以看到,在20:47:00的時候,已經有00:00:00到20:47:00的結果累積,還有對應的4列統計值。下一個輸出就是接下來的累計視窗,可以看到20:47:00到20:48:00就是一個累計的步長,這樣既滿足了天級别的累計統計需求,也能夠滿足準實時的要求。
(七)示例小結:電商交易資料-實時數倉場景
然後我們來整體總結一下以上的示例。
在接入層到明細層的清洗處理特點是相對簡單,也比較明确,比如業務邏輯上需要做固定的過濾條件,包括次元的擴充,這都是非常明确和直接的。
從明細層到彙總層,例子中的分鐘級統計,我們是用了Tumble Window,而小時級因為實時性的要求,換成了Group Aggregation,然後到天級累積分别展示Group Aggregation和新引入的Cumulate Window。
從彙總層的計算特點來說,我們需要去關注業務上的實時性要求和資料準确性要求,然後根據實際情況選擇Window聚合或者Group 聚合。
這裡為什麼要提到資料準确性?
在一開始比較Window Aggregation和Group Aggregation的時候,提到Group Aggregation的實時性非常好,但是它的資料準确性是依賴于State的TTL,當統計的周期大于TTL,那麼TTL的資料可能會失真。
相反,在Window Aggregation上,對亂序的容忍度有一個上限,比如最多接受等一分鐘,但在實際的業務資料中,可能99%的資料能滿足這樣的要求,還有1%的資料可能需要一個小時後才來。基于WATERMARK的處理,預設它就是一個丢棄政策,超過了最大的offset的這些資料就會被丢棄,不納入統計,此時資料也會失去它的準确性,是以這是一個相對的名額,需要根據具體的業務場景做選擇。
開發常見問題和解法
(一)開發中的常見問題
上方是實時計算真實業務接觸過程中比較高頻的問題。
首先是實時計算不知道該如何下手,怎麼開始做實時計算,比如有些同學有批處理的背景,然後剛開始接觸Flink SQL,不知道從哪開始。
另外一類問題是SQL寫完了,也清楚輸入處理的資料量大概是什麼級别,但是不知道實時作業運作起來之後需要設定多大的資源
還有一類是SQL寫得比較複雜,這個時候要去做調試,比如要查為什麼計算出的資料不符合預期等類似問題,許多同學反映無從下手。
作業跑起來之後如何調優,這也是一個非常高頻的問題。
(二)開發常見問題解法
1.實時計算如何下手?
對于上手的問題,社群有很多官方的文檔,也提供了一些示例,大家可以從簡單的例子上手,慢慢了解SQL裡面不同的算子,在流式計算的時候會有一些什麼樣的特性。
此外,還可以關注開發者社群實時計算 Flink 版、 ververica.cn網站、 B 站的Apache Flink 公衆号等分享内容。
逐漸熟悉了SQL之後,如果想應用到生産環境中去解決真實的業務問題,阿裡雲的行業解決方案裡也提供了一些典型的架構設計,可以作為參考。
2.複雜作業如何調試?
如果遇到千行級别的複雜SQL,即使對于Flink的開發同學來也不能一目了然地把問題定位出來,其實還是需要遵循由簡到繁的過程,可能需要借助一些調試的工具,比如前面示範的平台調試功能,然後做分段的驗證,把小段SQL局部的結果正确性調試完之後,再一步一步組裝起來,最終讓這個複雜作業能達到正确性的要求。
另外,可以利用SQL文法上的特性,把SQL組織得更加清晰一點。實時計算Flink産品上有一個代碼結構功能,可以比較友善地定位長SQL裡具體的語句,這都是一些輔助工具。
3.作業初始資源設定,如何調優?
我們有一個經驗是根據輸入的資料,初始做小并發測試一下,看它的性能如何,然後再去估算。在大并發壓測的時候,按照需求的吞吐量,逐漸逼近,然後拿到預期的性能配置,這個是比較直接但也比較可靠的方式。
調優這一塊主要是借助于作業的運作是情況,我們會去關注一些重點名額,比如說有沒有産生資料的傾斜,維表的Lookup Join需要通路外部存儲,有沒有産生IO的瓶頸,這都是影響作業性能的常見瓶頸點,需要加以關注。
在實時計算Flink産品上內建了一個叫AutoPilot的功能,可以了解為類似于自動駕駛,在這種功能下,初始資源設多少就不是一個麻煩問題了。
在産品上,設定作業最大的資源限制後,根據實際的資料處理量,該用多少資源可以由引擎自動幫我們去調到最優狀态,根據負載情況來做伸縮。
活動推薦
阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:
99元試用
(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包3個月及以上還有85折優惠!
了解活動詳情:
https://www.aliyun.com/product/bigdata/sc