天天看點

Spark3自适應查詢計劃(Adaptive Query Execution,AQE)

動态合并shuffle分區(Dynamically coalescing shuffle partitions)

動态調整join政策(Dynamically switching join strategies)

動态優化資料傾斜join(Dynamically optimizing skew joins)

參數:spark.sql.adaptive.enabled 預設關閉,開啟此參數後上述三種政策才會執行

Spark3自适應查詢計劃(Adaptive Query Execution,AQE)

1、動态優化資料傾斜(Dynamically optimizing skew joins)

spark.sql.adaptive.skewJoin.enabled 預設 true

相關參數:

①、spark.sql.adaptive.skewJoin.skewedPartitionFactor 預設5

傾斜分區資料大小 > 整個RDD分區分區大小的中位數 * 此參數配置的值

Spark3自适應查詢計劃(Adaptive Query Execution,AQE)

②、spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 預設256MB

傾斜分區資料大小 > 此參數的預設值

Spark3自适應查詢計劃(Adaptive Query Execution,AQE)

同時滿足①、②這兩個條件參會判定此分區傾斜需要進行裁剪

③、spark.sql.adaptive.advisoryPartitionSizeInBytes 預設64MB

優化後的分區大小 = max(此參數, 非資料傾斜partition的平均大小)

Spark3自适應查詢計劃(Adaptive Query Execution,AQE)

在Reduce階段進行自動傾斜處理的拆分操作,在同一個Executor内部,本該由一個Task處理的大分區,被AQE拆成多個小分區并交由多個Task去計算,這樣可以解決Task之間的負載均衡。但解決不了不同Excuter之間的負載均衡。如果傾斜的分區都分到了一個Executor上,那麼這個Executor的計算能力還是整個作業的瓶頸。

如果左右兩邊的表都出現了資料傾斜現象,需要對左右兩張表的傾斜分區都進行拆分操作,左表拆分M各分區,右表拆分N各分區,那麼每張表最終需要保證M*N個分區才能保證邏輯關聯的一緻性。是以在極端情況下對拆分的分區拉取、複制所需要的開銷會不可控。

2、動态合并shuffle分區(Dynamically coalescing shuffle partitions)

spark.sql.adaptive.coalescePartitions.enabled 預設 true

優化類型:實體計劃 CoalesceShufflePartitions

統計資訊:每個Reduce Task分區大小

發生在Shuffle Map完成後的Reduce階段,Reduce Task将資料分片全部拉回,AQE按照分區編号的順序,依次把小于目标尺寸的分區合并到一起。目标分區尺寸由一下兩個參數決定

spark.sql.adaptive.advisoryPartitionSizeInBytes,預設64M。

spark.sql.adaptive.coalescePartitions.minPartitionNum,最小分區數,預設spark叢集的預設并行度。

最終的targetSize為:首先計算出總的shuffle的資料大小totalPostShuffleInputSize;

maxTargetSize為max(totalPostShuffleInputSize/minPartitionNum,16);targetSize=min(maxTargetSize,advisoryPartitionSizeInBytes)

3、動态調整join政策(Dynamically switching join strategies)

spark.sql.adaptive.localShuffleReader.enabled 預設true

優化類型:邏輯計劃 DemoteBroadcastHashJoin

實體計劃 OptimizeLocalShuffleReader

統計資訊:Map階段中間檔案總大小、中間檔案空檔案占比

DemoteBroadcastHashJoin:把Shuffle Joins降級為Broadcast Joins。僅适用于Shuffle Sort Merge Join。當兩張表完成Shuffle Map階段後,會繼續判斷某一張表是否滿足一下兩個條件

中間檔案尺寸總和小于廣播門檻值 spark.sql.autoBroadcastJoinThreshold(10M)

空檔案占比小于配置項 spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin(0.2)

隻要有一個表滿足就會降級

OptimizeLocalShuffleReader:因為AQE依賴的統計資訊來自于Shuffle Map階段生成的中間檔案,是以在AQE開始優化前,Shuffle操作已經執行過半。

兩張大表join,超過了廣播門檻值的話Spark SQL最初會選擇SortMerge Join,AQE隻有結合兩個表join中的Exchange才能進行降級判斷,是以兩張表必須都完成Map且中間檔案落盤。AQE才會決定是否降級以及用那張表做廣播變量

spark.sql.adaptive.localShuffleReader.enabled(true)完成省去Shuffle正常操作中的網絡分發,Reduce Task可以就讀取本地節點(local)的中間檔案,完成與廣播小表的關聯操作。

繼續閱讀