天天看點

spark解決資料傾斜

一、使用hive ETL預處理資料

适用場景

導緻資料傾斜的是Hive表,hive表中的資料不均勻,業務場景需要頻繁使用spark對hive表執行操作

實作思路

先評估是否可以通過hive預處理

預處理(按照可以進行聚合,或者預先和其他表進行join)--> spark所針對資料源就是預處理之後表,是以spark不需要使用原來的shuffle算子執行操作了

方案實作原理

從根源上解決資料傾斜,避免了spark中shuffle類算子

治标不治本,因為資料本身存在不均勻問題,進行預處理隻是将資料傾斜提前到了hive中

方案優缺點

優點

實作起來簡單便捷,效果還非常好,完全規避掉了資料傾斜,Spark作業的性能會大幅度提升。

缺點

治标不治本,Hive ETL中還是會發生資料傾斜。

二、過濾少量導緻傾斜的key

導緻傾斜key就少數幾個,對計算本身影響不大

在spark sql中使用where過濾key或者spark core中對rdd執行filter過濾

如果需要每次作業執行時,動态判定哪些key的資料量最多然後再進行過濾,那麼可以使用sample算子對RDD進行采樣,然後計算出每個key的數量,取資料量最多的key過濾掉即可。

實作原理

将導緻資料傾斜的key給過濾掉之後,這些key就不會參與計算了,自然不可能産生資料傾斜。

優缺點

優點:實作簡單,而且效果也很好,可以完全規避掉資料傾斜。

缺點:适用場景不多,大多數情況下,導緻傾斜的key還是很多的,并不是隻有少數幾個。

三、提高shuffle操作并行度

如果我們必須要對資料傾斜迎難而上,那麼建議優先使用這種方案,因為這是處理資料傾斜最簡單的一種方案。

在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,比如reduceByKey(1000),該參數就設定了這個shuffle算子執行時shuffle read task的數量,即spark.sql.shuffle.partitions,該參數代表了shuffle read task的并行度,預設是200,對于很多場景來說都有點過小。

增加shuffle read task的數量,可以讓原本配置設定給一個task的多個key配置設定給多個task,進而讓每個task處理比原來更少的資料。

優點:實作起來比較簡單,可以有效緩解和減輕資料傾斜的影響。

缺點:隻是緩解了資料傾斜而已,沒有徹底根除問題,根據實踐經驗來看,其效果有限。

四、兩階段聚合

對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較适用這種方案。

局部聚合,先給每個key都打上一個随機數,接着對打上随機數後的資料,執行reduceByKey等聚合操作,進行局部聚合 --> 然後将key的字首去掉,進行全局聚合操作

将原本相同的key通過附加随機字首的方式,變成多個不同的key,就可以讓原本被一個task處理的資料分散到多個task上去做局部聚合,進而解決單個task處理資料量過多的問題。接着去除掉随機字首,再次進行全局聚合,就可以得到最終的結果

優點:對于聚合類的shuffle操作導緻的資料傾斜,效果是非常不錯的。通常都可以解決掉資料傾斜,或者至少是大幅度緩解資料傾斜,将Spark作業的性能提升數倍以上。

缺點:僅僅适用于聚合類的shuffle操作,适用範圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。

多種方案組合使用

七、使用随機字首和擴容rdd進行join

如果在進行join操作時,RDD中有大量的key導緻資料傾斜,那麼進行分拆key也沒什麼意義,此時就隻能使用最後一種方案來解決問題了。

該方案的實作思路基本和“解決方案六”類似,首先檢視RDD/Hive表中的資料分布情況,找到那個造成資料傾斜的RDD/Hive表,比如有多個key都對應了超過1萬條資料。

然後将該RDD的每條資料都打上一個n以内的随機字首。

同時對另外一個正常的RDD進行擴容,将每條資料都擴容成n條資料,擴容出來的每條資料都依次打上一個0~n的字首。

最後将兩個處理後的RDD進行join即可。

将原先一樣的key通過附加随機字首變成不一樣的key,然後就可以将這些處理後的“不同key”分散到多個task中去處理,而不是讓一個task處理大量的相同key。

