摘要:本文由阿裡巴巴技術專家賀小令(曉令)分享,主要介紹 Apache Flink 新場景 OLAP 引擎,内容分為以下四部分:
- 背景介紹
- Flink OLAP 引擎
- 案例介紹
- 未來計劃
一、背景介紹
1.OLAP 及其分類

OLAP 是一種讓使用者可以用從不同視角友善快捷的分析資料的計算方法。主流的 OLAP 可以分為3類:多元 OLAP ( Multi-dimensional OLAP )、關系型 OLAP ( Relational OLAP ) 和混合 OLAP ( Hybrid OLAP ) 三大類。
(1)多元 OLAP ( MOLAP )
傳統的 OLAP 分析方式
資料存儲在多元資料集中
(2)關系型 OLAP ( ROLAP )
以關系資料庫為核心,以關系型結構進行多元資料的表示
通過 SQL 的 where 條件以呈現傳統 OLAP 的切片、切塊功能
(3)混合 OLAP ( HOLAP )
将 MOLAP 和 ROLPA 的優勢結合起來,以獲得更快的性能
以下将詳細介紹每種分類的具體特征。
■ 多元 OLAP ( MOLAP )
MOLAP 的典型代表是 Kylin 和 Druid。
- MOLAP 處理流程
首先,對原始資料做資料預處理;然後,将預處理後的資料存至資料倉庫,使用者的請求通過 OLAP server 即可查詢資料倉庫中的資料。
- MOLAP 的優點和缺點
MOLAP 的優點和缺點都來自于其資料預處理 ( pre-processing ) 環節。資料預處理,将原始資料按照指定的計算規則預先做聚合計算,這樣避免了查詢過程中出現大量的臨時計算,提升了查詢性能,同時也為很多複雜的計算提供了支援。
但是這樣的預聚合處理,需要預先定義次元,會限制後期資料查詢的靈活性;如果查詢工作涉及新的名額,需要重新增加預處理流程,損失了靈活度,存儲成本也很高;同時,這種方式不支援明細資料的查詢。
是以,MOLAP 适用于對性能要求非常高的場景。
■ 關系型 OLAP ( ROLAP )
ROLAP 的典型代表是 Presto 和 Impala。
- 處理流程
ROLAP 的處理流程上,使用者的請求直接發送給 OLAP server,然後 OLAP server 将使用者的請求轉換成關系型操作算子,再通過 SCAN 掃描原始資料,在原始資料基礎上做過濾、聚合、關聯等處理,最後将計算結果傳回給使用者。
- ROLAP 的優點和缺點
ROLAP 不需要進行資料預處理 ( pre-processing ),是以查詢靈活,可擴充性好。這類引擎使用 MPP 架構 ( 與Hadoop相似的大型并行處理架構,可以通過擴大并發來增加計算資源 ),可以高效處理大量資料。
但是當資料量較大或 query 較為複雜時,查詢性能也無法像 MOLAP 那樣穩定。所有計算都是臨時發生 ( 沒有預處理 ),是以會耗費更多的計算資源。
是以,ROLAP 适用于對查詢靈活性高的場景。
■ 混合 OLAP ( HOLAP )
混合 OLAP,是 MOLAP 和 ROLAP 的一種融合。當查詢聚合性資料的時候,使用MOLAP 技術;當查詢明細資料時,使用 ROLAP 技術。在給定使用場景的前提下,以達到查詢性能的最優化。
2.Apache Flink 介紹
■ Flink 支援的應用場景
Apache Flink 支援的3種典型應用場景:
(1)事件驅動的應用
- 反欺詐
- 基于規則的監控報警
(2)流式 Pipeline
- 資料 ETL
- 實時搜尋引擎的索引
(3)批處理 & 流處理分析
- 網絡品質監控
- 消費者實時資料分析
■ Flink 架構及優勢
Flink 的整體架構如上圖所示,在此架構下,Flink 的優勢也十分突出,主要分為6個方面:
(1)統一架構 ( 不區分流處理和批處理 )
- 使用者 API 統一
- 執行引擎統一
(2)多層次 API
- 标準 SQL APL
- Table API
- DataStream API ( 靈活,無 schema 限制 )
(3)高性能
- 支援記憶體計算
- 支援代價模型優化
- 支援代碼動态生成
(4)友善內建
- 支援豐富的 Connectors
- 友善對接現有 Catalog
(5)靈活的 Failover 政策
- 在 Pipeline 下支援快速 failover
- 類似 MapReduce、Spark 一樣支援 shuffle 資料落盤
(6)易部署維護
- 靈活部署方案
- 支援高可用
二、Apache Flink OLAP 引擎
1.為什麼 Flink 可以做 ROLAP 引擎?
- Flink 的核心和基礎是流計算,支援高性能、低延遲的大規模計算。
- Blink 将批看作有限流,批處理是針對有限資料集的優化,是以批處理引擎也是建構在流引擎上 ( 已開源 )。
- OLAP 是響應時間要求更短的批處理,是以 OLAP 可以看作是一種特殊的批。OLAP 引擎也可以建構在現有的批引擎上。
注:Flink OLAP 引擎目前不帶存儲,隻是一個計算架構。
2.Flink 做 OLAP 引擎的優勢
(1)統一引擎:流處理、批處理、OLAP 統一使用 Flink 引擎。
降低學習成本,僅需要學習一個引擎
提高開發效率,很多 SQL 是流批通用
提高維護效率,可以更集中維護好一個引擎
(2)既有優勢:利用 Flink 已有的很多特性,使 OLAP 使用場景更為廣泛。
使用流處理的記憶體計算、Pipeline
也可以支援批處理資料落盤能力
(3)互相增強:OLAP 能享有現有引擎的優勢,同時也能增強引擎能力
- 無統計資訊場景的優化
- 開發更高效的算子
- 使 Flink 同時兼備流、批、OLAP 處理的能力,成為更通用的架構
3.性能優化
OLAP 對查詢時間非常敏感,目前很多元件的性能不滿足要求,是以我們對 Flink 做了很多相關優化。
■ 服務架構的優化
- 用戶端服務化
下圖介紹了一條 SQL 怎麼在用戶端一步一步變為 JobGraph,最終送出給 JM:
在改動之前,每次接受一個 query 時會啟動一個新的 JVM 程序來進行作業的編譯。其中 JVM 的啟動、Class 的加載、代碼的動态編譯 ( 如 Optimizer 子產品由于需要通過 Janino 動态編譯進行 cost 計算 ) 等操作都非常耗時 ( 需要約3~5s )。是以,我們将用戶端進行服務化,将整個 Client 做成 Service,當接收到使用者的 query 時,無需重複各項加載工作,可将延時降低至 100ms 左右。
- 自定義 CollectionTableSink
Flink 新場景:OLAP 引擎性能優化及應用案例
這部分優化,源于 OLAP 的一個特性:OLAP 會将最終計算結果發給用戶端,通過JobManager 轉發給 Client。假如某個 query 的結果資料量很大,會讓 JobManager OOM ( OutOfMemory );如果同時執行多個 query,也會互相影響。
是以,我們從新實作了一個 CollectionTableSink,限制資料的條數和資料大小,避免出現 OOM,保證多個 Query 同時運作時的穩定性。
- 排程優化
在 Batch 模式下的排程存在以下問題:
- 使用 Lazy_from_sources 模式排程,會導緻整體運作時間較長,也可能造成死鎖。
- RM ( Resource Manager ) 按 OnDemand 方式配置設定 Slot 需求,也會造成死鎖。
- RM 以單線程同步模式向 TM ( Transaction Manager ) 配置設定 Slot 請求,會造成等待時間更長。
注:排程死鎖是指在資源有限的情況下,多個 Job 同時運作時,如果多個 Job都隻申請到了部分資源并沒有剩餘資源可以申請,導緻 Job 沒法繼續執行,新的 Job 也沒法送出。
針對上述問題,我們提出了以下幾點改動:
- 采用 Eager 排程模式 ( 確定所有的資源都申請到後才開始運作 )。
- 使用 FIFO ( 先進先出隊 ) 模式申請資源 ( 確定目前 Job 的資源配置設定結束後才開始下一個 Job 的資源配置設定 )。
- 将單線程同步模式改為多線程異步模式,減少任務啟動時間和執行時間。
■ 針對 source 的優化
在 ROLAP 的執行場景中,所有資料都是通過掃描原始資料表後進行處理;是以,基于 Source 的讀取性能非常關鍵,直接影響 Job 的執行效率。
- Project&Filter 下堆
像 Parquet 這類的列存檔案格式,支援按需讀取相所需列,同時支援 RowGroup 級别的過濾。利用該特性,可以将 Project 和 Filter 下推到 TableSource,進而隻需要掃描 Query 中涉及的字段和滿足條件的 RowGroup,大大提升讀取效率。
- Aggregate 下堆
這個優化也是充分利用了 TableSource 的特性:例如 Parquet 檔案的 metadata 中已經存儲了每個 RowGroup 的統計資訊 ( 如 max、min等 ),是以在做 max、min 這類聚合統計時,可直接讀取 metadata 資訊,而不需要先讀取所有原始資料再計算。
■ 在沒有統計資訊場景下做的優化
- 消除 CrossJoin
CrossJoin 是沒有任何 Join 條件,将 Join 的兩張表的資料做笛卡爾積,導緻 Join 的結果膨脹非常厲害,這類 Join 應該盡量避免。我們對含有 CrossJoin 的 Plan 進行改寫:将有 join 條件的表格先做 join ( 通常會因為一些資料 Join 不上而減少資料 ),進而提高執行效率。這是一個确定性的改寫,即使在沒有統計資訊的情況下,也可以使用該優化。
- 自适應的 Local Aggregate
通常情況下,兩階段的 Aggregate 是非常高效的,因為 LocalAggregate 能聚合大量資料,導緻 Shuffle 的資料量會變少。但是當 LocalAggregate 的聚合度很低的時候, Local 聚合操作的意義不大,反而會浪費 CPU。
在沒有任何統計資訊的情況下,優化器沒法決定是否要産生 LocalAggregate 算子;是以,我們采用運作時采樣的方式來判斷聚合度,如果聚合度低于設定的門檻值,我們将關閉聚合操作,改為僅做資料轉發;經我們測試,部分場景有 30% 的性能提升。
4.測試結果
上圖是 Flink 和 Presto基于 1T 資料做的 SSB ( Star Schema Benchmark ) 測試,從圖中可以看出 Flink 和 Presto 整體上不相上下,甚至有些 Query Flink 性能優于Presto。
注:Flink OLAP 從開始到嘉賓分享時,隻有3個月時間。
1.Flink OLAP 在資料探查上的應用
上圖描述了一個資料湖應用的完整架構,Flink OLAP 主要用于"資料探查"。
資料探查是對資料結構做智能判斷,給出資料的探查結果,快速了解資料的資訊和品質情況。即使用者可以在管控平台上了解資料湖中任意一份資料的資料特性。使用者通過 Web 互動操作選擇相應的表和名額後立即展示相關結果名額,是以要求低延遲、實時回報。而且資料湖中很多資料沒有任何統計資訊;前述的各種查詢、聚合層面的優化,主要為這類場景服務。
2.整體架構
上圖是這類應用的整體架構。整套服務托管到 Kubernetes 上,最終通路的資料是OSS。
目前,Flink OLAP 引擎性能優化及應用主要是基于内部 Flink,後續工作主要分為以下三塊:
- 推回社群:目前所有工作都是基于内部 Flink,希望推回社群;
- 資源隔離:後期很多功能的開發和優化會圍繞多 Query 運作時的"資源隔離";
- 優化&性能:圍繞 OLAP 的特性,在此場景下會進一步做優化和性能提升等方面的工作。