天天看點

spark中RDD的方法整理

aggregate[U](zeroValue : U)(seqOp : scala.Function2[U, T, U], combOp : scala.Function2[U, U, U]):U

def main(args: Array[String]): Unit = {
    //seqOp函數傳回的是值小的 
    def seqOp(arg1:Int,arg2:Int):Int={
      var res:Int=arg1
      if(arg1>arg2)
        res=arg2
      println("seqOp:"+arg1+","+arg2+"=>"+res)
      res
    }
    //combOp函數兩個值求和
    def combOp(arg1:Int,arg2:Int):Int={
      println("combOp:"+arg1+","+arg2+"=>"+(arg1+arg2))
      arg1+arg2
    }
    //将每個分區index顯示出來
    def myfunc[T](index:Int,iter:Iterator[T]):Iterator[(Int,T)]={
      var res = List[(Int,T)]()
      for(x<-iter)
        res.::=(index,x)
      res.iterator
    }
    val sparkConf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
    val ssc:SparkContext=new SparkContext(sparkConf)
    val data=ssc.parallelize( to ,)
    data.mapPartitionsWithIndex(myfunc).collect()
    val  result=data.aggregate()(seqOp,combOp)
    print(result)
  }
           
//結果如下
seqOp:,=>
seqOp:,=>
seqOp:,=>
combOp:,=>

seqOp:,=>
seqOp:,=>
seqOp:,=>
combOp:,=>

seqOp:,=>
seqOp:,=>
seqOp:,=>
seqOp:,=>
combOp:,=>

           

val data=ssc.parallelize(1 to 10,3)将1-10劃分為三個分區,第一個分區存放了1 2 3 第二個分區存放了4 5 6 第三個分區存放了7 8 9 10。

aggregate的作用是什麼呢,這個方法有兩個括号參數,第一個括号我們傳入了int類型的2,第二個括号傳入了兩個函數,第一個函數求兩個值的最小值,第二個函數求兩個值的和。

首先:2與第一個分區中的第一個數比較即2與1 我們得到seqOp(2,1)然後傳回了1,接着剛剛傳回的值1與第一個分區第二個數2比較,我們得到seqOp(1,2)傳回值是1,接着與第一分區第三個值3比較,我們得到seqOp(1,3)傳回1,

然後:aggregate的第一個參數2繼續去第二個分區 得到最小值2,接着下面的分區都是跟上面一樣操作,第三個分區得到也是2。其實含義就是找到某個分區然後加上第一個參數2得到該分區最小的數,當然這隻是一個例子 我們可以自定義其他函數。

最後:第一個分區得到最小值1後執行combOp(2,1)傳回3,然後第二個分區執行combOp(3,2)傳回5,然後第三個分區執行combOp(5,2)得到7,是以最後的aggregate傳回的是整形數字7。

繼續閱讀