阿裡雲EMR-3.13.0版本的SparkSQL支援自适應執行功能。
解決哪些問題
SparkSQL自适應執行解決以下問題:
shuffle partition個數
目前SparkSQL中reduce階段的task個數取決于固定參數
spark.sql.shuffle.partition
(預設值200),一個作業一旦設定了該參數,它運作過程中的所有階段的reduce個數都是同一個值。
而對于不同的作業,以及同一個作業内的不同reduce階段,實際的資料量大小可能相差很大,比如reduce階段要處理的資料可能是10MB,也有可能是100GB, 如果使用同一個值對實際運作效率會産生很大影響,比如10MB的資料一個task就可以解決,如果
spark.sql.shuffle.partition
使用預設值200的話,那麼10MB的資料就要被分成200個task處理,增加了排程開銷,影響運作效率。
SparkSQL自适應架構可以通過設定shuffle partition的上下限區間,在這個區間内對不同作業不同階段的reduce個數進行動态調整。
通過區間的設定,一方面可以大大減少調優的成本(不需要找到一個固定值),另一方面同一個作業内部不同reduce階段的reduce個數也能動态調整。
參數:
屬性名稱 | 預設值 | 備注 |
---|---|---|
spark.sql.adaptive.enabled | false | 自适應執行架構的開關 |
spark.sql.adaptive.minNumPostShufflePartitions | 1 | reduce個數區間最小值 |
spark.sql.adaptive.maxNumPostShufflePartitions | 500 | reduce個數區間最大值 |
spark.sql.adaptive.shuffle.targetPostShuffleInputSize | 67108864 | 動态調整reduce個數的partition大小依據,如設定64MB則reduce階段每個task最少處理64MB的資料 |
spark.sql.adaptive.shuffle.targetPostShuffleRowCount | 20000000 | 動态調整reduce個數的partition條數依據,如設定20000000則reduce階段每個task最少處理20000000條的資料 |
資料傾斜
join中會經常碰到資料傾斜的場景,導緻某些task處理的資料過多,出現很嚴重的長尾。目前SparkSQL沒有對傾斜的資料進行相關的優化處理。
SparkSQL自适應架構可以根據預先的配置在作業運作過程中自動檢測是否出現傾斜,并對檢測到的傾斜進行優化處理。
優化的主要邏輯是對傾斜的partition進行拆分由多個task來進行處理,最後通過union進行結果合并。
支援的Join類型:
join類型 | |
---|---|
Inner | 左/右表均可處理傾斜 |
Cross | |
LeftSemi | 隻對左表處理傾斜 |
LeftAnti | |
LeftOuter | |
RightOuter | 隻對右表處理傾斜 |
spark.sql.adaptive.skewedJoin.enabled | 傾斜處理開關 | |
spark.sql.adaptive.skewedPartitionFactor | 10 | 當一個partition的size大小 大于 該值(所有parititon大小的中位數) 且 大于spark.sql.adaptive.skewedPartitionSizeThreshold,或者parition的條數 大于 該值(所有parititon條數的中位數) 且 大于 spark.sql.adaptive.skewedPartitionRowCountThreshold, 才會被當做傾斜的partition進行相應的處理 |
spark.sql.adaptive.skewedPartitionSizeThreshold | 傾斜的partition大小不能小于該值 | |
spark.sql.adaptive.skewedPartitionRowCountThreshold | 10000000 | 傾斜的partition條數不能小于該值 |
spark.shuffle.statistics.verbose | 打開後MapStatus會采集每個partition條數的資訊,用于傾斜處理 |
Runtime執行計劃優化
SparkSQL的Catalyst優化器會将sql語句轉換成實體執行計劃,然後真正運作實體執行計劃。但是Catalyst轉換實體執行計劃的過程中,由于缺少Statistics統計資訊,或者Statistics統計資訊不準等原因,會到時轉換的實體執行計劃可能并不是最優的,比如轉換為SortMergeJoinExec,但實際BroadcastJoin更合适。
SparkSQL自适應執行架構會在實體執行計劃真正運作的過程中,動态的根據shuffle階段shuffle write的實際資料大小,來調整是否可以用BroadcastJoin來代替SortMergeJoin,提高運作效率。
spark.sql.adaptive.join.enabled | true | 開關 |
spark.sql.adaptiveBroadcastJoinThreshold | 等于spark.sql.autoBroadcastJoinThreshold | 運作過程中用于判斷是否滿足BroadcastJoin條件 |
測試
以TPC-DS中某些query為例
query30
原生Spark:

自适應調整reduce個數:
Runtime執行計劃優化(SortMergeJoin轉BroadcastJoin)
自适應轉換為BroadcastJoin
歡迎加入E-MapReduce使用者群