天天看點

Spark性能調優-Shuffle調優及故障排除篇

Spark調優之Shuffle調優

本節開始先講解Shuffle核心概念;然後針對HashShuffle、SortShuffle進行調優;接下來對map端、reduce端調優;再針對Spark中的資料傾斜問題進行剖析及調優;最後是Spark運作過程中的故障排除。
本文首發于公衆号【五分鐘學大資料】,本公号專注于大資料技術,分享高品質大資料原創技術文章。

一、Shuffle的核心概念

1. ShuffleMapStage與ResultStage

Spark性能調優-Shuffle調優及故障排除篇

ShuffleMapStage與ResultStage

在劃分stage時,最後一個stage稱為FinalStage,它本質上是一個ResultStage對象,前面的所有stage被稱為ShuffleMapStage。

ShuffleMapStage的結束伴随着shuffle檔案的寫磁盤。

ResultStage基本上對應代碼中的action算子,即将一個函數應用在RDD的各個partition的資料集上,意味着一個job的運作結束。

2. Shuffle中的任務個數

我們知道,Spark Shuffle分為map階段和reduce階段,或者稱之為ShuffleRead階段和ShuffleWrite階段,那麼對于一次Shuffle,map過程和reduce過程都會由若幹個task來執行,那麼map task和reduce task的數量是如何确定的呢?

假設Spark任務從HDFS中讀取資料,那麼初始RDD分區個數由該檔案的split個數決定,也就是一個split對應生成的RDD的一個partition,我們假設初始partition個數為N。

初始RDD經過一系列算子計算後(假設沒有執行repartition和coalesce算子進行重分區,則分區個數不變,仍為N,如果經過重分區算子,那麼分區個數變為M),我們假設分區個數不變,當執行到Shuffle操作時,map端的task個數和partition個數一緻,即map task為N個。

reduce端的stage預設取

spark.default.parallelism

這個配置項的值作為分區數,如果沒有配置,則以map端的最後一個RDD的分區數作為其分區數(也就是N),那麼分區數就決定了reduce端的task的個數。

3. reduce端資料的讀取

根據stage的劃分我們知道,map端task和reduce端task不在相同的stage中,map task位于ShuffleMapStage,reduce task位于ResultStage,map task會先執行,那麼後執行的reduce task如何知道從哪裡去拉取map task落盤後的資料呢?

reduce端的資料拉取過程如下:

  1. map task 執行完畢後會将計算狀态以及磁盤小檔案位置等資訊封裝到MapStatus對象中,然後由本程序中的MapOutPutTrackerWorker對象将mapStatus對象發送給Driver程序的MapOutPutTrackerMaster對象;
  2. 在reduce task開始執行之前會先讓本程序中的MapOutputTrackerWorker向Driver程序中的MapoutPutTrakcerMaster發動請求,請求磁盤小檔案位置資訊;
  3. 當所有的Map task執行完畢後,Driver程序中的MapOutPutTrackerMaster就掌握了所有的磁盤小檔案的位置資訊。此時MapOutPutTrackerMaster會告訴MapOutPutTrackerWorker磁盤小檔案的位置資訊;
  4. 完成之前的操作之後,由BlockTransforService去Executor0所在的節點拉資料,預設會啟動五個子線程。每次拉取的資料量不能超過48M(reduce task每次最多拉取48M資料,将拉來的資料存儲到Executor記憶體的20%記憶體中)。

二、HashShuffle解析

以下的讨論都假設每個Executor有1個cpu core。

1. 未經優化的HashShuffleManager

shuffle write階段,主要就是在一個stage結束計算之後,為了下一個stage可以執行shuffle類的算子(比如reduceByKey),而将每個task處理的資料按key進行“劃分”。所謂“劃分”,就是對相同的key執行hash算法,進而将相同key都寫入同一個磁盤檔案中,而每一個磁盤檔案都隻屬于下遊stage的一個task。在将資料寫入磁盤之前,會先将資料寫入記憶體緩沖中,當記憶體緩沖填滿之後,才會溢寫到磁盤檔案中去。

下一個stage的task有多少個,目前stage的每個task就要建立多少份磁盤檔案。比如下一個stage總共有100個task,那麼目前stage的每個task都要建立100份磁盤檔案。如果目前stage有50個task,總共有10個Executor,每個Executor執行5個task,那麼每個Executor上總共就要建立500個磁盤檔案,所有Executor上會建立5000個磁盤檔案。由此可見,未經優化的shuffle write操作所産生的磁盤檔案的數量是極其驚人的。

