Flink入坑指南系列文章,從實際例子入手,一步步引導使用者零基礎入門實時計算/Flink,并成長為使用Flink的高階使用者。
簡介
Group By + Agg這個最經典的SQL使用方式。Group By是SQL中最基礎的分組操作,agg的全稱是aggregation(聚合操作),是一類SQL算子的統稱,Flink中最常用的Agg操作有COUNT/SUM/AVG等,詳情參見
Flink支援的聚合操作清單。在實際使用中,Group By+Agg絕大部分場景下都會一起出現。作為最常用的SQL模式,學習好這種模式的最優寫法,也就非常重要了。本章從兩個需求開始,進一步了解一下Group By + Agg模式的最優寫法,及實時計算産品/Alibaba Flink版本中的部分優化.
需求
上一章中,小明已經把第一個需求完成了,同時也了解了
持續查詢,
state等流計算中的基礎概念。熟悉了Flink/
實時計算的基礎用法之後,小明開始着手開發其他的需求:
- 從0點開始,每個類目的成交額
- 從0點開始,每個店鋪的uv/pv
- 從0點開始,每個使用者點選了多少商品,多少店鋪
Group By + Agg
先看需求1:從0點開始,每個類目的成交額。進入Flink的原始資料結構如下:
ctime | category_id | shop_id | item_id | price |
---|---|---|---|---|
2018-12-04 15:44:54 | cat_01 | shop_01 | item_01 | 10 |
2018-12-04 15:45:46 | cat_02 | shop_02 | item_02 | 11.1 |
2018-12-04 15:46:11 | shop_03 | item_03 | 12.4 |
FlinkSQL代碼如下,看上去與傳統資料庫/批處理的SQL相同:
SELECT
date_format(ctime, '%Y%m%d') as cdate, -- 将資料從時間戳格式(2018-12-04 15:44:54),轉換為date格式(20181204)
category_id,
sum(price) as category_gmv
FROM src
GROUP BY date_format(ctime, '%Y%m%d'), category_id; --按照天做聚合
以這個例子入手,着重說明Group By+Agg通用模式的兩個問題:
- 計算特點
- 常見問題及解法
Group by+Agg模式在底層的有一些特點:
- Group by分組操作,會産生資料shuffle
- 按Key的agg操作,最終都需要落到同一個實體程序上才能保證計算的正确性
以這個最簡單SQL為例,其資料流程圖如下,不同顔色代表不同的category_id:
資料源進來的資料先經過group by進行分組,同一個key的資料被分到同一個worker上之後再進行聚合操作。特點2就決定了,Group By + Agg 模式中,SQL作業性能與資料分布非常相關,如果資料中存在__資料傾斜__,也就是某個key的資料異常的多,那麼某個聚合節點就會成為瓶頸,作業就會有明顯的反壓及延時現象。
為了解決這個問題,就需要将堵住的聚合節點進行拆分,優化後的SQL如下:
SELECT cdate,category_id,sum(category_gmv_p) as category_gmv
FROM(
SELECT
date_format(ctime, '%Y%m%d') as cdate, -- 将資料從時間戳格式(2018-12-04 15:44:54),轉換為date格式(20181204)
category_id,
sum(price) as category_gmv_p
FROM src
GROUP BY category_id, mod(hash_code(FLOOR(RAND(1)*1000), 256),date_format(ctime, '%Y%m%d'); --按照天做聚合
)
GROUP BY cdate,category_id
SQL中做了将一個Group By+Agg拆稱了兩個,子查詢裡按照category_id和mod(hash_code(FLOOR(RAND(1)*1000), 256)分組,将同一個category_id上的資料打散成了256份,先做一層聚合。外層Group By+Agg,将子查詢聚合後的結果再次做聚合。這樣通過兩層聚合的方式,即可大大緩解某聚合節點擁堵的現象。其資料流程圖如下:
如果使用者用的是開源Flink1.7版本,如果作業出現資料傾斜情況,就需要按以上方法對SQL進行改造,以提高作業吞吐,降低由于資料傾斜造成的業務延時。
相關函數用法,
Floor,
Rand Hash_Code在
實時計算産品__使用 Flink版本,針對這種情況做了特殊優化,使用Local-Global Agg的方式完美解決了Group By+Agg模式中的資料傾斜問題,使用者使用第一種(最簡單)的SQL即可。__關于Local-Global Agg原理方面的介紹,後續會有專門文章,敬請期待。
GroupBy+單Distinct Agg
第二個需求:計算從0點開始,每個店鋪的uv/pv
原始資料:
uid | action | ||||
---|---|---|---|---|---|
10001 | |||||
10002 |
其中action有三種:
- 0: 浏覽
- 1: 點選
- 2: 加購
- 3: 購買
經過這段時間的學習,小明三兩下就寫出SQL:
SELECT
date_format(ctime, '%Y%m%d') as cdate, -- 将資料從時間戳格式(2018-12-04 15:44:54),轉換為date格式(20181204)
shop_id,
count(distinct uid) as shop_uv, -- shop uv
count(uid) as shop_pv -- show pv
FROM src
GROUP BY date_format(ctime, '%Y%m%d'), shop_id; --按照天做聚合
同樣,按照上節所述,如果這個作業出現了資料傾斜的現象,就需要将SQL優化為:
select
cdate,
shop_id,
sum(shop_uv_partial) as shop_uv,
sum(shop_pv_partial) as shop_pv
from (
select
date_format(ctime, '%Y%m%d') as cdate, -- 将資料從時間戳格式(2018-12-04 15:44:54),轉換為date格式(20181204)
shop_id,
count(distinct uid) as shop_uv_partial,
count(uid) as shop_pv_partial
from src
group by shop_id, mod(hash_code(uid), 256),date_format(ctime, '%Y%m%d')
)
group by cdate,shop_id
本例子中,将原始SQL中的一層查詢,拆成了兩層查詢。内層子查詢,按照shop_id和mod(hash_code(uid),256)做聚合,将同一個shop_id的資料打散到多個節點中。外層查詢,将子查詢聚合後的結果,再按shop_id聚合。通過兩層聚合即可大大緩解資料傾斜情況下聚合節點的壓力。
Group By+Agg場景與Group By+Distinct Agg場景的主要差別,在于state中存儲的資料。上一章中提到過,Flink是增量計算,state中會儲存增量資料,比如上次SUM的值等等,但是在DISTINCT計算過程中,就需要保留所有的distinct的key,在本例子中,就是uid。且在每一次計算過程中,都要查詢目前state中是否有同一個uid,并計數。是以在大資料量情況下distinct節點往往成為Flink作業的瓶頸。需要通過擴并發等方式解決。
同樣,在
使用 Flink版本,針對這種情況做了特殊優化,使用Partial-Final Agg的方式完美解決了Group By+Distinct Agg模式中的資料傾斜問題,使用者使用第一種(最簡單)的SQL即可。關于Partial-Final Agg原理方面的介紹,後續會有專門文章,敬請期待。
Group By+多Distinct Agg
第三個需求:從0點開始,每個使用者點選了多少商品,多少店鋪,以及該使用者總點選item次數。原始資料如下:
| |||||
經過一番思索,小明寫出了如下SQL:
SELECT UDTF
date_format(ctime, '%Y%m%d') as cdate, -- 将資料從時間戳格式(2018-12-04 15:44:54),轉換為date格式(20181204)
uid,
count(distinct shop_id) as shop_cnt,
count(distinct item_id) as item_cnt,
count(item_id) as click_cnt
FROM src
GROUP BY date_format(ctime, '%Y%m%d'), uid;
需求2相比,SQL中distinct個數變成了多個,這種情況下要優化SQL就更複雜了。有一種比較原始的做法:
- 先使用UDTF,将原始資料一行拆成多行,每行添加n+1列,n為distinct的個數。n列分别對distinct的值做hash。具體例子如下:
hash_shop | hash_item | flag | ||||||
hash(shop_01) | null | flag0 | ||||||
hash(item_01) | flag1 | |||||||
flag2 | ||||||||
hash(shop_02) | ||||||||
hash(item_02) | ||||||||
hash(shop_03) | ||||||||
hash(item_03) | ||||||||
- 在SQL中,先在子查詢中分别計算各名額的count值,在外層再做一層sum即可,SQL示例如下:
select
cdate,
uid,
sum(shop_cnt_p) as shop_cnt,
sum(item_id_p) as item_id_cnt,
sum(item_cnt_p) as item_cnt
from (
select
date_format(ctime, '%Y%m%d') as cdate,
uid,
count(distinct shop_id) filter (where flag = flag0) as shop_cnt_p,
count(distinct item_id) filter (where flag = flag1) as item_id_p ,
sum(item_id) filter (where flag = flag2) as item_cnt_p
from Expand_T
group by uid, hash_user, hash_shop, date_format(ctime, '%Y%m%d')
)
group by uid
這種問題可以解決多個distinct中的資料傾斜問題,但是會增加sql複雜度,并且計算過程中數量會膨脹,并且占用更多資源。
使用 Flink版本,針對這種情況做了特殊優化,使用Partial-Final Agg+Incremental Agg的方式完美解決了Group By+多個Distinct Agg模式中的資料傾斜問題,使用者不需要在SQL上做拆分。關于Partial-Final Agg+Incremental Agg原理方面的介紹,後續會有專門文章,敬請期待。
資料傾斜相關配置
在使用實時計算産品時,如果遇到資料傾斜問題,可以增加以下配置,即可解決,不需要手動進行SQL優化。
# 開啟5秒的microbatch
blink.microBatch.allowLatencyMs=5000
blink.miniBatch.allowLatencyMs=5000
blink.miniBatch.size=20000
# Local 優化,預設已經開啟
# blink.localAgg.enabled=true
# 開啟 Partial 優化,解決count distinct熱點
blink.partialAgg.enabled=true
# Incremental 優化,預設已經開啟
# blink.incrementalAgg.enabled=true