一、aggregateByKey [Pair]
像聚合函數一樣工作,但聚合應用于具有相同鍵的值。 也不像聚合函數,初始值不應用于第二個reduce。
清單變式
(1)def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
ps:
使用給定的組合函數和中性“零值”彙總每個鍵的值。此函數可傳回不同的結果類型U,而不是此RDD中的值的類型,
是以,我們需要一個用于将V合并成U的操作和用于合并兩個U的一個操作,如在scala.TraversableOnce中。 前一個操作用于合并a中的值
分區,後者用于合并分區之間的值。 避免記憶配置設定,這兩個函數都允許修改并傳回其第一個參數而不是建立一個新的U.
(2)def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
ps:
使用給定的組合函數和中性“零值”彙總每個鍵的值。此函數可傳回不同的結果類型U,而不是此RDD中的值的類型,
是以,我們需要一個用于将V合并成U的操作和用于合并兩個U的一個操作,如在scala.TraversableOnce中。 前一個操作用于合并a中的值
分區,後者用于合并分區之間的值。 避免記憶配置設定,這兩個函數都允許修改并傳回其第一個參數而不是建立一個新的U.
(3)def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
ps:
使用給定的組合函數和中性“零值”彙總每個鍵的值。此函數可傳回不同的結果類型U,而不是此RDD中的值的類型,
是以,我們需要一個用于将V合并成U的操作和用于合并兩個U的一個操作,如在scala.TraversableOnce中。 前一個操作用于合并a中的值
分區,後者用于合并分區之間的值。 避免記憶配置設定,這兩個函數都允許修改并傳回其第一個參數而不是建立一個新的U.
參數說明:
U: ClassTag==>表示這個最終的RDD的傳回值類型.
zeroValue: U==>表示在每個分區中第一次拿到key值時,用于建立一個傳回類型的函數,這個函數最終會被包裝成先生成一個傳回類型,
然後通過調用seqOp函數,把第一個key對應的value添加到這個類型U的變量中,下面代碼的紅色部分.
seqOp: (U,V) => U ==> 這個用于把疊代分區中key對應的值添加到zeroValue建立的U類型執行個體中.
combOp: (U,U) => U ==> 這個用于合并每個分區中聚合過來的兩個U類型的值.
舉例:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
// lets have a look at what is in the partitions
def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
pairRDD.mapPartitionsWithIndex(myfunc).collect
res2: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res3: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res4: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
二、reduceByKey
此函數提供了Spark中衆所周知的reduce功能。 請注意,您提供的任何功能都應該是可交換的,以便産生可重複的結果
清單變式:
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
舉例:
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
注:
簡單了解reduceByKey((x,y)=>x+y),即不要看元祖中的key,永遠隻考慮value即可.(x,y)代表的是value的操作!!