天天看點

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

前言

Apache Spark 自 2010 年面世,到現在已經發展為大資料批計算的首選引擎。而在 2020 年 6 月份釋出的Spark 3.0 版本也是 Spark 有史以來最大的 Release,其中将近一半的 issue 都屬于 SparkSQL。這也迎合我們現在的主要場景(90% 是 SQL),同時也是優化痛點和主要功能點。我們 Erda 的 FDP 平台(Fast Data Platform)也從 Spark 2.4 更新到 Spark 3.0 并做了一系列的相關優化,本文将主要結合 Spark 3.0 版本進行探讨研究。

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

為什麼 Spark 3.0 能夠“神功大成”,在速度和性能方面有質的突破?本文就為大家介紹 Spark 3.0 中 SQL Engine 的“天榜第一”——自适應查詢架構 AQE(Adaptive Query Execution)。

AQE,你是誰?

簡單來說,自适應查詢就是在運作時不斷優化執行邏輯。

Spark 3.0 版本之前,Spark 執行 SQL 是先确定 shuffle 分區數或者選擇 Join 政策後,再按規劃執行,過程中不夠靈活;現在,在執行完部分的查詢後,Spark 利用收集到結果的統計資訊再對查詢規劃重新進行優化。這個優化的過程不是一次性的,而是随着查詢會不斷進行優化, 讓整個查詢優化變得更加靈活和自适應。這一改動讓我們告别之前無休止的被動優化。

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

AQE,你會啥?

了解了 AQE 是什麼之後,我們再看看自适應查詢 AQE 的“三闆斧”:

  • 動态合并 Shuffle 分區
  • 動态調整 Join 政策
  • 動态優化資料傾斜

動态合并 shuffle 分區

如果你之前使用過 Spark,也許某些“調優寶典”會告訴你調整 shuffle 的 partitions 數量,預設是 200。但是在不同 shuffle 中,資料的大小和分布基本都是不同的,那麼簡單地用一個配置,讓所有的 shuffle 來遵循,顯然不是最優的。

分區過小會導緻每個 partition 處理的資料較大,可能需要将資料溢寫到磁盤,進而減慢查詢速度;分區過大又會帶來 GC 壓力和低效 I/O 等問題。是以,動态合并 shuffle 分區是非常必要的。AQE 可以在運作期間動态調整分區數來達到性能最優。

如下圖所示,如果沒有 AQE,shuffle 分區數為 5,對應執行的 Task 數為 5,但是其中有三個的資料量很少,任務配置設定不平衡,浪費了資源,降低了處理效率。

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

而 AQE 會合并三個小分區,最終隻執行三個 Task,這樣就不會出現之前 Task 空轉的資源浪費情況。

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

動态調整 join 政策

SparkJoin 政策大緻可以分三種,分别是 Broadcast Hash Join、Shuffle Hash Join 和 SortMerge Join。其中 Broadcast 通常是性能最好的,Spark 會在執行前選擇合适的 Join 政策。

例如下面兩個表的大小分别為 100 MB 和 30 MB,小表超過 10 MB (spark.sql.autoBroadcastJoinThreshold = 10 MB),是以在 Spark 2.4 中,執行前就選擇了 SortMerge Join 的政策,但是這個方案并沒有考慮 Table2 經過條件過濾之後的大小實際隻有 8 MB。

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

AQE 可以基于運作期間的統計資訊,将 SortMerge Join 轉換為 Broadcast Hash Join。

在上圖中,Table2 經過條件過濾後真正參與 Join 的資料隻有 8 MB,是以 Broadcast Hash Join 政策更優,Spark 3.0 會及時選擇适合的 Join 政策來提高查詢性能。

資料傾斜一直是我們資料進行中的常見問題。當将相同 key 的資料拉取到一個 Task 中處理時,如果某個 key 對應的資料量特别大的話,就會發生資料傾斜,如下圖一樣産生長尾任務導緻整個 Stage 耗時增加甚至 OOM。之前的解決方法比如重寫 query 或者增加 key 消除資料分布不均,都非常浪費時間且後期難以維護。

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

AQE 可以檢查分區資料是否傾斜,如果分區資料過大,就将其分隔成更小的分區,通過分而治之來提升總體性能。