shuffle read階段,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要将上一個stage的計算結果中的所有相同key,從各個節點上通過網絡都拉取到自己所在的節點上,然後進行key的聚合或連接配接等操作。由于shuffle write的過程中,map task給下遊stage的每個reduce task都建立了一個磁盤檔案,是以shuffle read的過程中,每個reduce task隻要從上遊stage的所有map task所在節點上,拉取屬于自己的那一個磁盤檔案即可。

shuffle read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩沖,每次都隻能拉取與buffer緩沖相同大小的資料,然後通過記憶體中的一個Map進行聚合等操作。聚合完一批資料後,再拉取下一批資料,并放到buffer緩沖中進行聚合操作。以此類推,直到最後将所有資料到拉取完,并得到最終的結果。

未優化的HashShuffleManager工作原理如下圖所示:

Spark性能調優-Shuffle調優及故障排除篇

未優化的HashShuffleManager工作原理

2. 優化後的HashShuffleManager

為了優化HashShuffleManager我們可以設定一個參數:

spark.shuffle.consolidateFiles

,該參數預設值為false,将其設定為true即可開啟優化機制,通常來說,如果我們使用HashShuffleManager,那麼都建議開啟這個選項。

開啟consolidate機制之後,在shuffle write過程中,task就不是為下遊stage的每個task建立一個磁盤檔案了,此時會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁盤檔案,磁盤檔案的數量與下遊stage的task數量是相同的。一個Executor上有多少個cpu core,就可以并行執行多少個task。而第一批并行執行的每個task都會建立一個shuffleFileGroup,并将資料寫入對應的磁盤檔案内。

當Executor的cpu core執行完一批task,接着執行下一批task時,下一批task就會複用之前已有的shuffleFileGroup,包括其中的磁盤檔案,也就是說,此時task會将資料寫入已有的磁盤檔案中,而不會寫入新的磁盤檔案中。是以,consolidate機制允許不同的task複用同一批磁盤檔案,這樣就可以有效将多個task的磁盤檔案進行一定程度上的合并,進而大幅度減少磁盤檔案的數量,進而提升shuffle write的性能。

假設第二個stage有100個task,第一個stage有50個task,總共還是有10個Executor(Executor CPU個數為1),每個Executor執行5個task。那麼原本使用未經優化的HashShuffleManager時,每個Executor會産生500個磁盤檔案,所有Executor會産生5000個磁盤檔案的。但是此時經過優化之後,每個Executor建立的磁盤檔案的數量的計算公式為:

cpu core的數量 * 下一個stage的task數量

,也就是說,每個Executor此時隻會建立100個磁盤檔案,所有Executor隻會建立1000個磁盤檔案。

優化後的HashShuffleManager工作原理如下圖所示:

Spark性能調優-Shuffle調優及故障排除篇

優化後的HashShuffleManager工作原理

三、 SortShuffle解析

SortShuffleManager的運作機制主要分成兩種,一種是普通運作機制,另一種是bypass運作機制。當shuffle read task的數量小于等于

spark.shuffle.sort.bypassMergeThreshold

參數的值時(預設為200),就會啟用bypass機制。

1. 普通運作機制

在該模式下,資料會先寫入一個記憶體資料結構中,此時根據不同的shuffle算子,可能選用不同的資料結構。如果是reduceByKey這種聚合類的shuffle算子,那麼會選用Map資料結構,一邊通過Map進行聚合,一邊寫入記憶體;如果是join這種普通的shuffle算子,那麼會選用Array資料結構,直接寫入記憶體。接着,每寫一條資料進入記憶體資料結構之後,就會判斷一下,是否達到了某個臨界門檻值。如果達到臨界門檻值的話,那麼就會嘗試将記憶體資料結構中的資料溢寫到磁盤,然後清空記憶體資料結構。

在溢寫到磁盤檔案之前,會先根據key對記憶體資料結構中已有的資料進行排序。排序過後,會分批将資料寫入磁盤檔案。預設的batch數量是10000條,也就是說,排序好的資料,會以每批1萬條資料的形式分批寫入磁盤檔案。寫入磁盤檔案是通過Java的BufferedOutputStream實作的。BufferedOutputStream是Java的緩沖輸出流,首先會将資料緩沖在記憶體中,當記憶體緩沖滿溢之後再一次寫入磁盤檔案中,這樣可以減少磁盤IO次數,提升性能。

