天天看点

spark中RDD的方法整理

aggregate[U](zeroValue : U)(seqOp : scala.Function2[U, T, U], combOp : scala.Function2[U, U, U]):U

def main(args: Array[String]): Unit = {
    //seqOp函数返回的是值小的 
    def seqOp(arg1:Int,arg2:Int):Int={
      var res:Int=arg1
      if(arg1>arg2)
        res=arg2
      println("seqOp:"+arg1+","+arg2+"=>"+res)
      res
    }
    //combOp函数两个值求和
    def combOp(arg1:Int,arg2:Int):Int={
      println("combOp:"+arg1+","+arg2+"=>"+(arg1+arg2))
      arg1+arg2
    }
    //将每个分区index显示出来
    def myfunc[T](index:Int,iter:Iterator[T]):Iterator[(Int,T)]={
      var res = List[(Int,T)]()
      for(x<-iter)
        res.::=(index,x)
      res.iterator
    }
    val sparkConf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
    val ssc:SparkContext=new SparkContext(sparkConf)
    val data=ssc.parallelize( to ,)
    data.mapPartitionsWithIndex(myfunc).collect()
    val  result=data.aggregate()(seqOp,combOp)
    print(result)
  }
           
//结果如下
seqOp:,=>
seqOp:,=>
seqOp:,=>
combOp:,=>

seqOp:,=>
seqOp:,=>
seqOp:,=>
combOp:,=>

seqOp:,=>
seqOp:,=>
seqOp:,=>
seqOp:,=>
combOp:,=>

           

val data=ssc.parallelize(1 to 10,3)将1-10划分为三个分区,第一个分区存放了1 2 3 第二个分区存放了4 5 6 第三个分区存放了7 8 9 10。

aggregate的作用是什么呢,这个方法有两个括号参数,第一个括号我们传入了int类型的2,第二个括号传入了两个函数,第一个函数求两个值的最小值,第二个函数求两个值的和。

首先:2与第一个分区中的第一个数比较即2与1 我们得到seqOp(2,1)然后返回了1,接着刚刚返回的值1与第一个分区第二个数2比较,我们得到seqOp(1,2)返回值是1,接着与第一分区第三个值3比较,我们得到seqOp(1,3)返回1,

然后:aggregate的第一个参数2继续去第二个分区 得到最小值2,接着下面的分区都是跟上面一样操作,第三个分区得到也是2。其实含义就是找到某个分区然后加上第一个参数2得到该分区最小的数,当然这只是一个例子 我们可以自定义其他函数。

最后:第一个分区得到最小值1后执行combOp(2,1)返回3,然后第二个分区执行combOp(3,2)返回5,然后第三个分区执行combOp(5,2)得到7,因此最后的aggregate返回的是整形数字7。

继续阅读