天天看點

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

1.序篇-本文結構

前面的章節鋪墊了那麼多,終于在本節走入一條 query 了。

針對 datastream api 大家都比較熟悉了,還是那句話,在 datastream 中,你寫的代碼邏輯是什麼樣的,它最終的執行方式就是什麼樣的。

但是對于 flink sql 的執行過程,大家還是不熟悉的。

是以本文通過以下章節使用 ETL,group agg(sum,count等)簡單聚合類 query 帶大家走進一條 flink sql query 邏輯的世界。幫大家至少能夠熟悉在 flink sql 程式運作時知道 flink 程式在幹什麼。

  1. 背景篇-大家不了解 flink sql 什麼?
  2. 目标篇-本文能幫助大家了解 flink sql 什麼?
  3. 實戰篇-簡單的 query 案例和運作原理
  4. 總結與展望篇

先說說結論:

  1. 場景問題:flink sql 很适合簡單 ETL,以及基本全部場景下的聚合類名額。
  2. 文法問題:flink sql 文法其實是和其他 sql 文法基本一緻的。基本不會産生文法問題阻礙使用 flink sql。
  3. 運作問題:檢視 flink sql 任務時的一些技巧:
    • 去 flink webui 看看這個任務目前在做什麼。包括算子名稱都會給直接展示給我們目前哪個算子在幹啥事情,在處理啥邏輯。
    • 如果你想知道你的 flink 任務執行了什麼代碼,就去看看 sql 最後轉換成的 transformation 裡面具體要執行哪些操作。flink sql 生成的代碼也在裡面。
    • 如果你不确定線上任務執行原理,可以直接在本地嘗試運作。

2.背景篇-大家不了解 flink sql 什麼?

首先從大家用 flink sql 的一個初衷和狀态出發,想一下大家在開始上手 flink sql 時,是什麼樣的一個想法?

部落客大概整理了下,在初步上手 flink sql,一般從入手到踩坑整個過程中,一般都會有以下幾種問題或者想法:

  1. 場景問題:首先 flink sql 是用來提效的,那相比 datastream,哪些場景很适合 flink sql 去做?
  2. 文法問題:我寫 sql 時 flink sql 文法會不會和其他 sql 文法有不同?
  3. 運作問題:我寫了一條 sql,運作起來了,但是對我來說是黑盒的,我怎麼知道這個任務正在執行什麼操作?有沒有什麼好辦法幫我去了解 flink sql 的運作機制?
  4. 了解誤區:在了解 flink sql 的運算機制上有哪些誤區?
  5. 坑:flink sql 一般都有啥坑?提前了解幫我們避免踩坑。

就是上面這些想法,會讓很多想在公司内部引入 flink sql 的同學望而卻步。

3.目标篇-本文能幫助大家了解 flink sql 什麼?

來看看本文的目标:

  1. 場景問題:幫大家了解哪些場景是非常适合 flink sql 的
  2. 文法問題:幫大家簡單熟悉 flink sql 的文法
  3. 運作問題:使用一條簡單的 query sql 看看其運作起來的過程,其運作的機制
  4. 了解誤區:運算機制上的常見誤區
  5. 坑:看看 sql 一般會有啥坑

由于一篇文章不能覆寫所有概念,本文主要介紹一些最簡單的 ETL,聚合場景,主要集中于前三點。

後兩點在後續系列文章中會按照場景詳細展開。

4.實戰篇-簡單的 query 案例和運作原理

4.1.場景問題:有哪些場景适合 flink sql?

不裝了,我坦白了,flink sql 其實很适合幹的活就是 dwd 清洗,dws 聚合。

此處主要針對實時數倉的場景來說。flink sql 能幹 dwd 清洗,dws 聚合,基本上實時數倉的大多數場景都能給覆寫了。

flink sql 牛逼!!!

但是!!!

經過部落客使用 flink sql 經驗來看,并不是所有的 dwd,dws 聚合場景都适合 flink sql(截止發文階段來說)!!!

