aggregate[U](zeroValue : U)(seqOp : scala.Function2[U, T, U], combOp : scala.Function2[U, U, U]):U
def main(args: Array[String]): Unit = {
//seqOp函數傳回的是值小的
def seqOp(arg1:Int,arg2:Int):Int={
var res:Int=arg1
if(arg1>arg2)
res=arg2
println("seqOp:"+arg1+","+arg2+"=>"+res)
res
}
//combOp函數兩個值求和
def combOp(arg1:Int,arg2:Int):Int={
println("combOp:"+arg1+","+arg2+"=>"+(arg1+arg2))
arg1+arg2
}
//将每個分區index顯示出來
def myfunc[T](index:Int,iter:Iterator[T]):Iterator[(Int,T)]={
var res = List[(Int,T)]()
for(x<-iter)
res.::=(index,x)
res.iterator
}
val sparkConf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
val ssc:SparkContext=new SparkContext(sparkConf)
val data=ssc.parallelize( to ,)
data.mapPartitionsWithIndex(myfunc).collect()
val result=data.aggregate()(seqOp,combOp)
print(result)
}
//結果如下
seqOp:,=>
seqOp:,=>
seqOp:,=>
combOp:,=>
seqOp:,=>
seqOp:,=>
seqOp:,=>
combOp:,=>
seqOp:,=>
seqOp:,=>
seqOp:,=>
seqOp:,=>
combOp:,=>
val data=ssc.parallelize(1 to 10,3)将1-10劃分為三個分區,第一個分區存放了1 2 3 第二個分區存放了4 5 6 第三個分區存放了7 8 9 10。
aggregate的作用是什麼呢,這個方法有兩個括号參數,第一個括号我們傳入了int類型的2,第二個括号傳入了兩個函數,第一個函數求兩個值的最小值,第二個函數求兩個值的和。
首先:2與第一個分區中的第一個數比較即2與1 我們得到seqOp(2,1)然後傳回了1,接着剛剛傳回的值1與第一個分區第二個數2比較,我們得到seqOp(1,2)傳回值是1,接着與第一分區第三個值3比較,我們得到seqOp(1,3)傳回1,
然後:aggregate的第一個參數2繼續去第二個分區 得到最小值2,接着下面的分區都是跟上面一樣操作,第三個分區得到也是2。其實含義就是找到某個分區然後加上第一個參數2得到該分區最小的數,當然這隻是一個例子 我們可以自定義其他函數。
最後:第一個分區得到最小值1後執行combOp(2,1)傳回3,然後第二個分區執行combOp(3,2)傳回5,然後第三個分區執行combOp(5,2)得到7,是以最後的aggregate傳回的是整形數字7。