天天看點

Spark程式設計之基本的RDD算子-aggregate和aggregateByKey

spark基本的RDD算子:

在學習spark的過程中,有這樣幾個算子非常重要,但是卻容易混淆。在這裡我想做一下記錄.

  • 1) 第一個是aggregate算子.

我們首先可以看看aggregate算子的api,

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
           

這個算子接收三個參數,第一個是初始值,zeroValue,類型為U。

第二個接收一個函數: seqOp,這個函數接收的參數的類型為(U,T)類型,傳回值為一個U類型。

第三個參數是一個函數:comOp,這個函數接收兩個參數(U,U),傳回值為U類型。

最後整個算子傳回的類型為U類型。

這個算子将兩個不同的reduce類型的函數應用到了RDD。第一個reduce類型的函數作用于每一個資料分區。

第二個reduce類型的函數将上一個不同的分區處理後的結果進行彙總。然後傳回一個共同的結果.

這樣極大的增加了靈活性,比如說第一個函數可以為一個max函數,求出每個分區的最大值。

第二個分區可以是一個求和函數,比如sum,這樣子傳回的最終結果就是每個分區最大值的求和。

這個算子有以下幾點需要注意:

  • 1)初始值需要作用于兩個函數。即在第一個階段的reduce過程需要應用到zerovalue,第二個階段的結果也需要考慮到

    zeroValue。

  • 2) 不同的分區在執行的時候沒有什麼順序。

來看一些比較基本的例子:

val z = sc.parallelize(List(,,,,,), )

def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}
//這個函數用于檢視資料存儲在了那個分區 。

z.mapPartitionsWithIndex(myfunc).collect
res28: Array[String] = Array([partID:, val: ], [partID:, val: ], [partID:, val: ], [partID:, val: ], [partID:, val: ], [partID:, val: ])

z.aggregate()(math.max(_, _), _ + _)
res40: Int = 
//結果分析.從上一個我們可以看到先求出了第一個分區的最大值為3,第二個的最大值為6.然後調用了sum函數進行累加。
//結果傳回9.
//如果我們将初始值設定為5的話,那麼傳回值的最終結果為16.

//第一個分區的max函數的最大值的結果(5,1,2,3)=5
// 第2個分區的max函數的最大值的結果 max(5, 4, 5, 6) = 6
// 最後調用sum函數,其結果為 5 + 5 + 6 = 16。可以看到第二個函數調用的時候,同樣應用到了初始值。
z.aggregate()(math.max(_, _), _ + _)
res29: Int = 

//第二個例子:
val z = sc.parallelize(List("a","b","c","d","e","f"),)

z.mapPartitionsWithIndex(myfunc).collect
res31: Array[String] = Array([partID:, val: a], [partID:, val: b], [partID:, val: c], [partID:, val: d], [partID:, val: e], [partID:, val: f])

z.aggregate("")(_ + _, _+_)
res115: String = abcdef 
//同樣跟剛才類似,這個算子将以上的結果進行了累加,最後傳回的結果是一個字元串
///結果是abcdef


z.aggregate("x")(_ + _, _+_)
res116: String = xxdefxabc
//當加入初始值x的時候,我們可以看到首先在第一個分區a,b,c加入了一個x,xabc。
//緊接着在第二個分區d,e,f加入了第二個xdef。最後是一個彙總函數xxdefxabc。總共用了三次

//再看幾個例子:
val z = sc.parallelize(List("12","23","345","4567"),)
z.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res141: String = 
//注意這個時候傳入的初始值為空字元串。第一個函數調用的是求每個分區的字元串長度的最大值。可以看到第一個分區的 
//結果為2.然後是2.toString,是以其結果為"2",同樣的"345", "4567"和"", 是以其結果為
//"4".接下來在此彙總的話相當于得到個結果"42"或者是"24".注意和順序是沒有關系的。

z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res142: String = 
//注意當max變為min的時候,首先在第一個分區"","12"。這兩個相比的時候""長度為0。然後調用
//toString,結果為“0”。然後這個 “0”又和“23”進行比較,最小的長度為1,toString之後變
//為“1”。同樣的也适用于第二個分區,是以最後彙總之後結果為11.這個11 是一個字元串

val z = sc.parallelize(List("12","23","345",""),)
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res143: String = 

//這個問題的分析與剛才類似。隻不過第二個分區有一個空字元串,是以其傳回結果是10或01.
// 注意這個10也是字元串。
           
  • 2) aggregateByKey算子:

首先看看api定義:

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
           
def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
           
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
           

以上三個重載的算子共同實作了aggregateByKey算子。隻不過後面的兩個參數加入了分區數和Partitioner。

注意: 和剛才aggregate不同的是,aggregateByKey算子作用的是一個(key, value)類型的鍵值對。它隻作用于key相同的情況。還有一點需要注意的是aggregateByKey的初始值不作用于第二個函數。

其他的和aggregate類似。

我們來看看基本的用法:

val pairRDD = sc.parallelize(List( ("cat",), ("cat", ), ("mouse", ),("cat", ), ("dog", ), ("mouse", )), )

def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}
pairRDD.mapPartitionsWithIndex(myfunc).collect

res2: Array[String] = Array([partID:, val: (cat,)], [partID:, val: (cat,)], [partID:, val: (mouse,)], [partID:, val: (cat,)], [partID:, val: (dog,)], [partID:, val: (mouse,)])
//通過這個可以看到每個資料的不同的分區。

pairRDD.aggregateByKey()(math.max(_, _), _ + _).collect
res3: Array[(String, Int)] = Array((dog,), (cat,), (mouse,))

//根據相同的key進行math.max的value值的操作。比如說在第一個分區裡。
//("cat",2), ("cat", 5), ("mouse", 4).則cat的最大的value值為5。mouse的最大的值為4.

//第二個分區裡,("cat", 12), ("dog", 12), ("mouse", 2)。cat的最大值為12,dog的最大值為12,mouse的最大值為2.

//接下來進行第二個函數的操作。将第一個分區和第二個分區的key相同的值進行彙總。結果為:
//(cat,17), (dog,12), (mouse,6)。即上述的結果

pairRDD.aggregateByKey()(math.max(_, _), _ + _).collect
res4: Array[(String, Int)] = Array((dog,), (cat,), (mouse,))

//如果加入初始值,比如說是100.在第一個分區裡。
//("cat",2), ("cat", 5), ("mouse", 4).則cat的最大的value值為100。mouse的最大的值為100.

//第二個分區裡,("cat", 12), ("dog", 12), ("mouse", 2)。cat的最大值為100,dog的最大值為100,mouse的最大值為100.

//接下來進行第二個函數的操作。将第一個分區和第二個分區的key相同的值進行彙總。結果為:
//(cat,200), (dog,100), (mouse,200)。即上述的結果
           

繼續閱讀