沒有 AQE 傾斜優化時,當某個 shuffle 分區的資料量明顯高于其他分區,會産生長尾 Task,因為整個 Stage 的結束時間是按它的最後一個 Task 完成時間計算,下一個 Stage 隻能等待,這會明顯降低查詢性能。

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

開啟 AQE 後,會将 A0 分成三個子分區,并将對應的 B0 複制三份,優化後将有 6 個 Task 運作 Join,且每個 Task 耗時差不多,進而獲得總體更好的性能。通過對傾斜資料的自适應重分區,解決了傾斜分區導緻的整個任務的性能瓶頸,提高了查詢處理效率。

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

自适應查詢 AQE 憑借着自己的“三闆斧”,在 1TB TPC-DS 基準中,可以将 q77 的查詢速度提高 8 倍,q5 的查詢速度提高 2 倍,且對另外 26 個查詢的速度提高 1.1 倍以上,這是普通優化無法想象的傲人戰績!

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

真的嗎?我不信

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

口說無憑,自适應查詢 AQE 的優越性到底是如何實作,我們“碼”上看看。

AQE 參數說明

#AQE開關
spark.sql.adaptive.enabled=true #預設false,為true時開啟自适應查詢,在運作過程中基于統計資訊重新優化查詢計劃
spark.sql.adaptive.forceApply=true #預設false,自适應查詢在沒有shuffle或子查詢時将不适用,設定為true将始終使用
spark.sql.adaptive.advisoryPartitionSizeInBytes=64M #預設64MB,開啟自适應執行後每個分區的大小。合并小分區和分割傾斜分區都會用到這個參數

#開啟合并shuffle分區
spark.sql.adaptive.coalescePartitions.enabled=true #當spark.sql.adaptive.enabled也開啟時,合并相鄰的shuffle分區,避免産生過多小task
spark.sql.adaptive.coalescePartitions.initialPartitionNum=200 #合并之前shuffle分區數的初始值,預設值是spark.sql.shuffle.partitions,可設定高一些
spark.sql.adaptive.coalescePartitions.minPartitionNum=20 #合并後的最小shuffle分區數。預設值是Spark叢集的預設并行性
spark.sql.adaptive.maxNumPostShufflePartitions=500 #reduce分區最大值,預設500,可根據資源調整

#開啟動态調整Join政策
spark.sql.adaptive.join.enabled=true #與spark.sql.adaptive.enabled都開啟的話,開啟AQE動态調整Join政策

#開啟優化資料傾斜
spark.sql.adaptive.skewJoin.enabled=true #與spark.sql.adaptive.enabled都開啟的話,開啟AQE動态處理Join時資料傾斜
spark.sql.adaptive.skewedPartitionMaxSplits=5 #處理一個傾斜Partition的task個數上限,預設值為5;
spark.sql.adaptive.skewedPartitionRowCountThreshold=1000000 #傾斜Partition的行數下限,即行數低于該值的Partition不會被當作傾斜,預設值一千萬
spark.sql.adaptive.skewedPartitionSizeThreshold=64M #傾斜Partition的大小下限,即大小小于該值的Partition不會被當做傾斜,預設值64M
spark.sql.adaptive.skewedPartitionFactor=5 #傾斜因子,預設為5。判斷是否為傾斜的 Partition。如果一個分區(DataSize>64M*5) || (DataNum>(1000w*5)),則視為傾斜分區。
spark.shuffle.statistics.verbose=true #預設false,打開後MapStatus會采集每個partition條數資訊,用于傾斜處理

           

AQE 功能示範

Spark 3.0 預設未開啟 AQE 特性,樣例 sql 執行耗時 41 s。

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

存在 Task 空轉情況,shuffle 分區數始終為預設的 200。

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

開啟 AQE 相關配置項,再次執行樣例 sql。

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

樣例 sql 執行耗時 18 s,快了一倍以上。

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

并且每個 Stage 的分區數動态調整,而不是固定的 200。無 task 空轉情況,在 DAG 圖中也能觀察到特性開啟。

「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結
「Spark從精通到重新入門(一)」Spark 中不可不知的動态優化前言AQE,你是誰?AQE,你會啥?真的嗎?我不信總結

總結

Spark 3.0 在速度和性能方面得提升有目共睹,它的新特性遠不止自适應查詢一個,當然也不意味着所有的場景都能有明顯的性能提升,還需要我們結合業務和資料進行探索和使用。

注:文中部分圖檔源自于網絡,侵删。