一個task将所有資料寫入記憶體資料結構的過程中,會發生多次磁盤溢寫操作,也就會産生多個臨時檔案。最後會将之前所有的臨時磁盤檔案都進行合并,這就是merge過程,此時會将之前所有臨時磁盤檔案中的資料讀取出來,然後依次寫入最終的磁盤檔案之中。此外,由于一個task就隻對應一個磁盤檔案,也就意味着該task為下遊stage的task準備的資料都在這一個檔案中,是以還會單獨寫一份索引檔案,其中辨別了下遊各個task的資料在檔案中的start offset與end offset。

SortShuffleManager由于有一個磁盤檔案merge的過程,是以大大減少了檔案數量。比如第一個stage有50個task,總共有10個Executor,每個Executor執行5個task,而第二個stage有100個task。由于每個task最終隻有一個磁盤檔案,是以此時每個Executor上隻有5個磁盤檔案,所有Executor隻有50個磁盤檔案。

普通運作機制的SortShuffleManager工作原理如下圖所示:

Spark性能調優-Shuffle調優及故障排除篇

普通運作機制的SortShuffleManager工作原理

2. bypass運作機制

bypass運作機制的觸發條件如下:

  • shuffle map task數量小于

    spark.shuffle.sort.bypassMergeThreshold=200

    參數的值。
  • 不是聚合類的shuffle算子。

此時,每個task會為每個下遊task都建立一個臨時磁盤檔案,并将資料按key進行hash然後根據key的hash值,将key寫入對應的磁盤檔案之中。當然,寫入磁盤檔案時也是先寫入記憶體緩沖,緩沖寫滿之後再溢寫到磁盤檔案的。最後,同樣會将所有臨時磁盤檔案都合并成一個磁盤檔案,并建立一個單獨的索引檔案。

該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要建立數量驚人的磁盤檔案,隻是在最後會做一個磁盤檔案的合并而已。是以少量的最終磁盤檔案,也讓該機制相對未經優化的HashShuffleManager來說,shuffle read的性能會更好。

而該機制與普通SortShuffleManager運作機制的不同在于:第一,磁盤寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在于,shuffle write過程中,不需要進行資料的排序操作,也就節省掉了這部分的性能開銷。

bypass運作機制的SortShuffleManager工作原理如下圖所示:

Spark性能調優-Shuffle調優及故障排除篇

bypass運作機制的SortShuffleManager工作原理

四、map和reduce端緩沖區大小

在Spark任務運作過程中,如果shuffle的map端處理的資料量比較大,但是map端緩沖的大小是固定的,可能會出現map端緩沖資料頻繁spill溢寫到磁盤檔案中的情況,使得性能非常低下,通過調節map端緩沖的大小,可以避免頻繁的磁盤IO操作,進而提升Spark任務的整體性能。

map端緩沖的預設配置是32KB,如果每個task處理640KB的資料,那麼會發生640/32 = 20次溢寫,如果每個task處理64000KB的資料,即會發生64000/32=2000次溢寫,這對于性能的影響是非常嚴重的。

map端緩沖的配置方法:

val conf = new SparkConf()
  .set("spark.shuffle.file.buffer", "64")
           

Spark Shuffle過程中,shuffle reduce task的buffer緩沖區大小決定了reduce task每次能夠緩沖的資料量,也就是每次能夠拉取的資料量,如果記憶體資源較為充足,适當增加拉取資料緩沖區的大小,可以減少拉取資料的次數,也就可以減少網絡傳輸的次數,進而提升性能。

reduce端資料拉取緩沖區的大小可以通過

spark.reducer.maxSizeInFlight

參數進行設定,預設為48MB。該參數的設定方法如下:

reduce端資料拉取緩沖區配置:

val conf = new SparkConf()
  .set("spark.reducer.maxSizeInFlight", "96")
           

五、reduce端重試次數和等待時間間隔

Spark Shuffle過程中,reduce task拉取屬于自己的資料時,如果因為網絡異常等原因導緻失敗會自動進行重試。對于那些包含了特别耗時的shuffle操作的作業,建議增加重試最大次數(比如60次),以避免由于JVM的full gc或者網絡不穩定等因素導緻的資料拉取失敗。在實踐中發現,對于針對超大資料量(數十億~上百億)的shuffle過程,調節該參數可以大幅度提升穩定性。

