1、相關問題描述
當我們使用spark sql執行etl時候出現了,可能最終結果大小隻有幾百k,但是小檔案一個分區有上千的情況。
這樣就會導緻以下的一些危害:
- hdfs有最大檔案數限制;
- 浪費磁盤資源(可能存在空檔案);
- hive中進行統計,計算的時候,會産生很多個map,影響計算的速度。
2、解決方案
1) 方法一:通過spark的coalesce()方法和repartition()方法
val rdd2 = rdd1.coalesce(8, true) // true表示是否shuffle
val rdd3 = rdd1.repartition(8)
coalesce:coalesce()方法的作用是傳回指定一個新的指定分區的Rdd,如果是生成一個窄依賴的結果,那麼可以不發生shuffle,分區的數量發生激烈的變化,計算節點不足,不設定true可能會出錯。
repartition:coalesce()方法shuffle為true的情況。
2)方法二:降低spark并行度,即調節spark.sql.shuffle.partitions
比如之前設定的為100,按理說應該生成的檔案數為100;但是由于業務比較特殊,采用的大量的union all,且union all在spark中屬于窄依賴,不會進行shuffle,是以導緻最終會生成(union all數量+1)*100的檔案數。如有10個union all,會生成1100個小檔案。這樣導緻降低并行度為10之後,執行時長大大增加,且檔案數依舊有110個,效果有,但是不理想。
3)方法三:新增一個并行度=1任務,專門合并小檔案。
先将原來的任務資料寫到一個臨時分區(如tmp);再起一個并行度為1的任務,類似:
insert overwrite 目标表 select * from 臨時分區
但是結果小檔案數還是沒有減少,原因:‘select * from 臨時分區’ 這個任務在spark中屬于窄依賴;并且spark DAG中分為寬依賴和窄依賴,隻有寬依賴會進行shuffle;故并行度shuffle,spark.sql.shuffle.partitions=1也就沒有起到作用;由于資料量本身不是特别大,是以可以直接采用group by(在spark中屬于寬依賴)的方式,類似:
insert overwrite 目标表 select * from 臨時分區 group by *
先運作原任務,寫到tmp分區,‘dfs -count’檢視檔案數,1100個,運作加上group by的臨時任務(spark.sql.shuffle.partitions=1),檢視結果目錄,檔案數=1,成功。
最後又加了個删除tmp分區的任務。
3、總結
1)友善的話,可以采用coalesce()方法和repartition()方法。
2)如果任務邏輯簡單,資料量少,可以直接降低并行度。
3)任務邏輯複雜,資料量很大,原任務大并行度計算寫到臨時分區,再加兩個任務:一個用來将臨時分區的檔案用小并行度(加寬依賴)合并成少量檔案到實際分區;另一個删除臨時分區。
4)hive任務減少小檔案相對比較簡單,可以直接設定參數,如:
sethive.merge.mapfiles = true
sethive.merge.mapredfiles= true
sethive.merge.smallfiles.avgsize=1024000000