
Spark Learning 03: RDD Transformation Operators

The RDD transformation operators covered here are:

map, flatMap, mapPartitions, mapPartitionsWithIndex, filter, sample, union, intersection, distinct, cartesian, pipe, coalesce, repartition, repartitionAndSortWithinPartitions, glom, randomSplit

Detailed explanations and examples

1. map

Processes elements one at a time: each input element produces exactly one output element. The function is applied to every element of the RDD, and the return values make up the new RDD.

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val inputRDD = sc.textFile("examples/src/main/resources/data.txt")
val mapRDD = inputRDD.map(line => line.split(" "))
mapRDD.foreach(array => println(array(0)))
sc.stop()
           

2. flatMap

Processes each element and produces an iterator for it. The function is applied to every element of the RDD, and the contents of all returned iterators are flattened into one new RDD; in other words, the results for the whole old RDD end up in a single flat collection. It is commonly used to split lines into words.

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val inputRDD = sc.textFile("examples/src/main/resources/data.txt")
val flatMapRDD = inputRDD.flatMap(line => line.split(" "))
flatMapRDD.foreach(println)
sc.stop()
           

3. mapPartitions

mapPartitions (and likewise foreachPartition) operates on the iterator over each partition of the RDD rather than on individual elements. When the processing has to create an expensive extra object repeatedly (for example, writing RDD data to a database over JDBC: map would open a connection for every element, whereas mapPartitions opens one connection per partition), mapPartitions is far more efficient than map; a minimal sketch of this connection-per-partition pattern follows the example below. Spark SQL and the DataFrame API apply this kind of per-partition optimization automatically.

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val a = sc.parallelize(1 to 9, 3)
val result = a.mapPartitions(iter => {
  var res = List[(Int, Int)]()
  while (iter.hasNext) {
    val cur = iter.next()
    res = (cur, cur * 2) :: res
  }
  res.iterator
})
println(result.collect().mkString(" "))
println(result.first())
sc.stop()
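
As mentioned above, the main benefit shows up when each partition shares an expensive resource. Below is a minimal sketch of the connection-per-partition pattern using foreachPartition; the JDBC URL, table name, and credentials are placeholders, and it assumes a suitable JDBC driver is on the classpath:

import java.sql.DriverManager
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val a = sc.parallelize(1 to 9, 3)
a.foreachPartition { iter =>
  // one JDBC connection per partition instead of one per element
  val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO demo_table (value) VALUES (?)")
  try {
    iter.foreach { value =>
      stmt.setInt(1, value)
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}
sc.stop()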
           

4. mapPartitionsWithIndex

Similar to mapPartitions, but the function also receives the partition index.

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val a = sc.parallelize(1 to 9, 3)
val result = a.mapPartitionsWithIndex((index: Int, iter: Iterator[Int]) => {
  var res = List[(Int, Int, Int)]()
  while (iter.hasNext) {
    val cur = iter.next()
    res = (index, cur, cur * 2) :: res
  }
  res.iterator
})
println(result.collect().mkString(" "))
println(result.first())
sc.stop()
           

5. filter

Filtering. Takes a predicate function and returns a new RDD containing only the elements that satisfy it.

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val inputRDD = sc.textFile("examples/src/main/resources/data.txt")
val filterRDD = inputRDD.filter(line => line.contains("li"))
filterRDD.take(10).foreach(println)
sc.stop()
           

6. sample

sample(withReplacement, fraction, seed): samples the data using the given random seed, where fraction is the expected fraction of the data to sample and withReplacement controls whether sampling is done with replacement (true) or without replacement (false).

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.parallelize(1 to 10)
// sample roughly 50% of the data, without replacement
val value = rdd.sample(false, 0.5, 0)
value.collect.foreach(x => print(x + " "))
sc.stop()
           

7. union

Merges the datasets of two RDDs and returns their union; duplicate elements are kept, not removed.

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(3 to 5)
val unionRDD = rdd1.union(rdd2)
unionRDD.collect.foreach(x => print(x + " "))
sc.stop()
           

8. intersection

Returns the intersection of two RDDs.

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(2 to 5)
val rdd = rdd1.intersection(rdd2)
rdd.foreach(println)
sc.stop()
           

9. distinct

Removes duplicate elements from the RDD.

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val list = List(1, 2, 3, 1, 3, 4, 5)
val rdd = sc.parallelize(list)
val rdd2 = rdd.distinct()
rdd2.foreach(println)
sc.stop()
           

10. cartesian

Computes the Cartesian product of all elements of two RDDs.

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(4 to 5)
val cartesianRDD = rdd1.cartesian(rdd2)
cartesianRDD.foreach(x => println(x + " "))
sc.stop()
           

11. pipe

Pipes each partition of the RDD through a shell command (for example, a Perl or bash script). The RDD elements are written to the process's stdin, and its stdout is returned as an RDD of strings.
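
A minimal sketch, assuming a Unix-like environment where the cat command is available on the executors (cat simply echoes stdin back to stdout):

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.parallelize(List("hello", "world", "spark"), 2)
// each partition's elements are fed to cat's stdin; its stdout lines form the new RDD
val pipedRDD = rdd.pipe("cat")
pipedRDD.collect().foreach(println)
sc.stop()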

12. coalesce

Changes the number of partitions of the RDD. The shuffle flag defaults to false, and with shuffle = false the number of partitions cannot be increased; no error is raised, the RDD simply keeps its original partition count.

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.parallelize(1 to 16, 4)
val coalesceRDD = rdd.coalesce(3)
val coalesceRDD2 = rdd.coalesce(5, false) // with shuffle = false the partition count cannot grow (4 -> 5 is ignored)
val coalesceRDD3 = rdd.coalesce(5, true)
println("Partitions after coalesce: " + coalesceRDD.partitions.size)
println("Partitions after coalesce: " + coalesceRDD2.partitions.size)
println("Partitions after coalesce: " + coalesceRDD3.partitions.size)
sc.stop()
           

13. repartition

Simply calls coalesce(numPartitions, shuffle = true).

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.parallelize(1 to 16, 4)
val reRDD = rdd.repartition(5)
println(reRDD.partitions.size)
sc.stop()
           

14. repartitionAndSortWithinPartitions

Repartitions the RDD according to the given partitioner and, within each resulting partition, sorts records by their keys. This is more efficient than calling repartition and then sorting inside each partition, because the sorting can be pushed down into the shuffle machinery.
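
The operator is defined on key-value RDDs. A minimal sketch, assuming a HashPartitioner with 2 partitions is an acceptable target layout:

import org.apache.spark.HashPartitioner

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val pairRDD = sc.parallelize(Seq((3, "c"), (1, "a"), (4, "d"), (2, "b")), 2)
// redistribute by hash of the key into 2 partitions, sorting by key within each partition
val sortedRDD = pairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(2))
sortedRDD.glom().collect().foreach(partition => println(partition.mkString(" ")))
sc.stop()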

15. glom

Turns each partition of the RDD into an array, collecting the partition's elements of type T into an Array[T].

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.parallelize(1 to 16, 4)
val glomRDD = rdd.glom() // RDD[Array[Int]]
glomRDD.foreach(partition => {
  for (r <- partition) {
    print(r + " ")
  }
  println()
})
sc.stop()
           

16. randomSplit

Splits one RDD into several RDDs according to the given weights; the higher the weight, the greater the expected share of elements the corresponding RDD receives.

val sparkConf = new SparkConf().setAppName("transformations examples").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.parallelize(1 to 10)
val randomSplitRDD = rdd.randomSplit(Array(1.0, 2.0, 7.0))
randomSplitRDD(0).foreach(x => print(x + " "))
println()
randomSplitRDD(1).foreach(x => print(x + " "))
println()
randomSplitRDD(2).foreach(x => print(x + " "))
sc.stop()
           
