天天看點

Fink SQL 實踐之OVER視窗

Fink SQL 實踐之OVER視窗

問題場景

Flink SQL 是一種使用 SQL 語義設計的開發語言,用它解決具體業務需求是一種全新體驗,類似于從過程式程式設計到函數式程式設計的轉變一樣,需要一個不斷學習和實踐的過程。在看完了 Flink 官方文檔中 SQL 部分 ,以及官方提供的 SQL Training 後,覺得自己裝備了必殺技準備橫掃需求了,這時先來一個簡單的營銷需求:實時計算今天使用者加頁面次元的浏覽次數,即實時輸出PV,下遊根據某些規則比如今天使用者浏覽A頁面達到N次觸發某種動作。當我使用 group by user 并輸出到 kafka 中時,發生異常: AppendStreamTableSink requires that Table has only insert changes,這是因為不帶時間視窗的分組聚合不支援流式輸出,依賴 Flink Dynamic Tables 的實作原理。修改後加上時間視窗但是不支援實時觸發,隻在視窗結束時輸出一次結果,總之仔細翻了個遍,發現上面資料中都沒有類似場景,不會這麼簡單都實作不了吧?

首先使用标準 SQL 中的 group by 對資料進行分組時,每個分組隻能輸出一行,其中可以有聚合結果和 group by 的列,如果沒有開啟 ONLY_FULL_GROUP_BY 模式(MySQL 5.7 開始預設開啟),還可以輸出任意一行值的非 group by 的列,而 Flink 則是始終開啟,不支援非聚合列輸出。反過來如果我們想要在聚合計算後輸出所有明細,即每行明細帶上聚合結果,這時隻能使用 OVER視窗和它的分組 Partition(還有一個思路是聚合結果和明細再關聯查詢)。

此外在 Flink 中提供了一系列時間視窗函數,如滾動視窗 Tumble、滑動視窗 Hop、會話視窗 Session 等,可以将時間視窗作為 group by 分組項之一,但是正如前面所說它們都隻能在視窗結束時觸發計算輸出,無法滿足實時觸發需求,這點也隻能通過 OVER 視窗實作。

OVER 視窗介紹

OVER 視窗是傳統資料庫的視窗函數,它定義了一行記錄向前多少行或向後多少行作為一個視窗,以此範圍進行分組 Partion 和聚合計算。初次接觸 OVER 視窗覺得不好了解,但實際上它是 SQL 标準支援的特性,包括 Oracle、MySQL、PostgreSQL 等主流資料庫都已支援,下面是來自 PostgreSQL 文檔 3.5. Window Functions 中的介紹:

A window function performs a calculation across a set of table rows that are somehow related to the current row. This is comparable to the type of calculation that can be done with an aggregate function. However, window functions do not cause rows to become grouped into a single output row like non-window aggregate calls would. Instead, the rows retain their separate identities. Behind the scenes, the window function is able to access more than just the current row of the query result.

其中有具體 SQL 例子輔助了解,此外在維基百科有個更簡短介紹 SQL window function ,有了以上基礎後再來看 Flink SQL 中的 OVER 視窗,官方文檔對 OVER 視窗隻有幾行描述,SQL Training 中甚至沒有涉及,這裡我們可以參考阿裡雲實時計算文檔 OVER視窗 ,為了解釋後面例子,這裡我們隻關注其中 Bounded RANGE OVER 類型:具有相同時間值(時間戳毫秒數)的元素視為同一計算行,它對應一個向前有限時間範圍的行到該行的一個視窗(因為每一個新到的計算行是該視窗最後一個,以後的還沒發生當然無法計算),每一個視窗都會觸發一次計算和輸出。

OVER 視窗應用示例

首先通過 DDL 定義源資料表和結果表,如下輸入是使用者行為消息,輸出到計算結果消息。

CREATE TABLE

user_action

