
Difference between Spark coalesce and repartition

Source package: org.apache.spark.rdd

def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[(K, V)] = null): RDD[(K, V)]
           
Return a new RDD that is reduced into numPartitions partitions.

This results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions is requested, it will stay at the current number of partitions.

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

Translation:

        Returns a new RDD that has been reduced to numPartitions partitions. This results in a narrow dependency; for example, if you go from 1000 partitions to 100 partitions, no shuffle occurs: instead, each of the 100 new partitions claims 10 of the current partitions. If you request a larger number of partitions (without shuffle), the partition count stays as it is. However, if you perform a drastic coalesce, e.g. down to a single partition, your computation may end up running on only a few cluster nodes (in other words, with too little parallelism). To avoid this, pass true for the second parameter, shuffle; this adds a shuffle step to the repartitioning, but it means the upstream partitions can be executed in parallel.

Note:

        With the second parameter shuffle = true, you can actually end up with more partitions than before. For example, if you have a small number of partitions, say 100, calling coalesce(1000, shuffle = true) will use a HashPartitioner to produce 1000 partitions distributed across the cluster nodes. This is very useful (for increasing parallelism).
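The narrow-dependency behaviour and the shuffle = true escape hatch can be checked directly in spark-shell. A minimal sketch, assuming an existing SparkContext sc; the partition counts in the comments follow from the documentation quoted above:

    // 1000 initial partitions
    val rdd = sc.parallelize(1 to 1000, 1000)

    // Narrow coalesce: each of the 100 new partitions claims ~10 old ones, no shuffle
    val narrowed = rdd.coalesce(100)
    println(narrowed.partitions.length)                            // 100

    // Asking for MORE partitions without shuffle leaves the count unchanged
    println(rdd.coalesce(2000).partitions.length)                  // stays at 1000

    // shuffle = true adds a shuffle step but keeps the upstream stage parallel,
    // and can even increase the partition count (data spread by a HashPartitioner)
    println(rdd.coalesce(2000, shuffle = true).partitions.length)  // 2000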

def repartition(numPartitions: Int)(implicit ord: Ordering[(K, V)] = null): RDD[(K, V)]
           

Return a new RDD that has exactly numPartitions partitions.

Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data.

If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.

TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.

Translation:

        Returns a new RDD that has exactly numPartitions partitions; this can increase or decrease the level of parallelism of the RDD. Internally it uses a shuffle to redistribute the data. If you are decreasing the number of partitions, consider using coalesce instead, which can avoid performing a shuffle.
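In the Spark source (org.apache.spark.rdd.RDD), repartition is essentially a one-line wrapper that forces the shuffle variant of coalesce; a simplified sketch (the real method additionally wraps the call in withScope):

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
      coalesce(numPartitions, shuffle = true)

This is why repartition(3) always reaches exactly 3 partitions, while coalesce(3) on a 2-partition RDD does not.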

Reference: https://blog.csdn.net/dax1n/article/details/53431373

coalesce vs. repartition: repartitioning an RDD

    (*) Both repartition an RDD

    (*) Difference: coalesce does not shuffle by default (shuffle = false)

                    repartition always performs a shuffle

    (*) Example (see the verification sketch after the list):

             val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)

             Check the partition count: rdd1.partitions.length

             Repartition:       val rdd2 = rdd1.repartition(3)

                       val rdd3 = rdd1.coalesce(3,false)  --->  partition count: 2

                       val rdd4 = rdd1.coalesce(3,true)   --->  partition count: 3
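A quick check of the claims above, assuming the rdd1/rdd2/rdd3/rdd4 definitions from the example (exact RDD ids in the output will differ):

    rdd2.partitions.length        // 3: repartition always reaches the requested count
    rdd3.partitions.length        // 2: no shuffle, so the count cannot grow
    rdd4.partitions.length        // 3: shuffle = true allows the increase

    // The lineage also shows the difference: rdd3 sits directly on the parent
    // (narrow CoalescedRDD), while rdd4's lineage contains a ShuffledRDD stage.
    println(rdd3.toDebugString)
    println(rdd4.toDebugString)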