其實這些目前不适合 flink sql 的場景總結下來就是在處理上比 datastream 還是會有一定的損失。

先總結下使用場景:

1. dwd:簡單的清洗、複雜的清洗、次元的擴充、各種 udf 的使用

2. dws:各類聚合

然後分适合的場景和不适合的場景來說,因為隻這一篇不能覆寫所有的内容,是以本文此處先大緻給個結論,之後會結合具體的場景較長的描述。

  • 适合的場景:
    1. 簡單的 dwd 清洗場景
    2. 全場景的 dws 聚合場景
  • 目前不太适合的場景:
    1. 複雜的 dwd 清洗場景:舉例比如使用了很多 udf 清洗,尤其是使用很多的 json 類解析清洗
    2. 關聯次元場景:舉例比如 datastream 中經常會有攢一批資料批量通路外部接口的場景,flink sql 目前對于這種場景雖然有 localcache、異步通路能力,但是依然還是一條一條通路外部緩存,這樣相比批量通路還是會有性能差距。

4.2.文法\運作問題

其實總結來說,對于接觸過 sql 的同學來說,除了 flink sql 中視窗聚合類的寫法來說,其他的 sql 文法都是相同的,很容易了解。

本節會針對具體的案例進行詳細介紹。

4.2.1.ETL

最簡單的 ETL 類型任務。

SELECT select_list FROM table_expression [ WHERE boolean_expression ]
           

複制

1.場景:簡單的 dwd 清洗過濾場景

源碼公衆号背景回複不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧擷取。

資料源表:

CREATE TABLE source_table (
    order_number BIGINT,
    price        DECIMAL(32,2)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.order_number.min' = '10',
  'fields.order_number.max' = '11'
)
           

複制

資料彙表:

CREATE TABLE sink_table (
    order_number BIGINT,
    price        DECIMAL(32,2)
) WITH (
  'connector' = 'print'
)
           

複制

ETL 邏輯:

insert into sink_table
select * from source_table
where order_number = 10
           

複制

2.運作:可以看到,其實在 flink sql 任務中,其會把對應的處理邏輯給寫到算子名稱上面。

Notes - 觀察 flink sql 技巧 1:

這個其實就是我們觀察 flink sql 任務的第一個技巧。如果你想知道你的 flink 任務在幹啥,第一反應是去 flink webui 看看這個任務目前在做什麼。包括算子名稱都會給直接展示給我們目前哪個算子在幹啥事情,在處理啥邏輯

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

5

3.結果

+I[10, 337546916355686018150362513408.00]
+I[10, 734895198061906189720381030400.00]
+I[10, 496632591763800912960818249728.00]
+I[10, 495090465926828588045441171456.00]
+I[10, 167305033642317182838130081792.00]
+I[10, 409466913112794578407573684224.00]
+I[10, 894352160414515330502514180096.00]
+I[10, 680063350384451712068576346112.00]
+I[10, 50807402446574997641386524672.00]
+I[10, 646597093362022945955245981696.00]
+I[10, 233317961584082024331537809408.00]
...
           

複制

4.原理:

先看一下一個 flink sql 任務的入口執行邏輯。

首先看看建表語句的執行和 query 語句執行的邏輯有什麼不同。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇
flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

可以發現執行到

executeInternal

時會針對具體的

operation

來執行不同的操作。

執行建表操作就是具體的

CreateTableOperation

時,會将表的資訊儲存到

catalogManager

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

8

執行 query 操作就是具體的

ModifyOperation

時,會将對應的邏輯轉換成對應的

Transformation

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

9

Transformation

中就包含了執行的整體邏輯以及對應要執行的 sql 代碼内容。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

10

接下來我們詳細看下對應的 transform 中包含了什麼内容。

首先是最外層

LegacySinkTransformation

,即 sink 算子,如圖就是 print sink function。比較好了解。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

