天天看点

Spark-核心编程(四)RDD行动算子

RDD行动算子

转换算子是不会触发作业的执行的,只是定义作业需要怎么做,行动算子将会使作业(job)真正的执行。其底层job将会执行上下文对象的runJob方法,底层代码中会创建ActiveJob,并提交执行。

reduce

def reduce(f: (T, T) => T): T
           

聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

collect

在驱动程序中,以数组 Array 的形式返回数据集的所有元素,按照分区的顺序来采集数据。

count

返回 RDD 中元素的个数

first

def first(): T
           

返回 RDD 中的第一个元素

take

返回一个由 RDD 的前 n 个元素组成的数组

takeOrdered

返回该 RDD 排序后的前 n 个元素组成的数组

object TestTransformReduce {

    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)


        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

        // reduce:聚合操作
        println(rdd.reduce(_ + _))
        // 10

        // collect:方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
        println(rdd.collect().mkString(","))
        // 1,2,3,4

        // count:数据源中数据的个数
        println(rdd.count())
        // 4

        // first:获取数据源中的第一个
        println(rdd.first())
        // 1

        // take:获取前N个数据
        println(rdd.take(3).mkString(","))
        // 1,2,3

        // takeOrdered:数据排序后取前N个数据
        println(sc.makeRDD(List(4, 1, 3, 2)).takeOrdered(3).mkString(","))
        // 1,2,3

        sc.stop()
    }
}
           

aggregate

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
           

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

fold

def fold(zeroValue: T)(op: (T, T) => T): T
           

折叠操作,aggregate 的简化版操作

object TestTransformAggregate {

    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

        // aggregateByKey:初始值只会参与分区内计算
        // aggregate:初始值会参与分区内以及分区间的计算
        println(rdd.aggregate(0)(_ + _, _ + _))
        // 10
        println(rdd.aggregate(10)(_ + _, _ + _))
        // 40

        println(rdd.fold(10)(_ + _))
        // 40

        sc.stop()
    }
}
           

countByKey

统计每种 key 的个数

println(sc.makeRDD(List(1, 2, 3, 4, 4, 5, 1), 2).countByValue())
// Map(5 -> 1, 1 -> 2, 2 -> 1, 3 -> 1, 4 -> 2)

println(sc.makeRDD(List(
    ("a", 1), ("a", 2), ("a", 4), ("b", 3)
)).countByKey())
// Map(a -> 3, b -> 1)

           

save相关算子

def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit
           

将数据保存到不同格式的文件中

val rdd = sc.makeRDD(List(
    ("a", 1), ("b", 2), ("c", 3)
), 2)

rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output2")
// saveAsSequenceFile方法要求数据的格式必须为KV类型
rdd.saveAsSequenceFile("output3")

           

foreach

def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
           

分布式遍历 RDD 中的每一个元素,调用指定函数

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

// 这个的foreach是Driver端内存集合的循环遍历方法
rdd.collect.foreach(println) // 1, 2, 3, 4

println("-------")

// 这个的foreach是Executor端内存数据的遍历
rdd.foreach(println)         // 1, 3, 2, 4 无序的,但1一定在2前,3一定在4前
           

继续阅读