作者 | Robin
翻譯 | 周凱波
Contentsquare 公司的 Robin 總結了他們将 Spark 任務遷移到 Flink 遇到的 10 個『陷阱』。對于第一次将 Flink 用于生産環境的使用者來說,這些經驗非常有參考意義。
采用新的架構總是會帶來很多驚喜。當你花了幾天時間去排查為什麼服務運作異常,結果發現隻是因為某個功能的用法不對或者缺少一些簡單的配置。
在 Contentsquare[1],我們需要不斷更新資料處理任務,以滿足越來越多的資料上的苛刻需求。這也是為什麼我們決定将用于會話[2]處理的小時級 Spark 任務遷移到 Flink[3] 流服務。這樣我們就可以利用 Flink 更為健壯的處理能力,提供更實時的資料給使用者,并能提供曆史資料。不過這并不輕松,我們的團隊在上面工作了有一年時間。同時,我們也遇到了一些令人驚訝的問題,本文将嘗試幫助你避免這些陷阱。
1. 并行度設定導緻的負載傾斜
我們從一個簡單的問題開始:在 Flink UI 中調查某個作業的子任務時,關于每個子任務處理的資料量,你可能會遇到如下這種奇怪的情況。

每個子任務的工作負載并不均衡
這表明每個子任務的算子沒有收到相同數量的 Key Groups,它代表所有可能的 key 的一部分。如果一個算子收到了 1 個 Key Group,而另外一個算子收到了 2 個,則第二個子任務很可能需要完成兩倍的工作。檢視 Flink 的代碼,我們可以找到以下函數:
public static int computeOperatorIndexForKeyGroup(
int maxParallelism, int parallelism, int keyGroupId) {
return keyGroupId * parallelism / maxParallelism;
}
其目的是将所有 Key Groups 分發給實際的算子。Key Groups 的總數由 maxParallelism 參數決定,而算子的數量和 parallelism 相同。這裡最大的問題是 maxParallelism 的預設值,它預設等于 operatorParallelism + (operatorParallelism / 2) [4]。假如我們設定 parallelism 為10,那麼 maxParallelism 為 15 (實際最大并發度值的下限是 128 ,上限是 32768,這裡隻是為了友善舉例)。這樣,根據上面的函數,我們可以計算出哪些算子會配置設定給哪些 Key Group。
在預設配置下,部分算子配置設定了兩個 Key Group,部分算子隻配置設定了 1 個
解決這個問題非常容易:設定并發度的時候,還要為 maxParallelism 設定一個值,且該值為 parallelism 的倍數。這将讓負載更加均衡,同時友善以後擴充。
2. 注意 mapWithState & TTL 的重要性
在處理包含無限多鍵的資料時,要考慮到 keyed 狀态保留政策(通過 TTL 定時器來在給定的時間之後清理未使用的資料)是很重要的。術語『無限』在這裡有點誤導,因為如果你要處理的 key 以 128 位編碼,則 key 的最大數量将會有個限制(等于 2 的 128 次方)。但這是一個巨大的數字!你可能無法在狀态中存儲那麼多值,是以最好考慮你的鍵空間是無界的,同時新鍵會随着時間不斷出現。
如果你的 keyed 狀态包含在某個 Flink 的預設視窗中,則将是安全的:即使未使用 TTL,在處理視窗的元素時也會注冊一個清除計時器,該計時器将調用 clearAllState 函數,并删除與該視窗關聯的狀态及其中繼資料。
如果要使用 Keyed State Descriptor [5]來管理狀态,可以很友善地添加 TTL 配置,以確定在狀态中的鍵數量不會無限制地增加。
但是,你可能會想使用更簡便的 mapWithState 方法,該方法可讓你通路 valueState 并隐藏操作的複雜性。雖然這對于測試和少量鍵的資料來說是很好的選擇,但如果在生産環境中遇到無限多鍵值時,會引發問題。由于狀态是對你隐藏的,是以你無法設定 TTL,并且預設情況下未配置任何 TTL。這就是為什麼值得考慮做一些額外工作的原因,如聲明諸如 RichMapFunction 之類的東西,這将使你能更好的控制狀态的生命周期。
3. 從檢查點還原和重新分區
在使用大狀态時,有必要使用增量檢查點(incremental checkpointing)。在我們的案例中,任務的完整狀态約為 8TB,我們将檢查點配置為每 15 分鐘做一次。由于檢查點是增量式的,是以我們隻能設法每 15 分鐘将大約 100GB 的資料發送到對象存儲,這是一種更快的方式并且網絡占用較少。這對于容錯效果很好,但是在更新任務時我們也需要檢索狀态。常用的方法是為正在運作的作業建立一個儲存點(savepoint),以可移植的格式包含整個狀态。
但是,在我們的情況下,儲存點可能需要幾個小時才能完成,這使得每次釋出版本都是一個漫長而麻煩的過程。相反,我們決定使用保留檢查點(Retained Checkpoints[6])。設定此參數後,我們可以通過從上一個作業的檢查點恢複狀态來加快釋出速度,而不必觸發冗長的儲存點!
此外,盡管儲存點比檢查點具有更高的可移植性,但您仍然可以使用保留的檢查點來更改作業的分區(它可能不适用于所有類型的作業,是以最好對其進行測試)。這與從儲存點重新分區完全一樣,但是不需要經曆 Flink 在 TaskManager 之間重新配置設定資料的漫長過程。當我們嘗試這樣做時,大約花了 8 個小時才完成,這是不可持續的。幸運的是,由于我們使用的是 RocksDB 狀态後端,是以我們可以在這步中增加更多線程以提高其速度。這是通過将以下兩個參數從 1 增加到 8 來完成的:
state.backend.rocksdb.checkpoint.transfer.thread.num:8
state.backend.rocksdb.thread.num:8
使用保留的檢查點,并增加配置設定給 RocksDB 傳輸的線程數,能将釋出和重新分區時間減少 10 倍!
4. 提前增加日志記錄
這一點可能看起來很明顯,但也很容易忘記。開發作業時,請記住它将運作很長時間,并且可能會處理意外的資料。發生這種情況時,你将需要盡可能多的資訊來調查發生了什麼,而不必通過再次回溯相同的資料來重制問題。
我們的任務是将事件彙總在一起,并根據特定規則進行合并。這些規則中的某些規則在大多數情況下性能還可以,但是當有資料傾斜時卻要花費很長時間。當我們發現任務卡住了 3 個小時,卻不知道它在做什麼。似乎隻有一個 TaskManager 的 CPU 可以正常工作,是以我們懷疑是特定資料導緻我們的算法性能下降。
最終處理完資料後,一切恢複正常,但是我們不知道從哪開始檢查!這就是為什麼我們為這些情況添加了一些預防性日志的原因:在處理視窗時,我們會測量花費的時間。隻要計算視窗所需的時間超過 1 分鐘,我們就會記錄下所有可能的資料。這對于準确了解導緻性能下降的傾斜是非常有幫助的,并且當再次發生這種情況時,我們能夠定位到合并過程處理慢的部分原因。假如收到的是重複的資料,則可能确實需要幾個小時。當然,重要的是不要過多地記錄資訊,因為這會降低性能。是以,請嘗試找到僅在異常情況下才顯示資訊的門檻值。
5. 如何找出卡住的作業實際上在做什麼
對上述問題的調查也使我們意識到,我們需要找到一種簡單的方法,來定位作業疑似卡住時目前正在運作的代碼段。幸運的是,有一個簡單的方法可以做到這一點!首先,您将需要配置 TaskManagers 的 JMX 以接受遠端監視。在 Kubernetes 部署中,我們可以通過三個步驟連接配接到 JMX:
- 首先,将此屬性添加到我們的 flink-conf.yaml 中
env.java.opts: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.rmi.port=1099 -Djava.rmi.server.hostname=127.0.0.1"
- 然後,将本地端口 1099 轉發到 TaskManager 的 pod 中的端口
kubectl port-forward flink-taskmanager-4 1099
- 最後,打開 jconsole
jconsole 127.0.0.1:1099
這使您可以輕松地在 JVM 上檢視目标 TaskManager 的資訊。對于卡住的作業,我們以正在運作的唯一一個 TaskManager 為目标,分析了正在運作的線程:
JConsole 向我們展示了每個線程目前正在做什麼
深入研究,我們可以看到所有線程都在等待,除了其中一個(在上面的螢幕截圖中已突出顯示)。這使得我們能夠快速發現作業是卡在哪個方法調用裡面的,并輕松修複!
6. 将資料從一種狀态遷移到另一種狀态的風險
根據你的實際情況,可能需要保留兩個具有不同語義的不同狀态描述符。例如,我們通過 WindowContent 狀态為進行中的會話累積事件,接着将處理後的會話移動到稱為 HistoricalSessions 的 ValueState 中。第二個狀态為了防止後面需要用到會保留幾天,直到 TTL 過期丢棄它為止。
我們做的第一個測試運作良好:我們可以發送額外的資料到已處理的會話,這将為相同的鍵建立一個新視窗。在視窗的處理過程中,我們會從 HistoricalSessions 狀态中擷取資料,以将新資料與舊會話合并,并且結果會話是曆史會話的增強版本,這也正是我們所期望的。
在執行此操作時,我們遇到過幾次記憶體問題。經過幾次測試後,我們了解到 OOM 僅在将舊資料發送到 Flink 時才發生(即,發送資料的時間戳早于其目前水印)。這使得我們發現了目前處理方式中的一個大問題:當接收到舊資料時,Flink 将其與舊視窗合并,而舊視窗的資料仍在 WindowContent 狀态内(這可以通過設定 AllowedLateness 實作)。然後結果視窗會與 HistoricalSessions 内容合并,該内容還包含舊的資料。最終我們得到的是重複的事件,在同一會話中收到一些事件後,每個事件都将有數千條重複,進而導緻了 OOM。
這個問題的解法非常簡單:我們希望 WindowContent 在将其内容移至第二個狀态之前自動清除。我們使用了 Flink 的 PurgingTrigger 來達到這個目的,當視窗觸發時,該消息會發送一條清除狀态内容的消息。具體代碼如下所示:
// Purging the window's content allows us to receive late events without merging them twice with the old session
val sessionWindows = keyedStream
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.allowedLateness(Time.days(7))
.trigger(PurgingTrigger.of(EventTimeTrigger.create()))
7. Reduce VS Process
如上所述,我們對 Flink 的使用依賴于累積給定鍵的資料,并将所有這些資料合并在一起。這可以通過兩種方式完成:
- 将資料存儲在 ListState 容器中,等待會話結束,并在會話結束時将所有資料合并在一起
- 使用 ReducingState 在每個新事件到達時,将其與之前的事件合并
使用第一種還是第二種狀态取決于你在 WindowedStream 上運作的功能:使用 ProcessWindowFunction 的 process 調用将使用 ListState,而使用 ReduceFunction 的 reduce 調用則将使用 ReducingState。
ReducingState 的優點非常明顯:不存儲視窗處理之前的所有資料,而是在單個記錄中不斷地對其進行聚合。這通常會導緻狀态更小,取決于在 reduce 操作期間會丢棄多少資料。對我們來說,它在存儲方面幾乎沒有改善,因為與我們為曆史會話存儲的 7 天資料相比,該狀态的大小可以忽略不計。相反,我們注意到通過使用 ListState 可以提高性能!
原因是:每次新事件到來時,連續的 reduce 操作都需要對資料進行反序列化和序列化。這可以在 RocksDBReducingState[7] 的 add 函數中看到,該函數會調用 getInternal[8],進而導緻資料反序列化。
但是,當使用 RocksDB 更新 ListState 中的值,我們可以看到沒有序列化發生[9]。這要歸功于 RocksDB 的合并操作能讓 Flink 可以将資料進行追加而無需反序列化。
最後,我們選擇了 ListState 方法,因為性能提升有助于減少延遲,而存儲的影響卻很小。
8. 不要相信輸入資料!
永遠不要假設你的輸入會像你期望的那樣。可能會出現各種未知的情況,比如你的任務接收到了傾斜的資料、重複的資料、意外的峰值、無效的記錄……總是往最壞的方面想,保護你的作業免受這些影響。
讓我們快速定義幾個關鍵術語,供後面使用:
- “網頁浏覽(PV)事件”是我們接收到的主要資訊。當通路者在用戶端加載 URL 以及 userId、sessionNumber 和 pageNumber 等資訊時,就會觸發它
- “會話”代表使用者在不離開網站的情況下進行的所有互動的總和。它們是由 Flink 通過彙總 PV 事件和其他資訊計算得出的
為了保護我們的任務,我們已盡可能的增加前置過濾。我們必須遵守的規則是,盡可能早地在流中過濾掉無效資料,以避免在中後期造成不必要的昂貴操作。例如,我們有一個規則,對于給定的會話,最多隻能發送 300 個 PV 事件。每個 PV 事件都用一個遞增的頁碼标記,以訓示其在會話中的位置。當我們在一個會話中接收到超過 300 個 PV 事件時,我們可以通過以下方法來過濾它們:
- 計算一個給定視窗過期時的 PV 事件的數量
- 丢棄頁碼超過 300 的事件
第一個方案似乎更可靠,因為它不依賴于頁碼的值,但是我們要在狀态中累積 300 多個 PV 事件,然後才能排除它們。最終我們選擇了第二個方案,該方案在錯誤資料進入 Flink 時就進行了排除。
除了這些無狀态過濾器之外,我們還需要根據與每個鍵相關的名額排除資料。例如,每個會話的最大大小(以位元組為機關)設定為 4MB。選擇此數字是出于業務原因,也是為了幫助解決 Flink 中 RocksDB 狀态的一個限制。事實上,如果 Flink 使用的 RocksDB API 的值超過 2 ^ 31 位元組[10],那麼它就會失敗。是以,如果你像上面解釋的那樣使用一個 ListState,則需要確定你永遠不要累積太多的資料。
當你隻有關于新消費的事件的資訊時,就不可能知道會話的目前大小,這意味着我們不能使用與處理頁碼相同的技巧。我們所做的隻是将 RocksDB 中的每個鍵(即每個會話)的中繼資料存儲在一個單獨的 ValueState 中。此中繼資料在 keyBy 算子之後,但在開窗之前使用和更新。這意味着我們可以保護 RocksDB 避免在其 ListState 中積累太多資料,因為基于此中繼資料,我們知道何時停止接受給定鍵的值!
9. 事件時間的危險性
事件時間處理在大多數情況下都很出色,但你必須牢記:如果你處理晚到資料的方法很費時,可能會有一些糟糕的後果。這個問題并不是直接與 Flink 有關,當某個外部元件往 Kafka topic 在寫資料,而同時Flink正在消費這個 topic 的資料,如果這個外部元件出現問題,就會發生資料晚到的現象。具體來說,當這個元件消費某些分區的速度比其他元件慢時。
這個元件(稱為 Asimov)是一個簡單的 Akka 流程式,該程式讀取 Kafka topic,解析 JSON 資料,将其轉換為 protobuf,然後将其推送到另一個 Kafka topic,這樣Flink就可以處理這個 protobuf。Asimov 的輸入在每個分區中應該是有序的,但是由于分區不是與輸出 topic 一對一映射,是以當Flink最終處理消息時,可能會出現一些亂序。這樣也沒啥問題,因為 Flink 能通過延遲水印來支援亂序。
問題是,當 Asimov 讀取一個分區的速度比其他分區慢時:這意味着 Flink 的水印将随着最快的 Asimov 輸入分區(而不是 Flink 的輸入,因為所有分區都正常前進)前進,而慢的分區将發出具有更舊時間戳的記錄。這最終會導緻 Flink 将這些記錄視為遲來的記錄! 這可能沒問題,但是在我們的作業中,我們使用特定的邏輯來處理晚到的記錄,需要從 RocksDB 擷取資料并生成額外的消息來執行下遊的更新。這意味着,每當 Asimov 因為某種原因在幾個分區上落後時,Flink 就需要做更多的工作。
在有 128 個分區的 topic 中,隻有 8 個分區累積延遲,進而導緻 Flink 中的資料晚到
我們發現了兩種解決此問題的方法:
- 我們可以按照與它的輸出 topic 相同的方式(通過 userId)對 Asimov 的輸 入 topic 進行分區。這意味着,當 Asimov 滞後幾個分區,Flink輸入中的相應分區也滞後,進而導緻水印前進得更慢:
我們決定不這樣做,因為如果我們在 Asimov 之前就有晚到的資料,這個問題仍然會存在,這迫使我們得以相同的方式來給每個 topic 劃分分區。但這在很多情況下是不能做的。
- 另一個解決方案依賴于攢批處理晚到的事件:如果我們可以推遲對晚到事件的處理,我們可以確定每個會話最多産生一個更新,而不是每個事件産生一個更新。
我們可以通過使用自定義觸發器,以避免出現晚到事件到達時就觸發視窗,進而實作第二種解決方案。正如你在預設的 EventTimeTrigger 實作中所看到的,晚到事件在特定情況下不會注冊計時器。在我們的方案中,無論如何我們都會注冊一個計時器,并且不會立即觸發視窗。因為我們的業務需求允許以這種方式進行批量更新,是以我們可以確定當上遊出現延遲時,我們不會生成數百個昂貴的更新。
10. 避免将所有内容存儲在 Flink 中
讓我們以一些普遍的觀點來結束我們的讨論:如果你的資料很大,并且不需要經常通路,那麼最好将其存儲在 Flink 之外。在設計作業時,你希望所有需要的資料都直接在 Flink 節點上(在 RocksDB 或記憶體中)可用。當然,這使得使用這種資料的方式更快,但當資料很大時,它會給你的作業增加很多成本。這是因為 Flink 的狀态沒有被複制,是以丢失一個節點需要從檢查點完全恢複。如果你經常需要向檢查點存儲寫入數百 GB 資料,則檢查點機制本身也很昂貴。
如果對狀态的通路是性能需求中的關鍵部分,那麼将它存儲在Flink中絕對是值得的。但是,如果你可以忍受額外的延遲,那麼将它存儲在具有複制功能和支援對給定記錄快速通路的外部資料庫中,這将為你節省很多麻煩。對于我們的用例,我們選擇将 WindowContent 狀态保留在 RocksDB 中,但我們将 HistoricalSessions 資料移入了 Aerospike[11]中。由于狀态較小,這使得我們的 Flink 作業更快,更容易維護。我們甚至還受益于這樣一個事實:存儲在 Flink 中的剩餘資料足夠小,可以都放入記憶體,這讓我們無需使用 RocksDB 和本地 SSD。
結尾
總而言之,使用 Flink 是一次很棒的經曆:盡管有時我們無法了解架構的行為,但最終它總是有道理的。我強烈推薦訂閱 Flink 使用者郵件清單[12],從這個非常有用和友好的社群獲得額外的提示!
更多參考連結,請移步作者原文:
https://engineering.contentsquare.com/2021/ten-flink-gotchas/