天天看點

spark 算子combineByKey 詳解

combineByKey 作為spark 的核心算子之一,有必要詳細了解。reduceByKey 和groupByKey 等健值對算子底層都實作該算子。(1.6.0版更新為combineByKeyWithClassTag)

combineByKey 源碼定義:

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]


def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]


def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
           

createCombine: 對每個分區中每個key中value中的第一個值,進行處理(可以了解為,分區内部進行按Key分組,每個Key的第一個value進行預處理)預處理邏輯。

mergeValue:分區内部進行聚合,相同的Key的value進行局部運算,區間内部聚合邏輯。

mergeCombiners:此階段發生shuffer,對不同區間局部運算後的結果再做運算。分區之間聚合邏輯。

測試樣例:

val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), )
val b = sc.parallelize(List(,,,,,,,,), )
val c = b.zip(a)
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d.collect
res16: Array[(Int, List[String])] = Array((,List(cat, dog, turkey)), (,List(gnu, rabbit, salmon, bee, bear, wolf)))
           

過一下下圖,邏輯應該就清晰了

spark 算子combineByKey 詳解

注意點:createCombine是對每個分區中每個key中value中的第一個值,進行處理(可以了解為,分區内部進行按Key分組,每個Key的第一個value進行預處理)預處理邏輯。

繼續閱讀