天天看點

重要 | Spark分區并行度決定機制

最近經常有小夥伴留言,核心問題都比較類似,就是雖然接觸Spark有一段時間了,但是搞不明白一個問題,為什麼我從HDFS上加載不同的檔案時,列印的分區數不一樣,并且好像spark.default.parallelism這個參數時不是一直起作用?其實筆者之前的文章已有相關介紹,想知道為什麼,就必須了解Spark在加載不同的資料源時分區決定機制以及調用不用算子時并行度決定機制以及分區劃分。

其實之前的文章​​《Spark的分區》​​、​​《通過spark.default.parallelism談Spark并行度》​​已有所介紹,筆者今天再做一次詳細的補充,建議大家在對Spark有一定了解的基礎上,三篇文章結合一起看。

大家都知道Spark job中最小執行機關為task,合理設定Spark job每個stage的task數是決定性能好壞的重要因素之一,但是Spark自己确定最佳并行度的能力有限,這就要求我們在了解其中内在機制的前提下,去各種測試、計算等來最終确定最佳參數配比。

Spark任務在執行時會将RDD劃分為不同的stage,一個stage中task的數量跟最後一個RDD的分區數量相同。之前已經介紹過,stage劃分的關鍵是寬依賴,而寬依賴往往伴随着shuffle操作。對于一個stage接收另一個stage的輸入,這種操作通常都會有一個參數numPartitions來顯示指定分區數。最典型的就是一些ByKey算子,比如groupByKey(numPartitions: Int),但是這個分區數需要多次測試來确定合适的值。首先确定父RDD中的分區數(通過rdd.partitions().size()可以确定RDD的分區數),然後在此基礎上增加分區數,多次調試直至在确定的資源任務能夠平穩、安全的運作。

對于沒有父RDD的RDD,比如通過加載HDFS上的資料生成的RDD,它的分區數由InputFormat切分機制決定。通常就是一個HDFS block塊對應一個分區,對于不可切分檔案則一個檔案對應一個分區。

對于通過SparkContext的parallelize方法或者makeRDD生成的RDD分區數可以直接在方法中指定,如果未指定,則參考spark.default.parallelism的參數配置。下面是預設情況下确定defaultParallelism的源碼:

override def defaultParallelism(): Int = {

    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))

}

通常,RDD的分區數與其所依賴的RDD的分區數相同,除非shuffle。但有幾個特殊的算子:

1.coalesce和repartition算子

筆者先放兩張關于該coalesce算子分别在RDD和DataSet中的源碼圖:(DataSet是Spark SQL中的分布式資料集,後邊說到Spark時再細講)

重要 | Spark分區并行度決定機制
重要 | Spark分區并行度決定機制

通過coalesce源碼分析,無論是在RDD中還是DataSet,預設情況下coalesce不會産生shuffle,此時通過coalesce建立的RDD分區數小于等于父RDD的分區數。 

筆者這裡就不放repartition算子的源碼了,分析起來也比較簡單,圖中我有所提示。但筆者建議,如下兩種情況,請使用repartition算子:

1)增加分區數repartition觸發shuffle,shuffle的情況下可以增加分區數。

coalesce預設不觸發shuffle,即使調用該算子增加分區數,實際情況是分區數仍然是目前的分區數。

2)極端情況減少分區數,比如将分區數減少為1調整分區數為1,此時資料處理上遊stage并行度降,很影響性能。此時repartition的優勢即不改變原來stage的并行度就展現出來了,在大資料量下,更為明顯。但需要注意,因為repartition會觸發shuffle,而要衡量好shuffle産生的代價和因為用repartition增加并行度帶來的效益。

2.union算子

還是直接看源碼:

重要 | Spark分區并行度決定機制
重要 | Spark分區并行度決定機制
重要 | Spark分區并行度決定機制

通過分析源碼,RDD在調用union算子時,最終生成的RDD分區數分兩種情況:1)union的RDD分區器已定義并且它們的分區器相同

多個父RDD具有相同的分區器,union後産生的RDD的分區器與父RDD相同且分區數也相同。比如,n個RDD的分區器相同且是defined,分區數是m個。那麼這n個RDD最終union生成的一個RDD的分區數仍是m,分區器也是相同的

2)不滿足第一種情況,則通過union生成的RDD的分區數為父RDD的分區數之和4.cartesian算子

通過上述coalesce、repartition、union算子介紹和源碼分析,很容易分析cartesian算子的源碼。通過cartesian得到RDD分區數是其父RDD分區數的乘積。

重要 | Spark分區并行度決定機制

在Spark SQL中,任務并行度參數則要參考spark.sql.shuffle.partitions,筆者這裡先放一張圖,詳細的後面講到Spark SQL時再細說:

重要 | Spark分區并行度決定機制

看下圖在Spark流式計算中,通常将SparkStreaming和Kafka整合,這裡又分兩種情況:

1.Receiver方式生成的微批RDD即BlockRDD,分區數就是block數

2.Direct方式生成的微批RDD即kafkaRDD,分區數和kafka分區數一一對應

繼續閱讀