天天看點

Spark面試題——Spark小檔案問題及解決方案

1、相關問題描述

當我們使用spark sql執行etl時候出現了,可能最終結果大小隻有幾百k,但是小檔案一個分區有上千的情況。

這樣就會導緻以下的一些危害:

  • hdfs有最大檔案數限制;
  • 浪費磁盤資源(可能存在空檔案);
  • hive中進行統計,計算的時候,會産生很多個map,影響計算的速度。

2、解決方案

1) 方法一:通過spark的coalesce()方法和repartition()方法

val rdd2 = rdd1.coalesce(8, true) // true表示是否shuffle 
val rdd3 = rdd1.repartition(8)      

coalesce:coalesce()方法的作用是傳回指定一個新的指定分區的Rdd,如果是生成一個窄依賴的結果,那麼可以不發生shuffle,分區的數量發生激烈的變化,計算節點不足,不設定true可能會出錯。

repartition:coalesce()方法shuffle為true的情況。

2)方法二:降低spark并行度,即調節spark.sql.shuffle.partitions

比如之前設定的為100,按理說應該生成的檔案數為100;但是由于業務比較特殊,采用的大量的union all,且union all在spark中屬于窄依賴,不會進行shuffle,是以導緻最終會生成(union all數量+1)*100的檔案數。如有10個union all,會生成1100個小檔案。這樣導緻降低并行度為10之後,執行時長大大增加,且檔案數依舊有110個,效果有,但是不理想。

3)方法三:新增一個并行度=1任務,專門合并小檔案。

先将原來的任務資料寫到一個臨時分區(如tmp);再起一個并行度為1的任務,類似:

insert overwrite 目标表 select * from 臨時分區      

但是結果小檔案數還是沒有減少,原因:‘select * from 臨時分區’ 這個任務在spark中屬于窄依賴;并且spark DAG中分為寬依賴和窄依賴,隻有寬依賴會進行shuffle;故并行度shuffle,spark.sql.shuffle.partitions=1也就沒有起到作用;由于資料量本身不是特别大,是以可以直接采用group by(在spark中屬于寬依賴)的方式,類似:

insert overwrite 目标表 select * from 臨時分區 group by *      

先運作原任務,寫到tmp分區,‘dfs -count’檢視檔案數,1100個,運作加上group by的臨時任務(spark.sql.shuffle.partitions=1),檢視結果目錄,檔案數=1,成功。

最後又加了個删除tmp分區的任務。

3、總結

1)友善的話,可以采用coalesce()方法和repartition()方法。

2)如果任務邏輯簡單,資料量少,可以直接降低并行度。

3)任務邏輯複雜,資料量很大,原任務大并行度計算寫到臨時分區,再加兩個任務:一個用來将臨時分區的檔案用小并行度(加寬依賴)合并成少量檔案到實際分區;另一個删除臨時分區。

4)hive任務減少小檔案相對比較簡單,可以直接設定參數,如:

sethive.merge.mapfiles = true      
sethive.merge.mapredfiles= true      
sethive.merge.smallfiles.avgsize=1024000000      

繼續閱讀