天天看點

Spark調優 Spark Jobs 性能調優

調試資源配置設定

Spark 的使用者郵件郵件清單中經常會出現 “我有一個500個節點的叢集,為什麼但是我的應用一次隻有兩個 task 在執行”,鑒于 Spark 控制資源使用的參數的數量,這些問題不應該出現。但是在本章中,你将學會壓榨出你叢集的每一分資源。推薦的配置将根據不同的叢集管理系統( YARN、Mesos、Spark Standalone)而有所不同,我們将主要集中在 YARN 上,因為這個 Cloudera 推薦的方式。

我們先看一下在 YARN 上運作 Spark 的一些背景。檢視之前的博文:點選這裡檢視

Spark(以及YARN) 需要關心的兩項主要的資源是 CPU 和 記憶體, 磁盤 和 IO 當然也影響着 Spark 的性能,但是不管是 Spark 還是 Yarn 目前都沒法對他們做實時有效的管理。

在一個 Spark 應用中,每個 Spark executor 擁有固定個數的 core 以及固定大小的堆大小。core 的個數可以在執行 spark-submit 或者 pyspark 或者 spark-shell 時,通過參數 --executor-cores 指定,或者在 spark-defaults.conf 配置檔案或者 SparkConf 對象中設定 spark.executor.cores 參數。同樣地,堆的大小可以通過 --executor-memory 參數或者 spark.executor.memory 配置項。core 配置項控制一個 executor 中task的并發數。 --executor-cores 5 意味着每個 executor 中最多同時可以有5個 task 運作。memory 參數影響 Spark 可以緩存的資料的大小,也就是在 groupaggregate 以及 join 操作時 shuffle 的資料結構的最大值。

--num-executors 指令行參數或者spark.executor.instances 配置項控制需要的 executor 個數。從 CDH 5.4/Spark 1.3 開始,你可以避免使用這個參數,隻要你通過設定 spark.dynamicAllocation.enabled 參數打開 動态配置設定 。動态配置設定可以使的 Spark 的應用在有後續積壓的在等待的 task 時請求 executor,并且在空閑時釋放這些 executor。

同時 Spark 需求的資源如何跟 YARN 中可用的資源配合也是需要着重考慮的,YARN 相關的參數有:

  • yarn.nodemanager.resource.memory-mb 控制在每個節點上 container 能夠使用的最大記憶體;
  • yarn.nodemanager.resource.cpu-vcores 控制在每個節點上 container 能夠使用的最大core個數;

請求5個 core 會生成向 YARN 要5個虛拟core的請求。從 YARN 請求記憶體相對比較複雜因為以下的一些原因:

  • --executor-memory/spark.executor.memory 控制 executor 的堆的大小,但是 JVM 本身也會占用一定的堆空間,比如内部的 String 或者直接 byte buffer,executor memory 的 spark.yarn.executor.memoryOverhead 屬性決定向 YARN 請求的每個 executor 的記憶體大小,預設值為max(384, 0.7 * spark.executor.memory);
  • YARN 可能會比請求的記憶體高一點,YARN 的 yarn.scheduler.minimum-allocation-mb 和 yarn.scheduler.increment-allocation-mb 屬性控制請求的最小值和增加量。

下面展示的是 Spark on YARN 記憶體結構:

Spark調優 Spark Jobs 性能調優

如果這些還不夠決定Spark executor 個數,還有一些概念還需要考慮的:

  • 應用的master,是一個非 executor 的容器,它擁有特殊的從 YARN 請求資源的能力,它自己本身所占的資源也需要被計算在内。在 yarn-client 模式下,它預設請求 1024MB 和 1個core。在 yarn-cluster 模式中,應用的 master 運作 driver,是以使用參數 --driver-memory 和 --driver-cores 配置它的資源常常很有用。
  • 在 executor 執行的時候配置過大的 memory 經常會導緻過長的GC延時,64G是推薦的一個 executor 記憶體大小的上限。
  • 我們注意到 HDFS client 在大量并發線程是時性能問題。大概的估計是每個 executor 中最多5個并行的 task 就可以占滿的寫入帶寬。
  • 在運作微型 executor 時(比如隻有一個core而且隻有夠執行一個task的記憶體)扔掉在一個JVM上同時運作多個task的好處。比如 broadcast 變量需要為每個 executor 複制一遍,這麼多小executor會導緻更多的資料拷貝。

為了讓以上的這些更加具體一點,這裡有一個實際使用過的配置的例子,可以完全用滿整個叢集的資源。假設一個叢集有6個節點有NodeManager在上面運作,每個節點有16個core以及64GB的記憶體。那麼 NodeManager的容量:yarn.nodemanager.resource.memory-mb 和 yarn.nodemanager.resource.cpu-vcores 可以設為 63 * 1024 = 64512 (MB) 和 15。我們避免使用 100% 的 YARN container 資源因為還要為 OS 和 hadoop 的 Daemon 留一部分資源。在上面的場景中,我們預留了1個core和1G的記憶體給這些程序。Cloudera Manager 會自動計算并且配置。

