天天看点

PySpark-RDD聚合算子reduce\fold\aggregate比较和理解

比较和理解RDD中进行聚合操作的几种方法: reduce()\fold()\aggregate()

  1. rdd.reduce(func)
    #reduce() :rdd.reduce(func),对同类型的数据的RDD进行聚合操作,返回值是一个同类型的数值结果
    num=sc.parallelize([1,2,3,4])
    sum=num.reduce(lambda x,y: x+y) #理解x,y: x,它代指的是返回值,而y是对rdd各元素的遍历。所以,x+y表示对num中数据进行累加
    print(sum) #10
               
  2. rdd.fold(value,func)
    #fold(): rdd.fold(value,func) ,同reduce()类似,但需要一个初始值作为第一次调用的结果。(加法初始值为0,乘法初始值为1)
    num=sc.parallelize([1,2,3,4])
    sum = num.fold(0,lambda x,y:x+y)
    print(sum)  #10
    sum = num.fold(1,lambda x,y:x+y)
    print(sum) #15
    sum = num.fold(2,lambda x,y:x+y)
    print(sum) #20
               
  3. rdd.aggregate((value, value), seqOp, combOp)
    #aggregate(): aggregate((0, 0), seqOp, combOp).
    ''' 
    seqOp相当于是在各个分区里进行的聚合操作,支持不同类型的聚合。 
    combOp是将seqOp后的每个分区的结果再进行聚合,即把各个累加器中的结果进行聚合,此时只能进行同数据类型聚合。
    '''
    seqOp = (lambda x, y: x+y)  #理解x,y: x,它代指的是返回值,这里指累加和,而y是对rdd各元素的遍历。同reduce
    combOp = (lambda x, y: x+y) #理解x,y: x,y都是seqOp步骤中的返回值
    sc.parallelize([1, 2, 3, 4]).aggregate(0, seqOp, combOp) #10 
               
    #输出是多个,不同于原输入的聚合。可以做到一面累加、一面计数即count()
    seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) #理解x,y: x,它代指的是返回值,这里指(累加和,计数值),而y是对rdd各元素的遍历。
    combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) #理解x,y: x,y都是seqOp步骤中的返回值,即(累加和,计数值)
    r = sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp) 
    print(r)  #(10, 4)
    print(r[0]/r[1]) #2.5, 即平均值
               
    #如果初始值设置为非0数值
    sc.parallelize([1, 2, 3, 4]).aggregate((4, 4), seqOp, combOp) #(30, 24) ,
    #按照计算流程,seqOp分区节点计算: (4+1,4+1),(4+2,4+1),(4+3,4+1),(4+4,4+1)
    #combOp聚合计算:(4,4)+(5+6+7+8,5+5+5+5) = (30,24) 
    
    seqOp = (lambda x, y: (x[0] + y, x[1] + 1, x[2] * 2))
    combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] *y[2]))
    sc.parallelize([1, 2, 3, 4]).aggregate((0, 0,1), seqOp, combOp) #(10, 4,16)
    #按照计算流程,1*((1*2) * (1*2) * (1*2) * (1*2)) = 16
    sc.parallelize([1, 2, 3, 4]).aggregate((0, 0,2), seqOp, combOp) #(10, 4, 512)
    #按照计算流程,2*((2*2) * (2*2) * (2*2) * (2*2)) = 512
               

继续阅读