天天看點

Spark 3.0 終于支援 event logs 滾動了

背景

相信經常使用 Spark 的同學肯定知道 Spark 支援将作業的 event log 儲存到持久化裝置。預設這個功能是關閉的,不過我們可以通過 spark.eventLog.enabled 參數來啟用這個功能,并且通過 spark.eventLog.dir 參數來指定 event log 儲存的地方,可以是本地目錄或者 HDFS 上的目錄,不過一般我們都會将它設定成 HDFS 上的一個目錄。

但是這個功能有個問題,就是這個 Spark Job 運作的過程中産生的所有 event log 都是寫到單個檔案中,這就導緻了 event log 檔案的大小和這個 Spark Job 的并行度、複雜度以及運作的時間有很大關系。如果我們是運作 Spark Streaming 作業,這個問題特别明顯,我們經常看到某個 Spark Streaming 作業的 event log 達到幾十 GB 大小!我們沒辦法清理或者删除一些不需要的事件日志,當我們使用 Spark 曆史伺服器打開這個幾十 GB 大小的 event log,打開速度可想而知。

如果大家經常使用 Log4j 的話,Log4j 提供了一個 RollingFileAppender,可以使長時間運作應用的日志按照時間或者日志檔案大小進行切割,進而達到限制單個日志檔案的大小。Spark 的 event log 為什麼不可以提供類似功能呢?值得高興的是,即将釋出的 Spark 3.0 為我們帶來了這個功能(具體參見 SPARK-28594)。當然,對待 Spark 的 event log 不能像其他普通應用程式的日志那樣,簡單切割,然後删除很早之前的日志,而需要保證 Spark 的曆史伺服器能夠解析已經 Roll 出來的日志,并且在 Spark UI 中展示出來,以便我們進行一些查錯、調優等。

如何使用

事件日志滾動

首先必須使用 Spark 3.0,同時将 spark.eventLog.rolling.enabled 設定為 true(預設是 false)。那麼 Spark 在 writeEvent 的時候會判斷目前在寫的 event log 檔案大小加上現在新來的事件日志大小總和是否大于 spark.eventLog.rolling.maxFileSize 參數配置的值,如果滿足将啟動 event log roll 操作。

事件日志壓縮

所謂事件日志壓縮就是将多個滾動出來的事件日志檔案合并到一個壓縮的檔案中。日志壓縮涉及到的參數有 spark.history.fs.eventLog.rolling.maxFilesToRetain 和 spark.history.fs.eventLog.rolling.compaction.score.threshold。第一個參數的意思是進行 Compaction 之後需要儲存多少個 event logs 為不壓縮的狀态,這個參數的預設值是 Int.MaxValue。也就是預設其實不啟用事件日志 Compaction,所有 event logs 都将不會被 Compaction 到一個檔案裡面。

需要注意的:

•event logs 的 Compaction 操作是在 Spark 曆史伺服器端進行的,而且是在 Spark 曆史伺服器檢查到有新的事件日志寫到 spark.eventLog.dir 參數配置的目錄中,這時候對應 Spark 作業的 event logs 将可能進行 compact 操作。

•event logs 的 Compaction 操作可能會删除一些沒用的事件日志,關于删除的邏輯請看下一小結。這樣經過 Compaction 操作之後,新生成的壓縮檔案大小将會變小。

•一個 Spark 作業最多隻會有一個 Compact 檔案,檔案的字尾是 .compact。已經有 compact 之後的合并檔案在下一次進行 compact 的時候會被讀出來和需要被 compact 的檔案再一次合并,然後寫到新的 compact 檔案裡。

•已經被選中進行 compact 的 event logs 在執行完 compact 之後會被删除。

核心思想

整個 event logs 滾動項目應該可以大緻分為兩個階段:

•第一個階段就是支援 event logs 滾動以及 event logs Compaction,這個在 SPARK-28594 裡面,已經合并到 Spark 3.0 代碼中。

•第二個階段是采用 AppStatusListener 使用的方法,即把 event logs 持久化到底層的 KVStore 中,并支援從 KVStore 把 event logs restore 出來,這個可以參見 SPARK-28870,這個還在開發中。

階段一

支援 event log 滾動的隐層含義是支援删除舊的事件日志,要不然光支援滾動不支援删除,隻是解決了單個 event log 檔案的大小,解決不了整個作業 event log 總和大小。

為了保證删除舊的事件日志之後 event logs 仍然可以被 Spark 曆史伺服器重放,我們需要定義出哪些事件日志是可以删除的。

拿 Streaming 作業來說,每個批次都是運作不同的作業。如果我們想删除一些事件日志,在大多數情況下,我們都會儲存最近一些批次作業的事件日志,因為這些事件日志有助于我們分析剛剛遇到的問題。換句話說,删除那些比較舊的作業對應的事件日志是比較安全的,而且是比較可行的。這個在 SQL 查詢的作業來說一樣是适用的。

