combineByKey 作為spark 的核心算子之一,有必要詳細了解。reduceByKey 和groupByKey 等健值對算子底層都實作該算子。(1.6.0版更新為combineByKeyWithClassTag)
combineByKey 源碼定義:
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
createCombine: 對每個分區中每個key中value中的第一個值,進行處理(可以了解為,分區内部進行按Key分組,每個Key的第一個value進行預處理)預處理邏輯。
mergeValue:分區内部進行聚合,相同的Key的value進行局部運算,區間内部聚合邏輯。
mergeCombiners:此階段發生shuffer,對不同區間局部運算後的結果再做運算。分區之間聚合邏輯。
測試樣例:
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), )
val b = sc.parallelize(List(,,,,,,,,), )
val c = b.zip(a)
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d.collect
res16: Array[(Int, List[String])] = Array((,List(cat, dog, turkey)), (,List(gnu, rabbit, salmon, bee, bear, wolf)))
過一下下圖,邏輯應該就清晰了
注意點:createCombine是對每個分區中每個key中value中的第一個值,進行處理(可以了解為,分區内部進行按Key分組,每個Key的第一個value進行預處理)預處理邏輯。