reduce端拉取資料重試次數可以通過

spark.shuffle.io.maxRetries

參數進行設定,該參數就代表了可以重試的最大次數。如果在指定次數之内拉取還是沒有成功,就可能會導緻作業執行失敗,預設為3,該參數的設定方法如下:

reduce端拉取資料重試次數配置:

val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "6")
           

Spark Shuffle過程中,reduce task拉取屬于自己的資料時,如果因為網絡異常等原因導緻失敗會自動進行重試,在一次失敗後,會等待一定的時間間隔再進行重試,可以通過加大間隔時長(比如60s),以增加shuffle操作的穩定性。

reduce端拉取資料等待間隔可以通過

spark.shuffle.io.retryWait

參數進行設定,預設值為5s,該參數的設定方法如下:

reduce端拉取資料等待間隔配置:

val conf = new SparkConf()
  .set("spark.shuffle.io.retryWait", "60s")
           

六、bypass機制開啟門檻值

對于SortShuffleManager,如果shuffle reduce task的數量小于某一門檻值則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式去寫資料,但是最後會将每個task産生的所有臨時磁盤檔案都合并成一個檔案,并會建立單獨的索引檔案。

當你使用SortShuffleManager時,如果的确不需要排序操作,那麼建議将這個參數調大一些,大于shuffle read task的數量,那麼此時map-side就不會進行排序了,減少了排序的性能開銷,但是這種方式下,依然會産生大量的磁盤檔案,是以shuffle write性能有待提高。

SortShuffleManager排序操作門檻值的設定可以通過

spark.shuffle.sort.bypassMergeThreshold

這一參數進行設定,預設值為200,該參數的設定方法如下:

val conf = new SparkConf()
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")
           

資料傾斜

就是資料分到各個區的數量不太均勻,可以自定義分區器,想怎麼分就怎麼分。

Spark中的資料傾斜問題主要指shuffle過程中出現的資料傾斜問題,是由于不同的key對應的資料量不同導緻的不同task所處理的資料量不同的問題。

例如,reduced端一共要處理100萬條資料,第一個和第二個task分别被配置設定到了1萬條資料,計算5分鐘内完成,第三個task配置設定到了98萬資料,此時第三個task可能需要10個小時完成,這使得整個Spark作業需要10個小時才能運作完成,這就是資料傾斜所帶來的後果。

注意,要區分開資料傾斜與資料過量這兩種情況,資料傾斜是指少數task被配置設定了絕大多數的資料,是以少數task運作緩慢;資料過量是指所有task被配置設定的資料量都很大,相差不多,所有task都運作緩慢。

資料傾斜的表現:

  1. Spark作業的大部分task都執行迅速,隻有有限的幾個task執行的非常慢,此時可能出現了資料傾斜,作業可以運作,但是運作得非常慢;
  2. Spark作業的大部分task都執行迅速,但是有的task在運作過程中會突然報出OOM,反複執行幾次都在某一個task報出OOM錯誤,此時可能出現了資料傾斜,作業無法正常運作。

    定位資料傾斜問題:

  3. 查閱代碼中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根據代碼邏輯判斷此處是否會出現資料傾斜;
  4. 檢視Spark作業的log檔案,log檔案對于錯誤的記錄會精确到代碼的某一行,可以根據異常定位到的代碼位置來明确錯誤發生在第幾個stage,對應的shuffle算子是哪一個;

1. 預聚合原始資料

1. 避免shuffle過程

絕大多數情況下,Spark作業的資料來源都是Hive表,這些Hive表基本都是經過ETL之後的昨天的資料。

為了避免資料傾斜,我們可以考慮避免shuffle過程,如果避免了shuffle過程,那麼從根本上就消除了發生資料傾斜問題的可能。

如果Spark作業的資料來源于Hive表,那麼可以先在Hive表中對資料進行聚合,例如按照key進行分組,将同一key對應的所有value用一種特殊的格式拼接到一個字元串裡去,這樣,一個key就隻有一條資料了;之後,對一個key的所有value進行處理時,隻需要進行map操作即可,無需再進行任何的shuffle操作。通過上述方式就避免了執行shuffle操作,也就不可能會發生任何的資料傾斜問題。

對于Hive表中資料的操作,不一定是拼接成一個字元串,也可以是直接對key的每一條資料進行累計計算。要區分開,處理的資料量大和資料傾斜的差別。

