天天看點

[Spark基礎]-- spark RDD操作算子詳解(彙總)

一、aggregateByKey [Pair]

像聚合函數一樣工作,但聚合應用于具有相同鍵的值。 也不像聚合函數,初始值不應用于第二個reduce。

清單變式

(1)def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

ps:

使用給定的組合函數和中性“零值”彙總每個鍵的值。此函數可傳回不同的結果類型U,而不是此RDD中的值的類型,

是以,我們需要一個用于将V合并成U的操作和用于合并兩個U的一個操作,如在scala.TraversableOnce中。 前一個操作用于合并a中的值

分區,後者用于合并分區之間的值。 避免記憶配置設定,這兩個函數都允許修改并傳回其第一個參數而不是建立一個新的U.

(2)def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

ps:

使用給定的組合函數和中性“零值”彙總每個鍵的值。此函數可傳回不同的結果類型U,而不是此RDD中的值的類型,

是以,我們需要一個用于将V合并成U的操作和用于合并兩個U的一個操作,如在scala.TraversableOnce中。 前一個操作用于合并a中的值

分區,後者用于合并分區之間的值。 避免記憶配置設定,這兩個函數都允許修改并傳回其第一個參數而不是建立一個新的U.

(3)def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

ps:

使用給定的組合函數和中性“零值”彙總每個鍵的值。此函數可傳回不同的結果類型U,而不是此RDD中的值的類型,

是以,我們需要一個用于将V合并成U的操作和用于合并兩個U的一個操作,如在scala.TraversableOnce中。 前一個操作用于合并a中的值

分區,後者用于合并分區之間的值。 避免記憶配置設定,這兩個函數都允許修改并傳回其第一個參數而不是建立一個新的U.

參數說明:

              U: ClassTag==>表示這個最終的RDD的傳回值類型.

              zeroValue: U==>表示在每個分區中第一次拿到key值時,用于建立一個傳回類型的函數,這個函數最終會被包裝成先生成一個傳回類型,

                                    然後通過調用seqOp函數,把第一個key對應的value添加到這個類型U的變量中,下面代碼的紅色部分.

              seqOp: (U,V) => U ==> 這個用于把疊代分區中key對應的值添加到zeroValue建立的U類型執行個體中.

              combOp: (U,U) => U ==> 這個用于合并每個分區中聚合過來的兩個U類型的值.

舉例:

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

 // lets have a look at what is in the partitions
 def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
   iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
 }
 pairRDD.mapPartitionsWithIndex(myfunc).collect

 res2: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])

 pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
 res3: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))

 pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect 
res4: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))      

二、reduceByKey

        此函數提供了Spark中衆所周知的reduce功能。 請注意,您提供的任何功能都應該是可交換的,以便産生可重複的結果

清單變式:

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

舉例:

        val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)

        val b = a.map(x => (x.length, x))

        b.reduceByKey(_ + _).collect

       res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

       val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

       val b = a.map(x => (x.length, x))

       b.reduceByKey(_ + _).collect

       res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))

注:

簡單了解reduceByKey((x,y)=>x+y),即不要看元祖中的key,永遠隻考慮value即可.(x,y)代表的是value的操作!!

繼續閱讀