Source package: org.apache.spark.rdd
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[(K, V)] = null): RDD[(K, V)]
Return a new RDD that is reduced into numPartitions partitions.
This results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions is requested, it will stay at the current number of partitions.
However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
Translation:
Returns a new RDD reduced to numPartitions partitions. This results in a narrow dependency; for example, if you go from 1000 partitions to 100, no shuffle takes place: instead, each of the 100 new partitions claims 10 of the current partitions. (If you request more partitions than the RDD currently has, the partition count stays where it is.) However, a drastic coalesce, e.g. down to a single partition, may cause your computation to run on fewer nodes than you would like (in other words, with too little parallelism). To avoid this, pass true for the second parameter, shuffle; this adds a shuffle step to the repartitioning, which means the upstream partitions can still be executed in parallel.
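The narrow-dependency grouping described above can be sketched in plain Scala (no Spark required; groupParents is a hypothetical helper for illustration, not Spark's internal API):

```scala
// Sketch of how a narrow coalesce from 1000 to 100 partitions can group
// parents: each new partition claims a contiguous run of 10 parent
// partitions, so no data needs to move between executors (no shuffle).
object NarrowCoalesceSketch {
  // Assign each of `oldN` parent partitions to one of `newN` groups.
  def groupParents(oldN: Int, newN: Int): Map[Int, Seq[Int]] =
    (0 until oldN).groupBy(p => p * newN / oldN)

  def main(args: Array[String]): Unit = {
    val groups = groupParents(1000, 100)
    println(groups.size)    // 100 new partitions
    println(groups(0).size) // each claims 10 parents
  }
}
```

Because every new partition reads only a fixed set of parent partitions, this is a narrow dependency: losing one new partition requires recomputing only its 10 parents.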
Note:
With the second parameter shuffle = true, coalesce can produce more partitions than before. For example, if you have a small number of partitions, say 100, calling coalesce(1000, shuffle = true) will use a HashPartitioner to spread the data across 1000 partitions on the cluster nodes. This is very useful for increasing parallelism.
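The hash-based reassignment behind shuffle = true can be sketched in plain Scala (partitionFor mirrors the non-negative-modulo rule HashPartitioner uses; it is an illustrative helper, not the Spark class itself):

```scala
// Sketch of why growing the partition count needs a shuffle: every record
// is reassigned by the hash of its key, so data must move between nodes.
object HashPartitionSketch {
  // Non-negative modulo of the key's hashCode, as a HashPartitioner computes.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    // 100 distinct keys land on many different target partitions out of 1000,
    // regardless of which of the original partitions held them.
    val targets = (1 to 100).map(k => partitionFor(k, numPartitions = 1000)).toSet
    println(targets.size)
  }
}
```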
def repartition(numPartitions: Int)(implicit ord: Ordering[(K, V)] = null): RDD[(K, V)]
Return a new RDD that has exactly numPartitions partitions.
Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data.
If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.
TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.
Translation:
Returns a new RDD with exactly numPartitions partitions. This can increase or decrease the level of parallelism of the RDD. Internally it uses a shuffle to redistribute the data. If you are decreasing the number of partitions, consider using coalesce instead, which can avoid performing a shuffle.
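In Spark's source, repartition simply delegates to coalesce with shuffle forced on, which is why it always shuffles. A minimal sketch of that relationship (Repartitionable is a hypothetical trait for illustration, not Spark's actual class hierarchy):

```scala
// Sketch of the relationship between repartition and coalesce.
trait Repartitionable[T] {
  def coalesce(numPartitions: Int, shuffle: Boolean): T

  // repartition always shuffles: that is why it can grow or shrink the
  // partition count, while coalesce(n, shuffle = false) can only shrink it.
  def repartition(numPartitions: Int): T = coalesce(numPartitions, shuffle = true)
}

object RepartitionDemo {
  // A probe that records which arguments coalesce received.
  val probe: Repartitionable[(Int, Boolean)] =
    new Repartitionable[(Int, Boolean)] {
      def coalesce(numPartitions: Int, shuffle: Boolean) = (numPartitions, shuffle)
    }

  def main(args: Array[String]): Unit = {
    println(probe.repartition(3)) // (3,true): repartition forces shuffle = true
  }
}
```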
Reference: https://blog.csdn.net/dax1n/article/details/53431373
coalesce vs. repartition: repartitioning
(*) Both repartition an RDD.
(*) Difference: coalesce does not shuffle by default (shuffle = false);
repartition always shuffles.
(*) Example:
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
Check the partition count: rdd1.partitions.length
Repartition: val rdd2 = rdd1.repartition(3)
val rdd3 = rdd1.coalesce(3, false) ---> partition count: 2
val rdd4 = rdd1.coalesce(3, true) ---> partition count: 3
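The partition-count rule this example demonstrates can be summarized in plain Scala (resultingPartitions is an illustrative helper, not a Spark API):

```scala
// Sketch of the rule: without a shuffle, coalesce can never grow the
// partition count; with a shuffle, any target count is reachable.
object CoalesceCountRule {
  def resultingPartitions(current: Int, requested: Int, shuffle: Boolean): Int =
    if (shuffle) requested            // shuffle = true: any target count works
    else math.min(current, requested) // shuffle = false: can only shrink

  def main(args: Array[String]): Unit = {
    println(resultingPartitions(2, 3, shuffle = false)) // 2, like rdd3 above
    println(resultingPartitions(2, 3, shuffle = true))  // 3, like rdd4 above
  }
}
```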