2. 增大key粒度(減小資料傾斜可能性,增大每個task的資料量)

如果沒有辦法對每個key聚合出來一條資料,在特定場景下,可以考慮擴大key的聚合粒度。

例如,目前有10萬條使用者資料,目前key的粒度是(省,城市,區,日期),現在我們考慮擴大粒度,将key的粒度擴大為(省,城市,日期),這樣的話,key的數量會減少,key之間的資料量差異也有可能會減少,由此可以減輕資料傾斜的現象和問題。(此方法隻針對特定類型的資料有效,當應用場景不适宜時,會加重資料傾斜)

2. 預處理導緻傾斜的key

1. 過濾

如果在Spark作業中允許丢棄某些資料,那麼可以考慮将可能導緻資料傾斜的key進行過濾,濾除可能導緻資料傾斜的key對應的資料,這樣,在Spark作業中就不會發生資料傾斜了。

2. 使用随機key

當使用了類似于groupByKey、reduceByKey這樣的算子時,可以考慮使用随機key實作雙重聚合,如下圖所示:

Spark性能調優-Shuffle調優及故障排除篇

随機key實作雙重聚合

首先,通過map算子給每個資料的key添加随機數字首,對key進行打散,将原先一樣的key變成不一樣的key,然後進行第一次聚合,這樣就可以讓原本被一個task處理的資料分散到多個task上去做局部聚合;随後,去除掉每個key的字首,再次進行聚合。

此方法對于由groupByKey、reduceByKey這類算子造成的資料傾斜有比較好的效果,僅僅适用于聚合類的shuffle操作,适用範圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。

此方法也是前幾種方案沒有比較好的效果時要嘗試的解決方案。

3. sample采樣對傾斜key單獨進行join

在Spark中,如果某個RDD隻有一個key,那麼在shuffle過程中會預設将此key對應的資料打散,由不同的reduce端task進行處理。

是以當由單個key導緻資料傾斜時,可有将發生資料傾斜的key單獨提取出來,組成一個RDD,然後用這個原本會導緻傾斜的key組成的RDD和其他RDD單獨join,此時,根據Spark的運作機制,此RDD中的資料會在shuffle階段被分散到多個task中去進行join操作。

傾斜key單獨join的流程如下圖所示:

Spark性能調優-Shuffle調優及故障排除篇

傾斜key單獨join流程

适用場景分析:

對于RDD中的資料,可以将其轉換為一個中間表,或者是直接使用countByKey()的方式,看一下這個RDD中各個key對應的資料量,此時如果你發現整個RDD就一個key的資料量特别多,那麼就可以考慮使用這種方法。

當資料量非常大時,可以考慮使用sample采樣擷取10%的資料,然後分析這10%的資料中哪個key可能會導緻資料傾斜,然後将這個key對應的資料單獨提取出來。

不适用場景分析:

如果一個RDD中導緻資料傾斜的key很多,那麼此方案不适用。

3. 提高reduce并行度

當方案一和方案二對于資料傾斜的處理沒有很好的效果時,可以考慮提高shuffle過程中的reduce端并行度,reduce端并行度的提高就增加了reduce端task的數量,那麼每個task配置設定到的資料量就會相應減少,由此緩解資料傾斜問題。

1. reduce端并行度的設定

在大部分的shuffle算子中,都可以傳入一個并行度的設定參數,比如reduceByKey(500),這個參數會決定shuffle過程中reduce端的并行度,在進行shuffle操作的時候,就會對應着建立指定數量的reduce task。對于Spark SQL中的shuffle類語句,比如group by、join等,需要設定一個參數,即

spark.sql.shuffle.partitions

,該參數代表了shuffle read task的并行度,該值預設是200,對于很多場景來說都有點過小。

增加shuffle read task的數量,可以讓原本配置設定給一個task的多個key配置設定給多個task,進而讓每個task處理比原來更少的資料。

舉例來說,如果原本有5個key,每個key對應10條資料,這5個key都是配置設定給一個task的,那麼這個task就要處理50條資料。而增加了shuffle read task以後,每個task就配置設定到一個key,即每個task就處理10條資料,那麼自然每個task的執行時間都會變短了。

2. reduce端并行度設定存在的缺陷