是以看起來我們最先想到的配置會是這樣的:--num-executors 6 --executor-cores 15 --executor-memory 63G。但是這個配置可能無法達到我們的需求,因為: 

- 63GB+ 的 executor memory 塞不進隻有 63GB 容量的 NodeManager; 

- 應用的 master 也需要占用一個core,意味着在某個節點上,沒有15個core給 executor 使用; 

- 15個core會影響 HDFS IO的吞吐量。 

配置成 --num-executors 17 --executor-cores 5 --executor-memory 19G 可能會效果更好,因為: 

- 這個配置會在每個節點上生成3個 executor,除了應用的master運作的機器,這台機器上隻會運作2個 executor 

- --executor-memory 被分成3份(63G/每個節點3個executor)=21。 21 * (1 - 0.07) ~ 19。

調試并發

我們知道 Spark 是一套資料并行處理的引擎。但是 Spark 并不是神奇得能夠将所有計算并行化,它沒辦法從所有的并行化方案中找出最優的那個。每個 Spark stage 中包含若幹個 task,每個 task 串行地處理資料。在調試 Spark 的job時,task 的個數可能是決定程式性能的最重要的參數。

那麼這個數字是由什麼決定的呢?在之前的博文中介紹了 Spark 如何将 RDD 轉換成一組 stage。task 的個數與 stage中上一個 RDD 的 partition 個數相同。而一個 RDD 的 partition 個數與被它依賴的 RDD 的 partition 個數相同,除了以下的情況: coalesce transformation 可以建立一個具有更少 partition 個數的 RDD,union transformation 産出的RDD 的 partition 個數是它父 RDD 的 partition 個數之和, cartesian 傳回的 RDD 的 partition 個數是它們的積。

如果一個 RDD 沒有父 RDD 呢? 由 textFile 或者 hadoopFile 生成的 RDD 的 partition 個數由它們底層使用的 MapReduce InputFormat 決定的。一般情況下,每讀到的一個 HDFS block 會生成一個 partition。通過 parallelize接口生成的 RDD 的 partition 個數由使用者指定,如果使用者沒有指定則由參數 spark.default.parallelism 決定。

要想知道 partition 的個數,可以通過接口 rdd.partitions().size() 獲得。

這裡最需要關心的問題在于 task 的個數太小。如果運作時 task 的個數比實際可用的 slot 還少,那麼程式解沒法使用到所有的 CPU 資源。

過少的 task 個數可能會導緻在一些聚集操作時, 每個 task 的記憶體壓力會很大。任何 join,cogroup,*ByKey 操作都會在記憶體生成一個 hash-map或者 buffer 用于分組或者排序。join, cogroup ,groupByKey 會在 shuffle 時在 fetching 端使用這些資料結構, reduceByKey ,aggregateByKey 會在 shuffle 時在兩端都會使用這些資料結構。

當需要進行這個聚集操作的 record 不能完全輕易塞進記憶體中時,一些問題會暴露出來。首先,在記憶體 hold 大量這些資料結構的 record 會增加 GC的壓力,可能會導緻流程停頓下來。其次,如果資料不能完全載入記憶體,Spark 會将這些資料寫到磁盤,這會引起磁盤 IO和排序。在 Cloudera 的使用者中,這可能是導緻 Spark Job 慢的首要原因。

那麼如何增加你的 partition 的個數呢?如果你的問題 stage 是從 Hadoop 讀取資料,你可以做以下的選項: 

- 使用 repartition 選項,會引發 shuffle; 

- 配置 InputFormat 使用者将檔案分得更小; 

- 寫入 HDFS 檔案時使用更小的block。

如果問題 stage 從其他 stage 中獲得輸入,引發 stage 邊界的操作會接受一個 numPartitions 的參數,比如

  1. val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)

X 應該取什麼值?最直接的方法就是做實驗。不停的将 partition 的個數從上次實驗的 partition 個數乘以1.5,直到性能不再提升為止。

同時也有一些原則用于計算 X,但是也不是非常的有效是因為有些參數是很難計算的。這裡寫到不是因為它們很實用,而是可以幫助了解。這裡主要的目标是啟動足夠的 task 可以使得每個 task 接受的資料能夠都塞進它所配置設定到的記憶體中。

每個 task 可用的記憶體通過這個公式計算:spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/spark.executor.cores 。 memoryFraction 和 safetyFractio 預設值分别 0.2 和 0.8.

在記憶體中所有 shuffle 資料的大小很難确定。最可行的是找出一個 stage 運作的 Shuffle Spill(memory) 和 Shuffle Spill(Disk) 之間的比例。在用所有shuffle 寫乘以這個比例。但是如果這個 stage 是 reduce 時,可能會有點複雜: 

Spark調優 Spark Jobs 性能調優

在往上增加一點因為大多數情況下 partition 的個數會比較多。

試試在,在有所疑慮的時候,使用更多的 task 數(也就是 partition 數)都會效果更好,這與 MapRecuce 中建議 task數目選擇盡量保守的建議相反。這個因為 MapReduce 在啟動 task 時相比需要更大的代價。

繼續閱讀