(

`user_id` VARCHAR,
`page_id` VARCHAR,
`action_type` VARCHAR,
`event_time` TIMESTAMP,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND           

) WITH (

'connector.type' = 'kafka',
'connector.topic' = 'user_action',
'connector.version' = '0.11',
'connector.properties.0.key' = 'bootstrap.servers',
'connector.properties.0.value' = 'xxx:9092',
'connector.startup-mode' = 'latest-offset',
'update-mode' = 'append',
'...' = '...'           

);

agg_result

`user_id` VARCHAR,
`page_id` VARCHAR,
`result_type` VARCHAR,
`result_value` BIGINT           
'connector.type' = 'kafka',
'connector.topic' = 'agg_result',
'...' = '...'           

場景一,實時觸發的最近2小時使用者+頁面次元的點選量,注意視窗是向前2小時,類似于實時觸發的滑動視窗。

insert into

agg_result           

select

user_id,
page_id,
'click-type1' as result_type
count(1) OVER (
    PARTITION BY user_id, page_id
    ORDER BY event_time 
    RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW
) as result_value           

from

user_action           

where

action_type = 'click'           

場景二,實時觸發的當天使用者+頁面次元的浏覽量,這就是開篇問題解法,其中多了一個日期次元分組條件,這樣就做到輸出結果從滑動時間轉為固定時間(根據時間區間分組),因為 WATERMARK 機制,今天并不會有昨天資料到來(如果有都被自動抛棄),是以隻會輸出今天的分組結果。

agg_result           
user_id,
page_id,
'view-type1' as result_type
count(1) OVER (
    PARTITION BY user_id, page_id, DATE_FORMAT(event_time, 'yyyyMMdd')
    ORDER BY event_time 
    RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW
) as result_value           
user_action           
action_type = 'view'           

場景三,實時觸發的當天使用者+頁面點選率 CTR(Click-Through-Rate),這相比前面增加了多個 OVER 聚合計算,可以将視窗定義寫在最後。注意示例中缺少了類型轉換,因為除法結果是 decimal,也缺少精度處理函數 ROUND。

agg_result           
user_id,
page_id,
'ctr-type1' as result_type,
sum(
    case
        when action_type = 'click' then 1 else 0
    end
) OVER w 
/ 
if(
    sum(
        case
            when action_type = 'view' then 1 else 0
        end
    ) OVER w = 0,
    1,
    sum(
        case
            when action_type = 'view' then 1 else 0
        end
    ) OVER w
)
as result_value           
user_action           
1 = 1            

WINDOW w AS (

PARTITION BY user_id, page_id, DATE_FORMAT(event_time,'yyyyMMdd')
    ORDER BY event_time 
    RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW
)           

此外,OVER 視窗聚合還可以支援查詢子句、關聯查詢、UNION ALL 等組合,并可以實作對關聯出來的列進行聚合等複雜情況。

OVER 視窗問題和優化

在底層實作中,所有細分 OVER 視窗的資料都是共享的,隻存一份,這點不像滑動視窗會儲存多份視窗資料。但是 OVER 視窗會把所有資料明細存在狀态後端中(記憶體、RocksDB 或 HDFS),每一次視窗計算後會清除過期資料。是以如果向前視窗時間較大,或資料明細過多,可能會占用大量記憶體,即使通過 RocksDB 存在磁盤上,也有因為磁盤通路慢導緻性能下降進而産生反壓問題。在實作源碼 RowTimeRangeBoundedPrecedingFunction 可以看到雖然每次視窗計算時新增聚合值和減少過期聚合值是增量式的,不用周遊全部視窗明細,但是為了計算過期資料,即超過 PRECEDING 的資料,仍然需要把存儲的那些時間戳全部拿出來周遊,判斷是否過期,以及是否要減少聚合值。我們嘗試了通過資料有序性減少查詢操作,但是效果并不明顯,目前主要是配置調優和加大任務分片數進行優化。

原文位址

https://www.cnblogs.com/pyx0/p/flink-sql-over.html