提高reduce端并行度并沒有從根本上改變資料傾斜的本質和問題(方案一和方案二從根本上避免了資料傾斜的發生),隻是盡可能地去緩解和減輕shuffle reduce task的資料壓力,以及資料傾斜的問題,适用于有較多key對應的資料量都比較大的情況。

該方案通常無法徹底解決資料傾斜,因為如果出現一些極端情況,比如某個key對應的資料量有100萬,那麼無論你的task數量增加到多少,這個對應着100萬資料的key肯定還是會配置設定到一個task中去處理,是以注定還是會發生資料傾斜的。是以這種方案隻能說是在發現資料傾斜時嘗試使用的一種手段,嘗試去用最簡單的方法緩解資料傾斜而已,或者是和其他方案結合起來使用。

在理想情況下,reduce端并行度提升後,會在一定程度上減輕資料傾斜的問題,甚至基本消除資料傾斜;但是,在一些情況下,隻會讓原來由于資料傾斜而運作緩慢的task運作速度稍有提升,或者避免了某些task的OOM問題,但是,仍然運作緩慢,此時,要及時放棄方案三,開始嘗試後面的方案。

4. 使用map join

正常情況下,join操作都會執行shuffle過程,并且執行的是reduce join,也就是先将所有相同的key和對應的value彙聚到一個reduce task中,然後再進行join。普通join的過程如下圖所示:

Spark性能調優-Shuffle調優及故障排除篇

普通join過程

普通的join是會走shuffle過程的,而一旦shuffle,就相當于會将相同key的資料拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量資料+map算子來實作與join同樣的效果,也就是map join,此時就不會發生shuffle操作,也就不會發生資料傾斜。

注意:RDD是并不能直接進行廣播的,隻能将RDD内部的資料通過collect拉取到Driver記憶體然後再進行廣播。

1. 核心思路:

不使用join算子進行連接配接操作,而使用broadcast變量與map類算子實作join操作,進而完全規避掉shuffle類的操作,徹底避免資料傾斜的發生和出現。将較小RDD中的資料直接通過collect算子拉取到Driver端的記憶體中來,然後對其建立一個broadcast變量;接着對另外一個RDD執行map類算子,在算子函數内,從broadcast變量中擷取較小RDD的全量資料,與目前RDD的每一條資料按照連接配接key進行比對,如果連接配接key相同的話,那麼就将兩個RDD的資料用你需要的方式連接配接起來。

根據上述思路,根本不會發生shuffle操作,從根本上杜絕了join操作可能導緻的資料傾斜問題。

當join操作有資料傾斜問題并且其中一個RDD的資料量較小時,可以優先考慮這種方式,效果非常好。

map join的過程如下圖所示:

Spark性能調優-Shuffle調優及故障排除篇

map join過程

2. 不适用場景分析:

由于Spark的廣播變量是在每個Executor中儲存一個副本,如果兩個RDD資料量都比較大,那麼如果将一個資料量比較大的RDD做成廣播變量,那麼很有可能會造成記憶體溢出。

故障排除

1. 避免OOM-out of memory

在Shuffle過程,reduce端task并不是等到map端task将其資料全部寫入磁盤後再去拉取,而是map端寫一點資料,reduce端task就會拉取一小部分資料,然後立即進行後面的聚合、算子函數的使用等操作。

reduce端task能夠拉取多少資料,由reduce拉取資料的緩沖區buffer來決定,因為拉取過來的資料都是先放在buffer中,然後再進行後續的處理,buffer的預設大小為48MB。

reduce端task會一邊拉取一邊計算,不一定每次都會拉滿48MB的資料,可能大多數時候拉取一部分資料就處理掉了。

雖然說增大reduce端緩沖區大小可以減少拉取次數,提升Shuffle性能,但是有時map端的資料量非常大,寫出的速度非常快,此時reduce端的所有task在拉取的時候,有可能全部達到自己緩沖的最大極限值,即48MB,此時,再加上reduce端執行的聚合函數的代碼,可能會建立大量的對象,這可能會導緻記憶體溢出,即OOM。

如果一旦出現reduce端記憶體溢出的問題,我們可以考慮減小reduce端拉取資料緩沖區的大小,例如減少為12MB。

在實際生産環境中是出現過這種問題的,這是典型的以性能換執行的原理。reduce端拉取資料的緩沖區減小,不容易導緻OOM,但是相應的,reudce端的拉取次數增加,造成更多的網絡傳輸開銷,造成性能的下降。

注意,要保證任務能夠運作,再考慮性能的優化。

