一 正常性能調優
1 . 配置設定更多資源
--num-executors 3 \ 配置executor的數量
--driver-memory 100m \ 配置driver的記憶體(影響不大)
--executor-memory 100m \ 配置每個executor的記憶體大小
--executor-cores 3 \ 配置每個executor的cpu core數量
增加每個executor的記憶體量。增加了記憶體量以後,對性能的提升,有兩點:
1、如果需要對RDD進行cache,那麼更多的記憶體,就可以緩存更多的資料,将更少的資料寫入磁盤,甚至不寫入磁盤。減少了磁盤IO。
2、對于shuffle操作,reduce端,會需要記憶體來存放拉取的資料并進行聚合。如果記憶體不夠,也會寫入磁盤。如果給executor配置設定更多記憶體以後,就有更少的資料,需要寫入磁盤,甚至不需要寫入磁盤。減少了磁盤IO,提升了性能。
3、對于task的執行,可能會建立很多對象。如果記憶體比較小,可能會頻繁導緻JVM堆記憶體滿了,然後頻繁GC,垃圾回收,minor GC和full GC。(速度很慢)。記憶體加大以後,帶來更少的GC,垃圾回收,避免了速度變慢,速度變快了。
2.調節并行度
很簡單的道理,隻要合理設定并行度,就可以完全充分利用你的叢集計算資源,并且減少每個task要處理的資料量,最終,就是提升你的整個Spark作業的性能和運作速度。
3.重構RDD架構以及RDD持久化
第一,RDD架構重構與優化 盡量去複用RDD,差不多的RDD,可以抽取稱為一個共同的RDD,供後面的RDD計算時,反複使用。
第二,公共RDD一定要實作持久化 北方吃餃子,現包現煮。你人來了,要點一盤餃子。餡料+餃子皮+水->包好的餃子,對包好的餃子去煮,煮開了以後,才有你需要的熟的,熱騰騰的餃子。 現實生活中,餃子現包現煮,當然是最好的了;但是Spark中,RDD要去“現包現煮”,那就是一場緻命的災難。 對于要多次計算和使用的公共RDD,一定要進行持久化。 持久化,也就是說,将RDD的資料緩存到記憶體中/磁盤中,(BlockManager),以後無論對這個RDD做多少次計算,那麼都是直接取這個RDD的持久化的資料,比如從記憶體中或者磁盤中,直接提取一份資料。
第三,持久化,是可以進行序列化的 如果正常将資料持久化在記憶體中,那麼可能會導緻記憶體的占用過大,這樣的話,也許,會導緻OOM記憶體溢出。 當純記憶體無法支撐公共RDD資料完全存放的時候,就優先考慮,使用序列化的方式在純記憶體中存儲。将RDD的每個partition的資料,序列化成一個大的位元組數組,就一個對象;序列化後,大大減少記憶體的空間占用。 序列化的方式,唯一的缺點就是,在擷取資料的時候,需要反序列化。 如果序列化純記憶體方式,還是導緻OOM,記憶體溢出;就隻能考慮磁盤的方式,記憶體+磁盤的普通方式(無序列化)。 記憶體+磁盤,序列化
第四,為了資料的高可靠性,而且記憶體充足,可以使用雙副本機制,進行持久化 持久化的雙副本機制,持久化後的一個副本,因為機器當機了,副本丢了,就還是得重新計算一次;持久化的每個資料單元,存儲一份副本,放在其他節點上面;進而進行容錯;一個副本丢了,不用重新計算,還可以使用另外一份副本。 這種方式,僅僅針對你的記憶體資源極度充足
4.廣播大變量
廣播變量,初始的時候,就在Drvier上有一份副本。 task在運作的時候,想要使用廣播變量中的資料,此時首先會在自己本地的Executor對應的BlockManager中,嘗試擷取變量副本;如果本地沒有,那麼就從Driver遠端拉取變量副本,并儲存在本地的BlockManager中;此後這個executor上的task,都會直接使用本地的BlockManager中的副本。 executor的BlockManager除了從driver上拉取,也可能從其他節點的BlockManager上拉取變量副本,舉例越近越好。
每個 Executor一個副本,不一定每個節點。
5.使用Kryo序列化
記憶體占用,網絡傳輸
1、算子函數中使用到的外部變量
2、持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER
3、shuffle
1、算子函數中使用到的外部變量,使用Kryo以後:優化網絡傳輸的性能,可以優化叢集中記憶體的占用和消耗
2、持久化RDD,優化記憶體的占用和消耗;持久化RDD占用的記憶體越少,task執行的時候,建立的對象,就不至于頻繁的占滿記憶體,頻繁發生GC。
3、shuffle:可以優化網絡傳輸的性能
bbg
使用時,要自定義注冊類哦
6.使用 fastutil
7.資料本地化的等待時長
Spark在Driver上,對Application的每一個stage的task,進行配置設定之前,都會計算出每個task要計算的是哪個分片資料,RDD的某個partition;Spark的task配置設定算法,優先,會希望每個task正好配置設定到它要計算的資料所在的節點,這樣的話,就不用在網絡間傳輸資料; 但是呢,通常來說,有時,事與願違,可能task沒有機會配置設定到它的資料所在的節點,為什麼呢,可能那個節點的計算資源和計算能力都滿了;是以呢,這種時候,通常來說,Spark會等待一段時間,預設情況下是3s鐘(不是絕對的,還有很多種情況,對不同的本地化級别,都會去等待),到最後,實在是等待不了了,就會選擇一個比較差的本地化級别,比如說,将task配置設定到靠它要計算的資料所在節點,比較近的一個節點,然後進行計算。 但是對于第二種情況,通常來說,肯定是要發生資料傳輸,task會通過其所在節點的BlockManager來擷取資料,BlockManager發現自己本地沒有資料,會通過一個getRemote()方法,通過TransferService(網絡資料傳輸元件)從資料所在節點的BlockManager中,擷取資料,通過網絡傳輸回task所在節點。 對于我們來說,當然不希望是類似于第二種情況的了。最好的,當然是task和資料在一個節點上,直接從本地executor的BlockManager中擷取資料,純記憶體,或者帶一點磁盤IO;如果要通過網絡傳輸資料的話,那麼實在是,性能肯定會下降的,大量網絡傳輸,以及磁盤IO,都是性能的殺手。
二.JVM調優
JVM調優的第一個點:降低cache操作的記憶體占比 spark中,堆記憶體又被劃分成了兩塊兒,一塊兒是專門用來給RDD的cache、persist操作進行RDD資料緩存用的;另外一塊兒,就是我們剛才所說的,用來給spark算子函數的運作使用的,存放函數中自己建立的對象。 預設情況下,給RDD cache操作的記憶體占比,是0.6,60%的記憶體都給了cache操作了。但是問題是,如果某些情況下,cache不是那麼的緊張,問題在于task算子函數中建立的對象過多,然後記憶體又不太大,導緻了頻繁的minor gc,甚至頻繁full gc,導緻spark頻繁的停止工作。性能影響會很大。 針對上述這種情況,大家可以在之前我們講過的那個spark ui。yarn去運作的話,那麼就通過yarn的界面,去檢視你的spark作業的運作統計,很簡單,大家一層一層點選進去就好。可以看到每個stage的運作情況,包括每個task的運作時間、gc時間等等。如果發現gc太頻繁,時間太長。此時就可以适當調價這個比例。 降低cache操作的記憶體占比,大不了用persist操作,選擇将一部分緩存的RDD資料寫入磁盤,或者序列化方式,配合Kryo序列化類,減少RDD緩存的記憶體占用;降低cache操作記憶體占比;對應的,算子函數的記憶體占比就提升了。這個時候,可能,就可以減少minor gc的頻率,同時減少full gc的頻率。對性能的提升是有一定的幫助的。
一句話,讓task執行算子函數時,有更多的記憶體可以使用。
spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2
--conf spark.yarn.executor.memoryOverhead=2048 spark-submit腳本裡面,去用--conf的方式,去添加配置;一定要注意!!!切記,不是在你的spark作業代碼中,用new SparkConf().set()這種方式去設定,不要這樣去設定,是沒有用的!一定要在spark-submit腳本中去設定。 spark.yarn.executor.memoryOverhead(看名字,顧名思義,針對的是基于yarn的送出模式) 預設情況下,這個堆外記憶體上限大概是300多M;後來我們通常項目中,真正處理大資料的時候,這裡都會出現問題,導緻spark作業反複崩潰,無法運作;此時就會去調節這個參數,到至少1G(1024M),甚至說2G、4G 通常這個參數調節上去以後,就會避免掉某些JVM OOM的異常問題,同時呢,會讓整體spark作業的性能,得到較大的提升。
http://blog.csdn.net/hammertank/article/details/48346285
此時呢,就會沒有響應,無法建立網絡連接配接;會卡住;ok,spark預設的網絡連接配接的逾時時長,是60s;如果卡住60s都無法建立連接配接的話,那麼就宣告失敗了。 碰到一種情況,偶爾,偶爾,偶爾!!!沒有規律!!!某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。 這種情況下,很有可能是有那份資料的executor在jvm gc。是以拉取資料的時候,建立不了連接配接。然後超過預設60s以後,直接宣告失敗。 報錯幾次,幾次都拉取不到資料的話,可能會導緻spark作業的崩潰。也可能會導緻DAGScheduler,反複送出幾次stage。TaskScheduler,反複送出幾次task。大大延長我們的spark作業的運作時間。
實際案例腳本:
/usr/local/spark/bin/spark-submit \
--class com.ibeifeng.sparkstudy.WordCount \
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
--master yarn-cluster \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout=300 \
/usr/local/spark/spark.jar \
三.Shuffle調優
new SparkConf().set("spark.shuffle.consolidateFiles", "true") 開啟shuffle map端輸出檔案合并的機制;預設情況下,是不開啟的,就是會發生如上所述的大量map端輸出檔案的操作,嚴重影響性能。
增大map端溢寫的記憶體緩沖空間,減少溢寫次數。 spark.shuffle.file.buffer ,32k--》 64k
增大reduce端聚合記憶體,減少讀寫次數 spark.shuffle.memoryFraction ,0.2--》0.3 嘗試性的增加
new SparkConf().set(" spark.shuffle.manager", " hash") :hash(預設)、sort(可以排序)、tungsten-sort鎢絲(1.5版本後才有,不穩定)
四.spark操作調優(算子調優)
1.MapPartitions替代map操作,不過看具體操作,因為Maprtition容易導緻OOM哦!
2.filter之後,資料容易傾斜,采用coalesce算子。主要就是用于在filter操作之後,針對每個partition的資料量各不相同的情況,來壓縮partition的數量。減少partition的數量,而且讓每個partition的資料量都盡量均勻緊湊。 進而便于後面的task進行計算操作,在某種程度上,能夠一定程度的提升性能。
3.foreachPartition替代foreach,例如資料庫連接配接操作的時候,是非常好的。在實際生産環境,都是用這個,但資料量特别大,會有oom的可能。
4.repartition,SparkSQL的初始stage受限于hdfs的block數量限制。repartition算子,你用Spark SQL這一步的并行度和task數量,肯定是沒有辦法去改變了。但是呢,可以将你用Spark SQL查詢出來的RDD,使用repartition算子,去重新進行分區,此時可以分區成多個partition,比如從20個partition,分區成100個。
5.reduceBykey,map 端本地聚合。