針對有大量傾斜key的情況,設法将部分key拆分出來進行單獨處理,是以隻能對整個RDD進行資料擴容,對記憶體資源要求很高。

優點:對join類型的資料傾斜基本都可以處理,而且效果也相對比較顯著,性能提升效果非常不錯

缺點:該方案更多的是緩解資料傾斜,而不是徹底避免資料傾斜。而且需要對整個RDD進行擴容,對記憶體資源要求很高。

六、采樣傾斜key并拆分join操作

兩個RDD/Hive表進行join的時候,如果資料量都比較大,無法采用“解決方案五”,那麼此時可以看一下兩個RDD/Hive表中的key分布情況。

如果出現資料傾斜,是因為其中某一個RDD/Hive表中的少數幾個key的資料量過大,而另一個RDD/Hive表中的所有key都分布比較均勻,那麼采用這個解決方案是比較合适的。

對包含少數幾個資料量過大的key的那個RDD,通過sample算子采樣出一份樣本來,然後統計一下每個key的數量,計算出來資料量最大的是哪幾個key。

然後将這幾個key對應的資料從原來的RDD中拆分出來,形成一個單獨的RDD,并給每個key都打上n以内的随機數作為字首;

而不會導緻傾斜的大部分key形成另外一個RDD

接着将需要join的另一個RDD,也過濾出來那幾個傾斜key對應的資料并形成一個單獨的RDD,将每條資料膨脹成n條資料,這n條資料都按順序附加一個0~n的字首;

不會導緻傾斜的大部分key也形成另外一個RDD。

再将附加了随機字首的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以将原先相同的key打散成n份,分散到多個task中去進行join了。

而另外兩個普通的RDD就照常join即可。

最後将兩次join的結果使用union算子合并起來即可,就是最終的join結果。

對于join導緻的資料傾斜,如果隻是某幾個key導緻了傾斜,可以将少數幾個key分拆成獨立RDD,并附加随機字首打散成n份去進行join,此時這幾個key對應的資料就不會集中在少數幾個task上,而是分散到多個task進行join了。

優點:對于join導緻的資料傾斜,如果隻是某幾個key導緻了傾斜,采用該方式可以用最有效的方式打散key進行join。而且隻需要針對少數傾斜key對應的資料進行擴容n倍,不需要對全量資料進行擴容。避免了占用過多記憶體。

缺點:如果導緻傾斜的key特别多的話,比如成千上萬個key都導緻資料傾斜,那麼這種方式也不适合。

五、将reduce join轉換成map join

在對RDD使用join類操作,或者是在Spark SQL中使用join語句時,而且join操作中的一個RDD或表的資料量比較小(比如幾百M或者一兩G),比較适用此方案。

不使用join算子進行連接配接操作,而使用Broadcast變量與map類算子實作join操作,進而完全規避掉shuffle類的操作,徹底避免資料傾斜的發生和出現。将較小RDD中的資料直接通過collect算子拉取到Driver端的記憶體中來,然後對其建立一個Broadcast變量,廣播給其他Executor節點;

接着對另外一個RDD執行map類算子,在算子函數内,從Broadcast變量中擷取較小RDD的全量資料,與目前RDD的每一條資料按照連接配接key進行比對,如果連接配接key相同的話,那麼就将兩個RDD的資料用你需要的方式連接配接起來。

普通的join是會走shuffle過程的,而一旦shuffle,就相當于會将相同key的資料拉取到一個shuffle read task中再進行join,此時就是reduce join。

但是如果一個RDD是比較小的,則可以采用廣播小RDD全量資料+map算子來實作與join同樣的效果,也就是map join,此時就不會發生shuffle操作,也就不會發生資料傾斜。

優點:對join操作導緻的資料傾斜,效果非常好,因為根本就不會發生shuffle,也就根本不會發生資料傾斜。

缺點:适用場景較少,因為這個方案隻适用于一個大表和一個小表的情況。畢竟我們需要将小表進行廣播,此時會比較消耗記憶體資源,driver和每個Executor記憶體中都會駐留一份小RDD的全量資料。

繼續閱讀