本文僅涉及基礎代碼等設定調優,更深入的還請大家指出~~
1.RDD調配
對象 | 優化手段 |
RDD、DF、DS | 對于同一資料源隻建立一次,盡量多次使用 |
錯誤示範:
val newRDD1 = spark.read.csc("/helloworld")
val newRDD2 = spark.read.csc("/helloworld")
//以上的同一RDD建立兩次,資源浪費
2.持久化多次使用RDD
對象 | 優化手段 |
RDD、DF、DS | 多次調用的RDD持久化到記憶體 |
示範:
val demoDF = spark.read.csv("/helloWorld.csv")
demoDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
demoDF.map(...)
demoDF.join(...)
demoDF.reduce(...)
//當對DF進行action操作時,spark會将整個RDD重新查詢一遍,這樣便十分小号資源
//persist選擇将DF持久化到記憶體中,這樣每次讀取的DF就是直接從記憶體中拉取,
//MEMORY_AND_DISK_SER是選擇在記憶體不足在硬碟中持久化,SER是序列化将更節省記憶體
3.避免shuffle
對象 | 優化手段 |
RDD、DF、DS | 盡量避免能産生shuffle的算子,如:join、reducebykey... |
描述:
産生shuffle會将所有檔案中相同key的資料拉取到同一節點上,這樣不單會加重節點的
運算壓力,并且可能會造成資料傾斜,盡量使用map來代替
4.盡量使用高性能算子
對象 | 優化手段 |
RDD、DF、DS | 将group替換成reducebykey |
RDD、DF、DS | 使用xxxpartition這類算子,如foreachPartition代替foreach |
RDD、DF、DS | 使用filter之後,使用coalesce減少分區數,因為過濾後資料數量會減少,如果10條記錄使用3個分區,即3個節點計算反而會浪費資源 |
RDD、DF、DS | 廣播大變量,如果使用外部變量的時候,spark會将變量拷貝多份在網絡中傳輸 |
RDD、DF、DS | 将group替換成reducebykey |
5.使用kryo序列化
對象 | 優化手段 |
RDD、DF、DS | spark預設是java的序列化方式,如果用到序列化,将其設定為kryo可以提高10倍性能 |
描述:
spark.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
6.資源調優
對象 | 優化手段 |
num-executors | Driver在向Yarn請求資源的時候,會在每個節點啟動設定好的executor數量,如果沒有設定的話,隻會啟動少量的executor。每個spark任務一般設定為30~50個executor之間,太少太多都不好 |
executor-memory | 每個executor的運作記憶體量,num-executors乘以executor-memory不能超過隊列的總資源數,一般executor-memory設定在4g~8g之間,最好在資源的1/2以内,而不會影響機器上的其他計算 |
executor-cores | 一個executor一core隻能執行一個task,同樣num-executors乘以executor-cores不要超過1/2,一般設定為2~4 |
spark.default.parallelism | 該參數為spark設定stage的task數量,如果以上的資源設定充足,而此參數設定為1,則以上的資源根本沒有任務去執行,一般設定為num-executors * executor-cores的2~3倍 |
描述:
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000
#此送出參數可以用于參考
本文參考自美團技術,感謝閱讀~~