1.前言
本文主要是整理部落客收集的 Flink 高頻面試題。
其中主要劃分為一下 4 大主題,首先是前兩個 狀态原理、時間視窗 是用于考核候選人對于 Flink 基本原理的了解,程式設計技巧、實戰經驗 主要是考核候選人使用 Flink 的經驗。
1
内容較多,并且比較詳細,建議擷取 PDF 資料,公衆号加部落客微信擷取。歡迎大家指正錯誤與不足之處,也歡迎大家繼續補充此内容。
2.狀态原理
2.1.狀态、狀态後端、Checkpoint 三者之間的差別及關系?
結論:拿五個字做比喻:"鐵鍋炖大鵝",鐵鍋是狀态後端,大鵝是狀态,Checkpoint 是炖的動作。
- ⭐ 狀态:本質來說就是資料,在 Flink 中,其實就是 Flink 提供給使用者的狀态程式設計接口。比如 flink 中的 MapState,ValueState,ListState。
- ⭐ 狀态後端:Flink 提供的用于管理狀态的元件,狀态後端決定了以什麼樣資料結構,什麼樣的存儲方式去存儲和管理我們的狀态。Flink 目前官方提供了 memory、filesystem,rocksdb 三種狀态後端來存儲我們的狀态。
- ⭐ Checkpoint(狀态管理):Flink 提供的用于定時将狀态後端中存儲的狀态同步到遠端的存儲系統的元件或者能力。為了防止 long run 的 Flink 任務挂了導緻狀态丢失,産生資料品質問題,Flink 提供了狀态管理(Checkpoint,Savepoint)的能力把我們使用的狀态給管理起來,定時的儲存到遠端。然後可以在 Flink 任務 failover 時,從遠端把狀态資料恢複到 Flink 任務中,保障資料品質。
2.2.把狀态後端從 FileSystem 變為 RocksDB 後,Flink 任務狀态存儲會發生那些變化?
結論:是否使用 RocksDB 隻會影響 Flink 任務中 keyed-state 存儲的方式和地方,Flink 任務中的 operator-state 不會受到影響。
首先我們來看看,Flink 中的狀态隻會分為兩類:
- ⭐ keyed-state:鍵值狀态,如其名字,此類狀态是以 k-v 的形式存儲,狀态值和 key 綁定。Flink 中的 keyby 之後緊跟的算子的 state 就是鍵值狀态;
- ⭐ operator-state:算子狀态,非 keyed-state 的 state 都是算子狀态,非 k-v 結構,狀态值和算子綁定,不和 key 綁定。Flink 中的 kafka source 算子中用于存儲 kafka offset 的 state 就是算子狀态。
如下圖所示是 3 種狀态後端和 2 種 State 的對應存儲關系:
2
- ⭐ 橫向(行)來看,即 Flink 的狀态分類。分為 Operator state-backend、Keyed state-backend;
- ⭐ 縱向(列)來看,即 Flink 的狀态後端分類。使用者可以配置 memory,filesystem,rocksdb 3 中狀态後端,在 Flink 任務中生成 MemoryStateBackend,FsStateBackend,RocksdbStateBackend,其聲明了整個任務的狀态管理後端類型;
- ⭐ 每個格子中的内容就是使用者在配置 xx 狀态後端(列)時,給使用者使用的狀态(行)生成的狀态後端執行個體,生成的這個執行個體就是在 Flink 中實際用于管理使用者使用的狀态的元件。
是以對應的結論就是:
- ⭐ Flink 任務中的 operator-state。無論使用者配置哪種狀态後端(無論是 memory,filesystem,rocksdb),都是使用 DefaultOperatorStateBackend 來管理的,狀态資料都存儲在記憶體中,做 Checkpoint 時同步到遠端檔案存儲中(比如 HDFS)。
- ⭐ Flink 任務中的 keyed-state。使用者在配置 rocksdb 時,會使用 RocksdbKeyedStateBackend 去管理狀态;使用者在配置 memory,filesystem 時,會使用 HeapKeyedStateBackend 去管理狀态。是以就有了這個問題的結論,配置 rocksdb 隻會影響 keyed-state 存儲的方式和地方,operator-state 不會受到影響。
2.3.什麼樣的業務場景你會選擇 filesystem,什麼樣的業務場景你會選 rocksdb 狀态後端?
在回答這個問題前,我們先看看每種狀态後端的特性:
- ⭐ MemoryStateBackend
- 原理:運作時所需的 State 資料全部儲存在 TaskManager JVM 堆上記憶體中,執行 Checkpoint 的時候,會把 State 的快照資料儲存到 JobManager 程序 的記憶體中。執行 Savepoint 時,可以把 State 存儲到檔案系統中。
- 适用場景:
- 基于記憶體的 StateBackend 在生産環境下不建議使用,因為 State 大小超過 JobManager 記憶體就 OOM 了,此種狀态後端适合在本地開發調試測試,生産環境基本不用。
- State 存儲在 JobManager 的記憶體中。受限于 JobManager 的記憶體大小。
- 每個 State 預設 5MB,可通過 MemoryStateBackend 構造函數調整。d.每個 Stale 不能超過 Akka Frame 大小。
- ⭐ FSStateBackend
- 原理:運作時所需的 State 資料全部儲存在 TaskManager 的記憶體中,執行 Checkpoint 的時候,會把 State 的快照資料儲存到配置的檔案系統中。TM 是異步将 State 資料寫入外部存儲。
- 适用場景:
-
a.适用于處理小狀态、短視窗、或者小鍵值狀态的有狀态處理任務,不建議在大狀态的任務下使用 FSStateBackend。比如 ETL 任務,小時間間隔的 TUMBLE 視窗
b.State 大小不能超過 TM 記憶體。
- ⭐ RocksDBStateBackend
- 原理:使用嵌入式的本地資料庫 RocksDB 将流計算資料狀态存儲在本地磁盤中。在執行 Checkpoint 的時候,會将整個 RocksDB 中儲存的 State 資料全量或者增量持久化到配置的檔案系統中。
- 适用場景:
- a.最适合用于處理大狀态、長視窗,或大鍵值狀态的有狀态處理任務。
- b.RocksDBStateBackend 是目前唯一支援增量檢查點的後端。
- c.增量檢查點非常适用于超大狀态的場景。比如計算 DAU 這種大資料量去重,大狀态的任務都建議直接使用 RocksDB 狀态後端。
到生産環境中:
- ⭐ 如果狀态很大,使用 Rocksdb;如果狀态不大,使用 Filesystem。
- ⭐ Rocksdb 使用磁盤存儲 State,是以會涉及到通路 State 磁盤序列化、反序列化,性能會收到影響,而 Filesystem 直接通路記憶體,單純從通路狀态的性能來說 Filesystem 遠遠好于 Rocksdb。生産環境中實測,相同任務使用 Filesystem 性能為 Rocksdb 的 n 倍,是以需要根據具體場景評估選擇。
2.4.Flink SQL API State TTL 的過期機制是 onCreateAndUpdate 還是onReadAndWrite
- ⭐ 結論:Flink SQL API State TTL 的過期機制目前隻支援 onCreateAndUpdate,DataStream API 兩個都支援
3
- ⭐ 剖析:
- onCreateAndUpdate:是在建立 State 和更新 State 時【更新 State TTL】
- onReadAndWrite:是在通路 State 和寫入 State 時【更新 State TTL】
- ⭐ 實際踩坑場景:Flink SQL Deduplicate 寫法,row_number partition by user_id order by proctime asc,此 SQL 最後生成的算子隻會在第一條資料來的時候更新 state,後續通路不會更新 state TTL,是以 state 會在使用者設定的 state TTL 時間之後過期。
3.時間視窗
3.1.watermark 到底是幹啥的?應用場景?
大部分同學都隻能回答出:watermark 是用于緩解時間時間的亂序問題的。
沒錯,這個觀點是正确的。但是部落客認為這隻是 watermark 第二重要的作用,其更重要的作用在于可以辨別一個 Flink 任務的事件 時間進度。
怎麼了解 時間進度?
我們可以現象一下,一個事件時間視窗的任務,如果沒有一個 東西 去辨別其事件時間的進度,那麼這個事件時間的視窗也就是不知道什麼時候能夠觸發了,也就是說這個視窗永遠不會觸發并且輸出結果。
是以要有一個 東西 去辨別其事件時間的進度,進而讓這個事件時間視窗知道,這個事件時間視窗已經結束了,可以觸發計算了。在 Flink 中,這個 東西 就是 watermark。
總結一下,部落客認為 watermark 為 Flink 解決了兩個問題:
- ⭐ 辨別 Flink 任務的事件時間進度,進而能夠推動事件時間視窗的觸發、計算。
- ⭐ 解決事件時間視窗的亂序問題。
3.2.一個 Flink 任務中可以既有事件時間視窗,又有處理時間視窗嗎?
結論:一個 Flink 任務可以同時有事件時間視窗,又有處理時間視窗。
那麼有些小夥伴們問了,為什麼我們常見的 Flink 任務要麼設定為事件時間語義,要麼設定為處理時間語義?
确實,在生産環境中,我們的 Flink 任務一般不會同時擁有兩種時間語義的視窗。
那麼怎麼解釋開頭部落客所說的結論呢?
部落客這裡從兩個角度進行說明:
- ⭐ 我們其實沒有必要把一個 Flink 任務和某種特定的時間語義進行綁定。對于事件時間視窗來說,我們隻要給它 watermark,能讓 watermark 一直往前推進,讓事件時間視窗能夠持續觸發計算就行。對于處理時間來說更簡單,隻要視窗算子按照本地時間按照固定的時間間隔進行觸發就行。無論哪種時間視窗,主要滿足時間視窗的觸發條件就行。
- ⭐ Flink 的實作上來說也是支援的。Flink 是使用一個叫做 TimerService 的元件來管理 timer 的,我們可以同時注冊事件時間和處理時間的 timer,Flink 會自行判斷 timer 是否滿足觸發條件,如果是,則回調視窗處理函數進行計算。
3.3.window 後面跟 aggregate 和 process 的兩個視窗計算的差別是什麼?
- ⭐ aggregate:是增量聚合,來一條資料計算完了存儲在累加器中,不需要等到視窗觸發時計算,性能較好;
- ⭐ process:全量函數,緩存全部視窗内的資料,滿足視窗觸發條件再觸發計算,同時還提供定時觸發,視窗資訊等上下文資訊;
- ⭐ 應用場景:aggregate 一個一個處理的聚合結果向後傳遞一般來說都是有資訊損失的,而 process 則可以更加定制化的處理。
4.程式設計技巧
4.1.為什麼 Flink DataStream API 在函數入參或者出參有泛型時,不能使用 lambda 表達式?
Flink 類型資訊系統是通過反射擷取到 Java class 的方法簽名去擷取類型資訊的。
以 FlatMap 為例,Flink 在通過反射時會檢查及擷取 FlatMap collector 的出參類型資訊。
但是 lambda 表達式寫的 FlatMap 邏輯,會導緻反射方法擷取類型資訊時【直接擷取不到】collector 的出參類型參數,是以才會報錯。
4.2.Flink 為什麼強調 function 實作時,執行個體化的變量要實作 serializable 接口?
其實這個問題可以延伸成 3 個問題:
- ⭐ 為什麼 Flink 要用到 Java 序列化機制。和 Flink 類型系統的資料序列化機制的用途有啥差別?
- ⭐ 非執行個體化的變量沒有實作 Serializable 為啥就不報錯,執行個體化就報錯?
- ⭐ 為啥加 transient 就不報錯?
上面 3 個問題的答案如下:
- ⭐ Flink 寫的函數式程式設計代碼或者說閉包,需要 Java 序列化從 JobManager 分發到 TaskManager,而 Flink 類型系統的資料序列化機制是為了分發資料,不是分發代碼,可以用非Java的序列化機制,比如 Kyro。
- ⭐ 編譯期不做序列化,是以不實作 Serializable 不會報錯,但是運作期會執行序列化動作,沒實作 Serializable 接口的就報錯了
- ⭐ Flink DataStream API 的 Function 作為閉包在網絡傳輸,必須采用 Java 序列化,是以要通過 Serializable 接口标記,根據 Java 序列化的規定,内部成員變量要麼都可序列化,要麼通過 transient 關鍵字跳過序列化,否則 Java 序列化的時候會報錯。靜态變量不參與序列化,是以不用加 transient。
4.3.Flink 的并行度可以通過哪幾種方式設定,優先級關系是什麼?
- ⭐ 代碼中算子單獨設定
- ⭐ 代碼中Env全局設定
- ⭐ 送出參數
- ⭐ 預設配置資訊
上面的 Flink 并行度優先級從上往下由大變小。
5.實戰經驗
5.1.Flink SQL 計算使用者分布
- ⭐ 需求:上遊是一個 kafka 資料源,資料内容是使用者 QQ 等級變化明細資料(time,uid,level)。需要你求出目前每個等級的使用者數。
- ⭐ 實作 SQL:Deduplicate
-- 如果需要可以打開 minibatch
select
level
, count(1) as uv
, max(time) as time
from (
select
uid
, level
, time
, row_number() over (partition by uid order by time desc) rn
from source
) tmp
where rn =1
group by
level
5.2.Flink SQL 計算 DAU
- ⭐ 需求:資料源:使用者心跳日志(uid,time,type)。計算分 Android,iOS 的 DAU,最晚一分鐘輸出一次當日零點累計到目前的結果。
- ⭐ 實作方式 1:cumulate 視窗
SELECT
window_start
, window_end
, platform
, sum(bucket_dau) as dau
from (
SELECT
window_start
, window_end
, platform
, count(distinct uid) as bucket_dau
FROM TABLE(
CUMULATE(
TABLE user_log,
DESCRIPTOR(time),
INTERVAL '60' SECOND
, INTERVAL '1' DAY))
GROUP BY
window_start
, window_end
, platform
, MOD(HASH_CODE(user_id), 1024)
) tmp
GROUP by
window_start
, window_end
, platform
- 優點:如果是曲線圖的需求,可以完美回溯曲線圖。
- 缺點:大視窗之間如果有資料亂序,有丢數風險;并且由于是 watermark 推動産出,是以資料産出會有延遲。
- ⭐ 實作方式 2:Deduplicate
-- 如果需要可以打開 minibatch
select
platform
, count(1) as dau
, max(time) as time
from (
select
uid
, platform
, time
, row_number() over (partition by uid, platform, time / 24 / 3600 / 1000 order by time desc) rn
from source
) tmp
where rn = 1
group by
platform
- 優點:計算快。
- 缺點:任務發生 failover,曲線圖不能很好回溯。沒法支援 cube 計算。
- ⭐ 實作方式 3:group agg
-- 如果需要可以打開 minibatch
SELECT
max(time) as time
, platform
, sum(bucket_dau) as dau
from (
SELECT
max(time) as time
, platform
, count(distinct uid) as bucket_dau
FROM source
GROUP BY
platform
, MOD(HASH_CODE(user_id), 1024)
) t
GROUP by
platform
- 優點:計算快,支援 cube 計算。
- 缺點:任務發生 failover,曲線圖不能很好回溯。
5.3.你是怎麼合理的評估 Flink 任務的并行度?
Flink 任務并行度合理行一般根據峰值流量進行壓測評估,并且根據叢集負載情況留一定量的 buffer 資源。
- ⭐ 如果資料源已經存在,則可以直接消費進行測試
- ⭐ 如果資料源不存在,需要自行造壓測資料進行測試
對于一個 Flink 任務來說,一般可以按照以下方式進行細粒度設定并行度:
- ⭐ source 并行度配置:以 kafka 為例,source 的并行度一般設定為 kafka 對應的 topic 的分區數
- ⭐ transform(比如 flatmap、map、filter 等算子)并行度的配置:這些算子一般不會做太重的操作,并行度可以和 source 保持一緻,使得算子之間可以做到 forward 傳輸資料,不經過網絡傳輸
- ⭐ keyby 之後的處理算子:建議最大并行度為此算子并行度的整數倍,這樣可以使每個算子上的 keyGroup 是相同的,進而使得資料相對均勻 shuffle 到下遊算子,如下圖為 shuffle 政策
- ⭐ sink 并行度的配置:sink 是資料流向下遊的地方,可以根據 sink 的資料量及下遊的服務抗壓能力進行評估。如果 sink 是 kafka,可以設為 kafka 對應 topic 的分區數。注意 sink 并行度最好和 kafka partition 成倍數關系,否則可能會出現如到 kafka partition 資料不均勻的情況。但是大多數情況下 sink 算子并行度不需要特别設定,隻需要和整個任務的并行度相同就行。
5.4.你是怎麼合理評估任務最大并行度?
- ⭐ 前提:并行度必須 <= 最大并行度
- ⭐ 最大并行度的作用:合理設定最大并行度可以緩解資料傾斜的問題
- ⭐ 根據具體場景的不同,最大并行度大小設定也有不同的方式:
- 在 key 非常多的情況下,最大并行度适合設定比較大(幾千),不容易出現資料傾斜,以 Flink SQL 場景舉例:row_number = 1 partition key user_id 的 Deduplicate 場景(user_id 一般都非常多)
- 在 key 不是很多的情況下,最大并行度适合設定不是很大,不然會加重資料傾斜,以 Flink SQL 場景舉例:group by dim1,dim2 聚合并且次元值不多的 group agg 場景(dim1,dim2 可以枚舉),如果依然有資料傾斜的問題,需要自己先打散資料,緩解資料傾斜
- ⭐ 最大并行度的使用限制:最大并行度一旦設定,是不能随意變更的,否則會導緻檢查點或儲存點失效;最大并行度設定會影響 MapState 狀态劃分的 KeyGroup 數,并行度修改後再從儲存點啟動時,KeyGroup 會根據并行度的設定進行重新分布。
- ⭐ 最大并行度的設定:最大并行度可以自己設定,也可以架構預設生成;預設的算法是取目前算子并行度的 1.5 倍和 2 的 7 次方比較,取兩者之間的最大值,然後用上面的結果和 2 的 15 次方比較,取其中的最小值為預設的最大并行度,非常不建議自動生成,建議使用者自己設定。
--END--