11

然後是中間層

OneInputTransformation

,即 sql 中過濾和轉換操作(

select * from source_table where order_number = 10

),如圖是代碼生成的具體過濾和轉換邏輯。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

12

生成的代碼就在

GeneratedOperator

中的

code

字段。我們将對應的

code

複制到一個新的檔案夾中。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

13

這個算子是直接繼承了

OneInputStreamOperator

進行直接執行邏輯,跳過了 datastream 那一層。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

14

我們來看看最重要的

processElement

邏輯,具體字段解釋和執行邏輯如圖所示。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇
flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

Notes - 觀察 flink sql 技巧 2:

這個其實就是我們觀察 flink sql 任務的第二個技巧。如果你想知道你的 flink 任務執行了什麼代碼,就去看看 sql 最後轉換成的 transformation 裡面具體要執行哪些操作。

4.2.2.去重場景

1.場景:最簡單的去重場景

源碼公衆号背景回複不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧擷取。

資料源:

CREATE TABLE source_table (
    string_field STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.string_field.length' = '3'
)
           

複制

資料彙:

CREATE TABLE sink_table (
    string_field STRING
) WITH (
  'connector' = 'print'
)
           

複制

資料處理:

insert into sink_table
select distinct string_field
from source_table
           

複制

2.運作:可以看到,其實在 flink sql 任務中,其會把對應的處理邏輯給寫到算子名稱上面。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

17

3.上面這個案例的結果:

+I[cd3]
+I[8fc]
+I[b0c]
+I[1d8]
+I[e28]
+I[c5f]
+I[e7d]
+I[dfa]
+I[1fe]
...
           

複制

4.原理:

此處我們隻關注和上面不同的邏輯。

第一個就是

PartitionTransform

中的

KeyGroupStreamPartitioner

,就是對應的分區邏輯。來看看生成代碼的邏輯。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇
flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

其中做 shuffle 邏輯時,是按照

string_field

作為 key 進行 shuffle。

第二個就是

OneInputTransformation

中的

KeyedProcessOperator

,就是對應的去重邏輯。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇
flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇
flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇
flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

可以看到生成的 function 中隻有這三段代碼是業務邏輯代碼,但是其中的 RowData 初始化大小都是 0。那麼到底是哪裡做的去重邏輯呢?

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

24

我們跟一下處理邏輯會發現。去重邏輯主要集中在

GroupAggFunction#processElement

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

25

4.2.3.group 聚合場景

4.2.3.1.簡單聚合場景

1.場景:最簡單的聚合場景

源碼公衆号背景回複不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧擷取。

count,sum,avg,max,min 等:

資料源:

CREATE TABLE source_table (
    order_id STRING,
    price BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.order_id.length' = '1',
  'fields.price.min' = '1',
  'fields.price.max' = '1000000'
)
           

複制

資料彙:

CREATE TABLE sink_table (
    order_id STRING,
    count_result BIGINT,
    sum_result BIGINT,
    avg_result DOUBLE,
    min_result BIGINT,
    max_result BIGINT
) WITH (
  'connector' = 'print'
)
           

複制

資料處理邏輯:

insert into sink_table
select order_id,
       count(*) as count_result,
       sum(price) as sum_result,
       avg(price) as avg_result,
       min(price) as min_result,
       max(price) as max_result
from source_table
group by order_id
           

複制

2.運作:

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇
flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

3.上面這個案例的結果:

