天天看点

Spark编程之基本的RDD算子-aggregate和aggregateByKey

spark基本的RDD算子:

在学习spark的过程中,有这样几个算子非常重要,但是却容易混淆。在这里我想做一下记录.

  • 1) 第一个是aggregate算子.

我们首先可以看看aggregate算子的api,

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
           

这个算子接收三个参数,第一个是初始值,zeroValue,类型为U。

第二个接收一个函数: seqOp,这个函数接收的参数的类型为(U,T)类型,返回值为一个U类型。

第三个参数是一个函数:comOp,这个函数接收两个参数(U,U),返回值为U类型。

最后整个算子返回的类型为U类型。

这个算子将两个不同的reduce类型的函数应用到了RDD。第一个reduce类型的函数作用于每一个数据分区。

第二个reduce类型的函数将上一个不同的分区处理后的结果进行汇总。然后返回一个共同的结果.

这样极大的增加了灵活性,比如说第一个函数可以为一个max函数,求出每个分区的最大值。

第二个分区可以是一个求和函数,比如sum,这样子返回的最终结果就是每个分区最大值的求和。

这个算子有以下几点需要注意:

  • 1)初始值需要作用于两个函数。即在第一个阶段的reduce过程需要应用到zerovalue,第二个阶段的结果也需要考虑到

    zeroValue。

  • 2) 不同的分区在执行的时候没有什么顺序。

来看一些比较基本的例子:

val z = sc.parallelize(List(,,,,,), )

def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}
//这个函数用于查看数据存储在了那个分区 。

z.mapPartitionsWithIndex(myfunc).collect
res28: Array[String] = Array([partID:, val: ], [partID:, val: ], [partID:, val: ], [partID:, val: ], [partID:, val: ], [partID:, val: ])

z.aggregate()(math.max(_, _), _ + _)
res40: Int = 
//结果分析.从上一个我们可以看到先求出了第一个分区的最大值为3,第二个的最大值为6.然后调用了sum函数进行累加。
//结果返回9.
//如果我们将初始值设置为5的话,那么返回值的最终结果为16.

//第一个分区的max函数的最大值的结果(5,1,2,3)=5
// 第2个分区的max函数的最大值的结果 max(5, 4, 5, 6) = 6
// 最后调用sum函数,其结果为 5 + 5 + 6 = 16。可以看到第二个函数调用的时候,同样应用到了初始值。
z.aggregate()(math.max(_, _), _ + _)
res29: Int = 

//第二个例子:
val z = sc.parallelize(List("a","b","c","d","e","f"),)

z.mapPartitionsWithIndex(myfunc).collect
res31: Array[String] = Array([partID:, val: a], [partID:, val: b], [partID:, val: c], [partID:, val: d], [partID:, val: e], [partID:, val: f])

z.aggregate("")(_ + _, _+_)
res115: String = abcdef 
//同样跟刚才类似,这个算子将以上的结果进行了累加,最后返回的结果是一个字符串
///结果是abcdef


z.aggregate("x")(_ + _, _+_)
res116: String = xxdefxabc
//当加入初始值x的时候,我们可以看到首先在第一个分区a,b,c加入了一个x,xabc。
//紧接着在第二个分区d,e,f加入了第二个xdef。最后是一个汇总函数xxdefxabc。总共用了三次

//再看几个例子:
val z = sc.parallelize(List("12","23","345","4567"),)
z.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res141: String = 
//注意这个时候传入的初始值为空字符串。第一个函数调用的是求每个分区的字符串长度的最大值。可以看到第一个分区的 
//结果为2.然后是2.toString,所以其结果为"2",同样的"345", "4567"和"", 所以其结果为
//"4".接下来在此汇总的话相当于得到个结果"42"或者是"24".注意和顺序是没有关系的。

z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res142: String = 
//注意当max变为min的时候,首先在第一个分区"","12"。这两个相比的时候""长度为0。然后调用
//toString,结果为“0”。然后这个 “0”又和“23”进行比较,最小的长度为1,toString之后变
//为“1”。同样的也适用于第二个分区,所以最后汇总之后结果为11.这个11 是一个字符串

val z = sc.parallelize(List("12","23","345",""),)
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res143: String = 

//这个问题的分析与刚才类似。只不过第二个分区有一个空字符串,所以其返回结果是10或01.
// 注意这个10也是字符串。
           
  • 2) aggregateByKey算子:

首先看看api定义:

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
           
def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
           
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
           

以上三个重载的算子共同实现了aggregateByKey算子。只不过后面的两个参数加入了分区数和Partitioner。

注意: 和刚才aggregate不同的是,aggregateByKey算子作用的是一个(key, value)类型的键值对。它只作用于key相同的情况。还有一点需要注意的是aggregateByKey的初始值不作用于第二个函数。

其他的和aggregate类似。

我们来看看基本的用法:

val pairRDD = sc.parallelize(List( ("cat",), ("cat", ), ("mouse", ),("cat", ), ("dog", ), ("mouse", )), )

def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}
pairRDD.mapPartitionsWithIndex(myfunc).collect

res2: Array[String] = Array([partID:, val: (cat,)], [partID:, val: (cat,)], [partID:, val: (mouse,)], [partID:, val: (cat,)], [partID:, val: (dog,)], [partID:, val: (mouse,)])
//通过这个可以看到每个数据的不同的分区。

pairRDD.aggregateByKey()(math.max(_, _), _ + _).collect
res3: Array[(String, Int)] = Array((dog,), (cat,), (mouse,))

//根据相同的key进行math.max的value值的操作。比如说在第一个分区里。
//("cat",2), ("cat", 5), ("mouse", 4).则cat的最大的value值为5。mouse的最大的值为4.

//第二个分区里,("cat", 12), ("dog", 12), ("mouse", 2)。cat的最大值为12,dog的最大值为12,mouse的最大值为2.

//接下来进行第二个函数的操作。将第一个分区和第二个分区的key相同的值进行汇总。结果为:
//(cat,17), (dog,12), (mouse,6)。即上述的结果

pairRDD.aggregateByKey()(math.max(_, _), _ + _).collect
res4: Array[(String, Int)] = Array((dog,), (cat,), (mouse,))

//如果加入初始值,比如说是100.在第一个分区里。
//("cat",2), ("cat", 5), ("mouse", 4).则cat的最大的value值为100。mouse的最大的值为100.

//第二个分区里,("cat", 12), ("dog", 12), ("mouse", 2)。cat的最大值为100,dog的最大值为100,mouse的最大值为100.

//接下来进行第二个函数的操作。将第一个分区和第二个分区的key相同的值进行汇总。结果为:
//(cat,200), (dog,100), (mouse,200)。即上述的结果
           

继续阅读