天天看點

Flink入坑指南 第四章:SQL中的經典操作Group By+Agg簡介需求

Flink入坑指南系列文章,從實際例子入手,一步步引導使用者零基礎入門實時計算/Flink,并成長為使用Flink的高階使用者。

簡介

Group By + Agg這個最經典的SQL使用方式。Group By是SQL中最基礎的分組操作,agg的全稱是aggregation(聚合操作),是一類SQL算子的統稱,Flink中最常用的Agg操作有COUNT/SUM/AVG等,詳情參見

Flink支援的聚合操作清單

。在實際使用中,Group By+Agg絕大部分場景下都會一起出現。作為最常用的SQL模式,學習好這種模式的最優寫法,也就非常重要了。本章從兩個需求開始,進一步了解一下Group By + Agg模式的最優寫法,及實時計算産品/Alibaba Flink版本中的部分優化.

需求

上一章中,小明已經把第一個需求完成了,同時也了解了

持續查詢

state

等流計算中的基礎概念。熟悉了Flink/

實時計算

的基礎用法之後,小明開始着手開發其他的需求:

  1. 從0點開始,每個類目的成交額
  2. 從0點開始,每個店鋪的uv/pv
  3. 從0點開始,每個使用者點選了多少商品,多少店鋪

Group By + Agg

先看需求1:從0點開始,每個類目的成交額。進入Flink的原始資料結構如下:

ctime category_id shop_id item_id price
2018-12-04 15:44:54 cat_01 shop_01 item_01 10
2018-12-04 15:45:46 cat_02 shop_02 item_02 11.1
2018-12-04 15:46:11 shop_03 item_03 12.4

FlinkSQL代碼如下,看上去與傳統資料庫/批處理的SQL相同:

SELECT 
    date_format(ctime, '%Y%m%d') as cdate, -- 将資料從時間戳格式(2018-12-04 15:44:54),轉換為date格式(20181204)
       category_id,
    sum(price) as category_gmv
FROM src
GROUP BY date_format(ctime, '%Y%m%d'), category_id; --按照天做聚合
           

以這個例子入手,着重說明Group By+Agg通用模式的兩個問題:

  1. 計算特點
  2. 常見問題及解法

Group by+Agg模式在底層的有一些特點:

  1. Group by分組操作,會産生資料shuffle
  2. 按Key的agg操作,最終都需要落到同一個實體程序上才能保證計算的正确性

以這個最簡單SQL為例,其資料流程圖如下,不同顔色代表不同的category_id:

Flink入坑指南 第四章:SQL中的經典操作Group By+Agg簡介需求

資料源進來的資料先經過group by進行分組,同一個key的資料被分到同一個worker上之後再進行聚合操作。特點2就決定了,Group By + Agg 模式中,SQL作業性能與資料分布非常相關,如果資料中存在__資料傾斜__,也就是某個key的資料異常的多,那麼某個聚合節點就會成為瓶頸,作業就會有明顯的反壓及延時現象。

為了解決這個問題,就需要将堵住的聚合節點進行拆分,優化後的SQL如下:

SELECT cdate,category_id,sum(category_gmv_p) as category_gmv
FROM(
    SELECT 
        date_format(ctime, '%Y%m%d') as cdate, -- 将資料從時間戳格式(2018-12-04 15:44:54),轉換為date格式(20181204)
           category_id,
        sum(price) as category_gmv_p
    FROM src
    GROUP BY category_id, mod(hash_code(FLOOR(RAND(1)*1000), 256),date_format(ctime, '%Y%m%d'); --按照天做聚合
)
GROUP BY cdate,category_id           

SQL中做了将一個Group By+Agg拆稱了兩個,子查詢裡按照category_id和mod(hash_code(FLOOR(RAND(1)*1000), 256)分組,将同一個category_id上的資料打散成了256份,先做一層聚合。外層Group By+Agg,将子查詢聚合後的結果再次做聚合。這樣通過兩層聚合的方式,即可大大緩解某聚合節點擁堵的現象。其資料流程圖如下:

Flink入坑指南 第四章:SQL中的經典操作Group By+Agg簡介需求

如果使用者用的是開源Flink1.7版本,如果作業出現資料傾斜情況,就需要按以上方法對SQL進行改造,以提高作業吞吐,降低由于資料傾斜造成的業務延時。

相關函數用法,

Floor

,

Rand Hash_Code

實時計算産品

__使用 Flink版本,針對這種情況做了特殊優化,使用Local-Global Agg的方式完美解決了Group By+Agg模式中的資料傾斜問題,使用者使用第一種(最簡單)的SQL即可。__關于Local-Global Agg原理方面的介紹,後續會有專門文章,敬請期待。

GroupBy+單Distinct Agg

第二個需求:計算從0點開始,每個店鋪的uv/pv

原始資料:

uid action
10001
10002

其中action有三種:

  • 0: 浏覽
  • 1: 點選
  • 2: 加購
  • 3: 購買

經過這段時間的學習,小明三兩下就寫出SQL:

SELECT 
    date_format(ctime, '%Y%m%d') as cdate, -- 将資料從時間戳格式(2018-12-04 15:44:54),轉換為date格式(20181204)
       shop_id,
    count(distinct uid) as shop_uv, -- shop uv
    count(uid) as shop_pv -- show pv
FROM src
GROUP BY date_format(ctime, '%Y%m%d'), shop_id; --按照天做聚合           

同樣,按照上節所述,如果這個作業出現了資料傾斜的現象,就需要将SQL優化為:

select 
    cdate,
    shop_id, 
    sum(shop_uv_partial) as shop_uv,
    sum(shop_pv_partial) as shop_pv
from (
    select 
        date_format(ctime, '%Y%m%d') as cdate, -- 将資料從時間戳格式(2018-12-04 15:44:54),轉換為date格式(20181204)
        shop_id, 
        count(distinct uid) as shop_uv_partial,
        count(uid) as shop_pv_partial
    from src
    group by shop_id, mod(hash_code(uid), 256),date_format(ctime, '%Y%m%d')
)
group by cdate,shop_id           

本例子中,将原始SQL中的一層查詢,拆成了兩層查詢。内層子查詢,按照shop_id和mod(hash_code(uid),256)做聚合,将同一個shop_id的資料打散到多個節點中。外層查詢,将子查詢聚合後的結果,再按shop_id聚合。通過兩層聚合即可大大緩解資料傾斜情況下聚合節點的壓力。

Group By+Agg場景與Group By+Distinct Agg場景的主要差別,在于state中存儲的資料。上一章中提到過,Flink是增量計算,state中會儲存增量資料,比如上次SUM的值等等,但是在DISTINCT計算過程中,就需要保留所有的distinct的key,在本例子中,就是uid。且在每一次計算過程中,都要查詢目前state中是否有同一個uid,并計數。是以在大資料量情況下distinct節點往往成為Flink作業的瓶頸。需要通過擴并發等方式解決。

同樣,在

使用 Flink版本,針對這種情況做了特殊優化,使用Partial-Final Agg的方式完美解決了Group By+Distinct Agg模式中的資料傾斜問題,使用者使用第一種(最簡單)的SQL即可。關于Partial-Final Agg原理方面的介紹,後續會有專門文章,敬請期待。

Group By+多Distinct Agg

第三個需求:從0點開始,每個使用者點選了多少商品,多少店鋪,以及該使用者總點選item次數。原始資料如下:

經過一番思索,小明寫出了如下SQL:

SELECT UDTF
    date_format(ctime, '%Y%m%d') as cdate, -- 将資料從時間戳格式(2018-12-04 15:44:54),轉換為date格式(20181204)
       uid,
    count(distinct shop_id) as shop_cnt,
    count(distinct item_id) as item_cnt,
    count(item_id) as click_cnt

FROM src
GROUP BY date_format(ctime, '%Y%m%d'), uid;           

需求2相比,SQL中distinct個數變成了多個,這種情況下要優化SQL就更複雜了。有一種比較原始的做法:

  1. 先使用UDTF,将原始資料一行拆成多行,每行添加n+1列,n為distinct的個數。n列分别對distinct的值做hash。具體例子如下:
hash_shop hash_item flag
hash(shop_01) null flag0
hash(item_01) flag1
flag2
hash(shop_02)
hash(item_02)
hash(shop_03)
hash(item_03)
  1. 在SQL中,先在子查詢中分别計算各名額的count值,在外層再做一層sum即可,SQL示例如下:
select 
    cdate,
    uid, 
    sum(shop_cnt_p) as shop_cnt, 
    sum(item_id_p) as item_id_cnt, 
    sum(item_cnt_p) as item_cnt
from (
    select
        date_format(ctime, '%Y%m%d') as cdate,
        uid,
        count(distinct shop_id) filter (where flag = flag0) as shop_cnt_p,
        count(distinct item_id) filter (where flag = flag1) as item_id_p ,
        sum(item_id) filter (where flag = flag2) as item_cnt_p
    from Expand_T
    group by uid, hash_user, hash_shop, date_format(ctime, '%Y%m%d')
    )
group by uid           

這種問題可以解決多個distinct中的資料傾斜問題,但是會增加sql複雜度,并且計算過程中數量會膨脹,并且占用更多資源。

使用 Flink版本,針對這種情況做了特殊優化,使用Partial-Final Agg+Incremental Agg的方式完美解決了Group By+多個Distinct Agg模式中的資料傾斜問題,使用者不需要在SQL上做拆分。關于Partial-Final Agg+Incremental Agg原理方面的介紹,後續會有專門文章,敬請期待。

資料傾斜相關配置

在使用實時計算産品時,如果遇到資料傾斜問題,可以增加以下配置,即可解決,不需要手動進行SQL優化。

# 開啟5秒的microbatch
blink.microBatch.allowLatencyMs=5000
blink.miniBatch.allowLatencyMs=5000
blink.miniBatch.size=20000
# Local 優化,預設已經開啟
# blink.localAgg.enabled=true
# 開啟 Partial 優化,解決count distinct熱點
blink.partialAgg.enabled=true
# Incremental 優化,預設已經開啟
# blink.incrementalAgg.enabled=true