+I[1, 1, 415300, 415300.0, 415300, 415300]
+I[d, 1, 416878, 416878.0, 416878, 416878]
+I[0, 1, 120837, 120837.0, 120837, 120837]
+I[c, 1, 337749, 337749.0, 337749, 337749]
+I[7, 1, 387053, 387053.0, 387053, 387053]
+I[8, 1, 387042, 387042.0, 387042, 387042]
+I[2, 1, 546317, 546317.0, 546317, 546317]
+I[e, 1, 22131, 22131.0, 22131, 22131]
+I[9, 1, 651731, 651731.0, 651731, 651731]
-U[0, 1, 120837, 120837.0, 120837, 120837]
+U[0, 2, 566664, 283332.0, 120837, 445827]
+I[b, 1, 748659, 748659.0, 748659, 748659]
-U[7, 1, 387053, 387053.0, 387053, 387053]
+U[7, 2, 1058056, 529028.0, 387053, 671003]
           

複制

4.原理:

來瞅一眼 transformation。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

28

還是和之前的邏輯一樣,跟一下 GroupAggFunction 的邏輯。如下圖,有五個執行步驟執行計算。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

33

再看最終生成的 function 代碼邏輯。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

31

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

29

首先看看 count 怎麼算的。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

30

sum 怎麼算的。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

32

4.2.3.2.去重聚合場景

1.場景:去重聚合場景

資料源:

CREATE TABLE source_table (
    dim STRING,
    user_id BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dim.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '1000000'
)
           

複制

資料彙:

CREATE TABLE sink_table (
    dim STRING,
    uv BIGINT
) WITH (
  'connector' = 'print'
)
           

複制

資料處理:

insert into sink_table
select dim,
       count(distinct user_id) as uv
from source_table
group by dim
           

複制

2.運作:

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

35

3.上面這個案例的結果:

+U[9, 3097]
-U[a, 3054]
+U[a, 3055]
-U[8, 3030]
+U[8, 3031]
-U[4, 3137]
+U[4, 3138]
-U[6, 3139]
+U[6, 3140]
-U[0, 3082]
+U[0, 3083]
           

複制

4.原理:

此處隻看和之前的案例不一樣的地方。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇
flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

4.2.3.3.文法糖

1.grouping sets

多元計算。相當于文法糖,使用者可以根據自己的場景去指定自己想要的次元組合。

資料彙:

CREATE TABLE sink_table (
    supplier_id STRING,
    product_id STRING,
    total BIGINT
) WITH (
  'connector' = 'print'
)
           

複制

資料處理邏輯:

insert into sink_table
SELECT
     supplier_id,
     product_id,
     COUNT(*) AS total
