一.org.apache.spark.shuffle.fetchfailedexception
1.問題描述
這種問題一般發生在有大量shuffle操作的時候,task不斷的failed,然後又重執行,一直循環下去,非常的耗時。
2.報錯提示
(1) missing output location
org.apache.spark.shuffle.metadatafetchfailedexception: missing an output location for shuffle 0
(2) shuffle fetch faild
org.apache.spark.shuffle.fetchfailedexception: failed to connect to spark047215/192.168.47.215:50268
目前的配置為每個executor使用1cpu,5gram,啟動了20個executor
3.解決方案
一般遇到這種問題提高executor記憶體即可,同時增加每個executor的cpu,這樣不會減少task并行度。
spark.executor.memory 15g
spark.executor.cores 3
spark.cores.max 21
啟動的execuote數量為:7個
execuotenum = spark.cores.max/spark.executor.cores
每個executor的配置:
3core,15g ram
消耗的記憶體資源為:105g ram
15g*7=105g
可以發現使用的資源并沒有提升,但是同樣的任務原來的配置跑幾個小時還在卡着,改了配置後幾分鐘就結束了。
二.executor&task lost
因為網絡或者gc的原因,worker或executor沒有接收到executor或task的心跳回報
(1) executor lost
warn tasksetmanager: lost task 1.0 in stage 0.0 (tid 1, aa.local): executorlostfailure (executor lost)
(2) task lost
warn tasksetmanager: lost task 69.2 in stage 7.0 (tid 1145, 192.168.47.217): java.io.ioexception: connection from /192.168.47.217:55483 closed
(3) 各種timeout
java.util.concurrent.timeoutexception: futures timed out after [120 second
error transportchannelhandler: connection to /192.168.47.212:35409 has been quiet for 120000 ms while there are outstanding requests. assuming connection is dead; please adjust spark.network.timeout if this is wrong
提高 spark.network.timeout 的值,根據情況改成300(5min)或更高。
預設為 120(120s),配置所有網絡傳輸的延時,如果沒有主動設定以下參數,預設覆寫其屬性
spark.core.connection.ack.wait.timeout
spark.akka.timeout
spark.storage.blockmanagerslavetimeoutms
spark.shuffle.io.connectiontimeout
spark.rpc.asktimeout or spark.rpc.lookuptimeout
三.傾斜
大多數任務都完成了,還有那麼一兩個任務怎麼都跑不完或者跑的很慢。
分為資料傾斜和task傾斜兩種。
2.錯誤提示
(1) 資料傾斜
(2) 任務傾斜
差距不大的幾個task,有的運作速度特别慢。
資料傾斜大多數情況是由于大量null值或者""引起,在計算前過濾掉這些資料既可。
例如:
sqlcontext.sql("...where col is not null and col != ''")
task傾斜原因比較多,網絡io,cpu,mem都有可能造成這個節點上的任務執行緩慢,可以去看該節點的性能監控來分析原因。以前遇到過同僚在spark的一台worker上跑r的任務導緻該節點spark task運作緩慢。
或者可以開啟spark的推測機制,開啟推測機制後如果某一台機器的幾個task特别慢,推測機制會将任務配置設定到其他機器執行,最後spark會選取最快的作為最終結果。
spark.speculation true
spark.speculation.interval 100 - 檢測周期,機關毫秒;
spark.speculation.quantile 0.75 - 完成task的百分比時啟動推測
spark.speculation.multiplier 1.5 - 比其他的慢多少倍時啟動推測。
四.oom(記憶體溢出)
記憶體不夠,資料太多就會抛出oom的exeception
因為報錯提示很明顯,這裡就不給報錯提示了。。。
2.解決方案
主要有driver oom和executor oom兩種
(1) driver oom
一般是使用了collect操作将所有executor的資料聚合到driver導緻。盡量不要使用collect操作即可。
(2) executor oom
1.可以按下面的記憶體優化的方法增加code使用記憶體空間
2.增加executor記憶體總量,也就是說增加spark.executor.memory的值
3.增加任務并行度(大任務就被分成小任務了),參考下面優化并行度的方法
優化
1.記憶體
當然如果你的任務shuffle量特别大,同時rdd緩存比較少可以更改下面的參數進一步提高任務運作速度。
spark.storage.memoryfraction - 配置設定給rdd緩存的比例,預設為0.6(60%),如果緩存的資料較少可以降低該值。
spark.shuffle.memoryfraction - 配置設定給shuffle資料的記憶體比例,預設為0.2(20%)
剩下的20%記憶體空間則是配置設定給代碼生成對象等。
如果任務運作緩慢,jvm進行頻繁gc或者記憶體空間不足,或者可以降低上述的兩個值。
"spark.rdd.compress","true" - 預設為false,壓縮序列化的rdd分區,消耗一些cpu減少空間的使用
如果資料隻使用一次,不要采用cache操作,因為并不會提高運作速度,還會造成記憶體浪費。
2.并行度
spark.default.parallelism
發生shuffle時的并行度,在standalone模式下的數量預設為core的個數,也可手動調整,數量設定太大會造成很多小任務,增加啟動任務的開銷,太小,運作大資料量的任務時速度緩慢。
spark.sql.shuffle.partitions
sql聚合操作(發生shuffle)時的并行度,預設為200,如果任務運作緩慢增加這個值。
相同的兩個任務:
spark.sql.shuffle.partitions=300:
spark.sql.shuffle.partitions=500:
速度變快主要是大量的減少了gc的時間。
修改map階段并行度主要是在代碼中使用rdd.repartition(partitionnum)來操作。
本文作者:佚名
來源:51cto