目前 Spark 在記憶體中會維護一些 liveExecutors、liveRDDs、liveJobs、liveStages 以及 liveTasks 等資訊,當 Spark 曆史伺服器觸發 compact 操作的時候,會讀取需要 compact 的事件日志檔案, 然後根據前面的 liveExecutors、liveRDDs、liveJobs、liveStages 以及 liveTasks 等資訊判斷哪些事件需要删除,哪些事件需要保留。滿足 EventFilter 定義的 Event 會被保留,不滿足的就删除,具體可以參見 EventFilter 的 applyFilterToFile 方法實作。

階段二

AppStatusListener 會利用外部 KVStore 來存儲事件日志,是以社群建議利用現有特性在底層 KVStore 中保留最多數量的 Jobs、Stages 以及 SQL 執行。為了存儲對象到 KVStore 以及從 KVStore 恢複對象,社群采用的方法是将 KVStore 中存儲的對象 dump 到一個檔案中,這個稱為 snapshot。

從空間使用的角度來看,這個想法非常有效,因為在 POC 中,隻需 5MB 記憶體就可以将 KVStore 中的資料 dump 到檔案中,其中回放了8.4GB 的事件日志。結果看起來很令人驚訝,但是很有意義,根據這個機制,在大多數情況下,dump 資料到檔案需要的記憶體大小不太可能發生顯著變化。

需要注意的是,快照裡面的内容與目前事件日志檔案的内容是不同的。因為這個快照檔案是從 KVStore dump 出來的,這些對象不會按建立的順序寫入。我們可以壓縮這些對象以節省空間和 IO 成本。在分析某個問題時,新産生的事件日志可能沒什麼用,這就需要讀取和操作之前的事件日志檔案。為了支援這種情況,需要将基本的 listener events 編寫為原始格式,然後滾動事件日志檔案,然後再将舊的事件日志儲存到快照中,兩種格式的檔案共存。這樣就滿足我們之前的需求。由于快照的存在,事件日志的總體大小不會無限增長。

新的方案中 event log 是如何存儲的呢

之前每個 Spark 作業的 event log 都是儲存在單個檔案裡面,如果事件日志沒有完成,會使用 .inprogress 字尾表示。新的 event log 方案會為每個 Spark 作業建立一個目錄來儲存,因為每個 Spark 作業可能會生成多個事件日志檔案。事件日志的檔案夾名稱格式為:eventlog_v2_appId(_)。在事件日志檔案夾裡面存儲的是對應作業的事件日志,日志檔案名稱格式為:events__(_)(.)。

為了說明,我這裡進行了一些測試,/data/iteblog/eventlogs 這個目錄就是 spark.eventLog.dir 參數設定的值,下面是這個目錄下的内容:

[email protected]:/data/iteblog/eventlogs|
⇒  ll
total 0
drwxrwx---  15 iteblog  wheel   480B  3  9 14:26 eventlog_v2_local-1583735123583
drwxrwx---   7 iteblog  wheel   224B  3  9 14:50 eventlog_v2_local-1583735259373
​
[email protected]:/data/iteblog/eventlogs/eventlog_v2_local-1583735259373|
⇒  ll
total 416
-rw-r--r--  1 iteblog  wheel     0B  3  9 14:27 appstatus_local-1583735259373.inprogress
-rwxrwx---  1 iteblog  wheel    64K  3  9 14:50 events_2_local-1583735259373.compact
-rwxrwx---  1 iteblog  wheel   102K  3  9 14:50 events_3_local-1583735259373
-rwxrwx---  1 iteblog  wheel   374B  3  9 14:50 events_4_local-1583735259373
​           

可以看到,當 Spark 作業還沒有完成的時候,會存在一個 appstatus_local-1583735259373.inprogress 的空檔案,真正的事件日志是寫到 events_x_local-1583735259373 檔案裡面。

這裡再多說一下,Spark 還使用 HDFS 的 EC (erasure coding,參見過往記憶大資料的

Hadoop 3.0 糾删碼(Erasure Coding):節省一半存儲空間

功能來進一步節省 event log 的大小。不過從 Spark 3.0 開始,預設寫 event log 不啟用 EC,原因是 HDFS EC 的 hflush() 或 hsync() 實作是不做任何操作的,也就意味着如果你的應用程式不完成,那麼你在磁盤是看不到資料的。不過你如果實在需要 EC 功能,在 Spark 3.0 可以通過 spark.eventLog.allowErasureCoding 參數啟用,具體參見 SPARK-25855

本文轉載自公衆号:過往記憶大資料

原文連結:

https://mp.weixin.qq.com/s/Qb_g66MQMUopyVSfnhU-qw

阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

Spark 3.0 終于支援 event logs 滾動了

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。

Spark 3.0 終于支援 event logs 滾動了

Apache Spark技術交流社群公衆号,微信掃一掃關注

Spark 3.0 終于支援 event logs 滾動了