FROM (VALUES
     ('supplier1', 'product1', 4),
     ('supplier1', 'product2', 3),
     ('supplier2', 'product3', 3),
     ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS ((supplier_id, product_id), (supplier_id), ())
           

複制

其結果等同于:

insert into sink_table
SELECT
     supplier_id,
     product_id,
     COUNT(*) AS total
FROM (VALUES
     ('supplier1', 'product1', 4),
     ('supplier1', 'product2', 3),
     ('supplier2', 'product3', 3),
     ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY supplier_id, product_id
UNION ALL
SELECT
     supplier_id,
     cast(null as string) as product_id,
     COUNT(*) AS total
FROM (VALUES
     ('supplier1', 'product1', 4),
     ('supplier1', 'product2', 3),
     ('supplier2', 'product3', 3),
     ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY supplier_id
UNION ALL
SELECT
     cast(null as string) AS supplier_id,
     cast(null as string) AS product_id,
     COUNT(*) AS total
FROM (VALUES
     ('supplier1', 'product1', 4),
     ('supplier1', 'product2', 3),
     ('supplier2', 'product3', 3),
     ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
           

複制

結果如下:

+I[supplier1, product1, 1]
+I[supplier1, null, 1]
+I[null, null, 1]
+I[supplier1, product2, 1]
-U[supplier1, null, 1]
+U[supplier1, null, 2]
-U[null, null, 1]
+U[null, null, 2]
+I[supplier2, product3, 1]
+I[supplier2, null, 1]
-U[null, null, 2]
+U[null, null, 3]
+I[supplier2, product4, 1]
-U[supplier2, null, 1]
+U[supplier2, null, 2]
-U[null, null, 3]
+U[null, null, 4]
           

複制

grouping sets 能幫助我們在多元場景下,減少很多備援代碼。關于 grouping sets 原理後面的系列文章會介紹。

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

38

2.rollup

rollup 是上卷計算的一種簡化寫法。比如可以把

GROUPING SETS ((supplier_id, product_id), (supplier_id), ())

簡化為

ROLLUP (supplier_id, product_id)

資料彙:

CREATE TABLE sink_table (
    supplier_id STRING,
    product_id STRING,
    total BIGINT
) WITH (
  'connector' = 'print'
)
           

複制

資料處理邏輯:

SELECT supplier_id, rating, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY ROLLUP (supplier_id, product_id)
           

複制

其結果等同于:

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
    ( supplier_id, product_id ),
    ( supplier_id             ),
    (                         )
)
           

複制

結果如下:

+I[supplier1, product1, 1]
+I[supplier1, null, 1]
+I[null, null, 1]
+I[supplier1, product2, 1]
-U[supplier1, null, 1]
+U[supplier1, null, 2]
-U[null, null, 1]
+U[null, null, 2]
+I[supplier2, product3, 1]
+I[supplier2, null, 1]
-U[null, null, 2]
+U[null, null, 3]
+I[supplier2, product4, 1]
-U[supplier2, null, 1]
+U[supplier2, null, 2]
-U[null, null, 3]
+U[null, null, 4]
           

複制

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

39

5.CUBE 計算

源碼公衆号背景回複不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧擷取。

cube 相當于是一種覆寫了所有次元組合聚合計算。比如 group by a, b, c。其會将 a, b, c 三個次元的所有次元組合進行 group by。

資料彙:

CREATE TABLE sink_table (
    supplier_id STRING,
    product_id STRING,
    total BIGINT
) WITH (
  'connector' = 'print'
)
           

複制

資料處理邏輯:

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY CUBE (supplier_id, product_id)
           

複制

它等同于

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
    ( supplier_id, product_id ),
    ( supplier_id             ),
    (              product_id ),
    (                         )
)
           

複制

結果如下:

+I[supplier1, product1, 1]
+I[supplier1, null, 1]
+I[null, product1, 1]
+I[null, null, 1]
+I[supplier1, product2, 1]
-U[supplier1, null, 1]
+U[supplier1, null, 2]
+I[null, product2, 1]
-U[null, null, 1]
+U[null, null, 2]
+I[supplier2, product3, 1]
+I[supplier2, null, 1]
+I[null, product3, 1]
-U[null, null, 2]
+U[null, null, 3]
+I[supplier2, product4, 1]
-U[supplier2, null, 1]
+U[supplier2, null, 2]
+I[null, product4, 1]
-U[null, null, 3]
+U[null, null, 4]
           

複制

flink sql 知其是以然(七):不會連最适合 flink sql 的 ETL 和 group agg 場景都沒見過吧?1.序篇-本文結構2.背景篇-大家不了解 flink sql 什麼?3.目标篇-本文能幫助大家了解 flink sql 什麼?4.實戰篇-簡單的 query 案例和運作原理5.總結與展望篇

40

5.總結與展望篇

本文主要介紹了 ETL,group agg 聚合類名額的一些常見場景案例以及其底層運作原理。我們可以發現 flink sql 的文法其實和 hive sql,mysql 啥的文法都是基本一緻的。是以上手 flink sql 時,文法基本不會成為我們的障礙。

而且也介紹了在檢視 flink sql 任務時的一些技巧:

  1. 去 flink webui 看看這個任務目前在做什麼。包括算子名稱都會給直接展示給我們目前哪個算子在幹啥事情,在處理啥邏輯。
  2. 如果你想知道你的 flink 任務執行了什麼代碼,就去看看 sql 最後轉換成的 transformation 裡面具體要執行哪些操作。

後續文章會繼續介紹 flink sql 視窗聚合,一些了解誤區,和坑之類的案例。