2. 避免GC導緻的shuffle檔案拉取失敗

在Spark作業中,有時會出現

shuffle file not found

的錯誤,這是非常常見的一個報錯,有時出現這種錯誤以後,選擇重新執行一遍,就不再報出這種錯誤。

出現上述問題可能的原因是Shuffle操作中,後面stage的task想要去上一個stage的task所在的Executor拉取資料,結果對方正在執行GC,執行GC會導緻Executor内所有的工作現場全部停止,比如BlockManager、基于netty的網絡通信等,這就會導緻後面的task拉取資料拉取了半天都沒有拉取到,就會報出

shuffle file not found

的錯誤,而第二次再次執行就不會再出現這種錯誤。

可以通過調整reduce端拉取資料重試次數和reduce端拉取資料時間間隔這兩個參數來對Shuffle性能進行調整,增大參數值,使得reduce端拉取資料的重試次數增加,并且每次失敗後等待的時間間隔加長。

JVM GC導緻的shuffle檔案拉取失敗調整資料重試次數和reduce端拉取資料時間間隔:

val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "6")
  .set("spark.shuffle.io.retryWait", "60s")
           

3. YARN-CLIENT模式導緻的網卡流量激增問題

在YARN-client模式下,Driver啟動在本地機器上,而Driver負責所有的任務排程,需要與YARN叢集上的多個Executor進行頻繁的通信。

假設有100個Executor,1000個task,那麼每個Executor配置設定到10個task,之後,Driver要頻繁地跟Executor上運作的1000個task進行通信,通信資料非常多,并且通信品類特别高。這就導緻有可能在Spark任務運作過程中,由于頻繁大量的網絡通訊,本地機器的網卡流量會激增。

注意,YARN-client模式隻會在測試環境中使用,而之是以使用YARN-client模式,是由于可以看到詳細全面的log資訊,通過檢視log,可以鎖定程式中存在的問題,避免在生産環境下發生故障。

在生産環境下,使用的一定是YARN-cluster模式。在YARN-cluster模式下,就不會造成本地機器網卡流量激增問題,如果YARN-cluster模式下存在網絡通信的問題,需要運維團隊進行解決。

4. YARN-CLUSTER模式的JVM棧記憶體溢出無法執行問題

當Spark作業中包含SparkSQL的内容時,可能會碰到YARN-client模式下可以運作,但是YARN-cluster模式下無法送出運作(報出OOM錯誤)的情況。

YARN-client模式下,Driver是運作在本地機器上的,Spark使用的JVM的PermGen的配置,是本地機器上的spark-class檔案,JVM永久代的大小是128MB,這個是沒有問題的,但是在YARN-cluster模式下,Driver運作在YARN叢集的某個節點上,使用的是沒有經過配置的預設設定,PermGen永久代大小為82MB。

SparkSQL的内部要進行很複雜的SQL的語義解析、文法樹轉換等等,非常複雜,如果sql語句本身就非常複雜,那麼很有可能會導緻性能的損耗和記憶體的占用,特别是對PermGen的占用會比較大。

是以,此時如果PermGen占用好過了82MB,但是又小于128MB,就會出現YARN-client模式下可以運作,YARN-cluster模式下無法運作的情況。

解決上述問題的方法是增加PermGen(永久代)的容量,需要在spark-submit腳本中對相關參數進行設定,設定方法如下:

--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
           

通過上述方法就設定了Driver永久代的大小,預設為128MB,最大256MB,這樣就可以避免上面所說的問題。

5. 避免SparkSQL JVM棧記憶體溢出

當SparkSQL的sql語句有成百上千的or關鍵字時,就可能會出現Driver端的JVM棧記憶體溢出。

JVM棧記憶體溢出基本上就是由于調用的方法層級過多,産生了大量的,非常深的,超出了JVM棧深度限制的遞歸。(我們猜測SparkSQL有大量or語句的時候,在解析SQL時,例如轉換為文法樹或者進行執行計劃的生成的時候,對于or的處理是遞歸,or非常多時,會發生大量的遞歸)

此時,建議将一條sql語句拆分為多條sql語句來執行,每條sql語句盡量保證100個以内的子句。根據實際的生産環境試驗,一條sql語句的or關鍵字控制在100個以内,通常不會導緻JVM棧記憶體溢出。

更多大資料好文,歡迎關注公衆号【五分鐘學大資料】

繼續閱讀