
Spark Core operators by example: intersection, difference, and union (intersection, subtract, union, distinct, subtractByKey)

1. Intersection: intersection

1.1 Source code

/**
   * Return the intersection of this RDD and another one. The output will not contain any duplicate
   * elements, even if the input RDDs did. // the result is deduplicated
   *
   * @note This method performs a shuffle internally. // this is a shuffle operator
   */
  // both RDDs must share the same element type, which is also the element type of the returned RDD
  def intersection(other: RDD[T]): RDD[T] = withScope {
    this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
        .keys
  }
           

Source analysis:

  1. thisRDD.intersection(otherRDD): computes the intersection of thisRDD and otherRDD; the result contains no duplicate elements, even if an element appears multiple times in both RDDs.
  2. intersection is a shuffle-dependency operator.
  3. Internally it delegates to the cogroup operator.
  4. Note: whenever a computation involves two RDDs and operates on data grouped by a common key, it will call the cogroup(otherDataset, [numTasks]) operator.
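The cogroup strategy above can be mimicked with plain Scala collections (a sketch only, no SparkContext; the names `left`/`right` are illustrative) to see why the result is deduplicated:

```scala
// Plain-Scala sketch of intersection's cogroup strategy (not Spark code):
// pair every element with null, group both sides by key, keep keys present in both.
val left  = List(1, 2, 3, 4, 5, 6, 7, 7, 20)
val right = List(4, 5, 6, 7, 8, 9, 10)

val leftGroups  = left.map(v => (v, null)).groupBy(_._1)   // key -> its occurrences
val rightGroups = right.map(v => (v, null)).groupBy(_._1)

// A key survives only if both groups are non-empty; each key appears once,
// which is why duplicates (like the two 7s) collapse in the result.
val intersection = leftGroups.keySet.filter(k => rightGroups.contains(k))
```

Because the comparison happens on the grouped keys, the two occurrences of 7 on the left contribute only a single 7 to the output.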

1.2 Code example:

val list1 = List(1,2,3,4,5,6,7,7,20)
val list2 = List(4,5,6,7,8,9,10)
val rdd1: RDD[Int] = sc.parallelize(list1, 3) // 3 is the partition count; if omitted, spark.default.parallelism is used
val rdd2: RDD[Int] = sc.parallelize(list2)
// intersection: rdd1 ∩ rdd2
rdd1.intersection(rdd2).foreach(println)
           

The output is as follows (the order may vary, since foreach runs per partition):

6
4
7
5
           

2. Difference: subtract

2.1 Source code

/** // by default, keeps thisRDD's partitioner and partition count
   * Return an RDD with the elements from `this` that are not in `other`.
   * 
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be <= us.
   */
  def subtract(other: RDD[T]): RDD[T] = withScope {
    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
  }

  /** // overload: pass a number to control the resulting RDD's partition count (a HashPartitioner is used)
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
    subtract(other, new HashPartitioner(numPartitions))
  }

  /** // overload: pass a custom partitioner
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(
      other: RDD[T],
      p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    if (partitioner == Some(p)) {
      // Our partitioner knows how to handle T (which, since we have a partitioner, is
      // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
      val p2 = new Partitioner() {
        override def numPartitions: Int = p.numPartitions
        override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
      }
      // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
      // anyway, and when calling .keys, will not have a partitioner set, even though
      // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
      // partitioned by the right/real keys (e.g. p).
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
    } else {
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
    }
  }
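All three overloads funnel into subtractByKey over (x, null) pairs. A collection-level sketch of the resulting semantics (plain Scala, not Spark; the sample lists are illustrative) shows that duplicates on the kept side survive:

```scala
// Sketch of subtract's semantics: keep elements of `this` whose value does
// not appear in `other`; duplicates among the kept elements are preserved.
val thisSide  = List(1, 2, 3, 3, 7, 20)
val otherSide = List(7, 8, 9)

val otherKeys = otherSide.toSet
val difference = thisSide.map(x => (x, null))            // fake (K, V) pairs, as in the source
  .filter { case (k, _) => !otherKeys.contains(k) }      // drop keys present in `other`
  .map(_._1)                                             // back to plain values (.keys)
```

Note that 3 appears twice in the result: unlike intersection, subtract does not deduplicate.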

    
           

2.2 Code examples

2.2.1 The RDDs involved must have exactly the same element type

// prepare the datasets
    val list1 = List(1,2,3,4,5,6,7,7,20)
    val list2 = List(4,5,6,7,8,9,10)
    val array = Array("hello huangbo","hello xuzheng","hello huangxiaoming")
    val kv = Array(("a",1), ("b",2),("c",3),("a",1),("b",1),("c",1))
    val rdd1: RDD[Int] = sc.parallelize(list1, 3)
    val rdd2: RDD[Int] = sc.parallelize(list2)
    val rdd3: RDD[String] = sc.makeRDD(array)
    // key-value PairRDD
    val rdd4: RDD[(String,Int)] = sc.makeRDD(kv)  // the first tuple element is automatically treated as the key

    /** Compute the difference
      * subtract(): set difference; the RDDs involved must have the same element type;
      *     1. For single-value elements, the difference is computed directly.
      *     2. For (K, V) pairs, the whole tuple is compared (not just the key).
      */
    val subtractRes: RDD[Int] = rdd1.subtract(rdd2)
    subtractRes.foreach(x => print(x + "\t")); println() // difference: 3	1	2	20
    //rdd3.subtract(rdd4)  // does not compile: the element types differ
           

2.2.2 When the elements are tuples, the tuple component types must also match:

// does not compile: the element types are not unified (both are tuples, but the tuple type parameters differ)
    val list01 = Array(("a",1), ("b",2), ("c",3))
    val rdd01: RDD[(String, Int)] = sc.parallelize(list01)
    val list02 = Array(("a","lily"),("b","lucy"),("c","rose"),("c",3))
    val rdd02: RDD[(String, Any)] = sc.makeRDD(list02)
    //rdd01.subtract(rdd02).foreach(print) // error: the tuple types differ
           

The types can be unified, however, by widening to a common supertype:

// correct: the types are unified; the result is (a,1)(b,2)
    // annotate the element type as Any to unify the two RDDs
    val list03: Array[(String, Any)] = Array(("a",1), ("b",2), ("c",3))
    val rdd03 = sc.parallelize(list03)
    val list04: Array[(String, Any)] = Array(("a","lily"),("b","lucy"),("c","rose"),("c",3))
    val rdd04 = sc.makeRDD(list04)
    rdd03.subtract(rdd04).foreach(print)
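The whole-tuple comparison (point 2 in 2.2.1) can be checked with plain Scala collections (a sketch only, no SparkContext; the sample data is illustrative):

```scala
// subtract on (K, V) elements compares the whole tuple, so ("a", 1) is removed
// only because `other` contains ("a", 1) exactly; ("a", 5) shares the key "a"
// but survives, because the full tuple differs.
val thisKV  = List(("a", 1), ("a", 5), ("b", 2))
val otherKV = List(("a", 1), ("c", 3))

// A Set is a predicate in Scala, so filterNot removes exact-tuple matches.
val tupleDiff = thisKV.filterNot(otherKV.toSet)
```

This is why the example above keeps (a,1) and (b,2) but drops ("c",3): only the latter appears as an identical tuple in both RDDs.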
           

3. Difference by key: subtractByKey

thisPairRDD.subtractByKey(otherPairRDD): computes the difference using only the key as each element's identity; the value's type and contents are irrelevant.

Note: both operands must be PairRDDs.

Code example

/**
      * subtractByKey(otherRDD): computes the difference by key only, returning the elements of the main RDD whose keys do not appear in otherRDD;
      *           ---- defined only on PairRDDs
      */
    val rdd10 = sc.makeRDD(Array(("a",1), ("b",2), ("c",3), ("a",5), ("d",5)))
    val rdd11 = sc.makeRDD(Array(("a",1), ("b",2), ("c",3)))
    // result: (d,5), because "d" is the only key that does not appear in rdd11
    rdd10.subtractByKey(rdd11).foreach(print)
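The key-only comparison can be sketched with plain Scala collections (no SparkContext; same sample data as above):

```scala
// subtractByKey keeps pairs whose KEY is absent from `other`, regardless of
// the values: ("a", 5) is dropped even though `other` has no ("a", 5) tuple,
// because the key "a" appears in `other`.
val mainPairs  = List(("a", 1), ("b", 2), ("c", 3), ("a", 5), ("d", 5))
val otherPairs = List(("a", 1), ("b", 2), ("c", 3))

val otherKeySet = otherPairs.map(_._1).toSet
val byKeyDiff = mainPairs.filter { case (k, _) => !otherKeySet.contains(k) }
```

Contrast this with plain subtract, which would keep ("a", 5) because it compares whole tuples.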
           

4. Union

4.1 Concatenation operator: union

/**
   * union(): simply concatenates the two RDDs without deduplication (not a mathematical union)
   * count(): returns the number of elements in the RDD
   */
/*
    rdd1 = {1,2,3,4,5,6,7,7,20}
    rdd2 = {4,5,6,7,8,9,10}
 */
    println(rdd1.union(rdd2).count()) // 16 elements
           

4.2 Mathematical union: union followed by distinct

// union: concatenate with union, then deduplicate with distinct
    rdd1.union(rdd2).distinct().foreach(println)
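The two steps can be sketched with plain Scala collections (no SparkContext; same lists as section 1.2):

```scala
// union concatenates without deduplication; distinct then removes repeats,
// yielding the mathematical union.
val list1 = List(1, 2, 3, 4, 5, 6, 7, 7, 20)
val list2 = List(4, 5, 6, 7, 8, 9, 10)

val concatenated = list1 ++ list2              // 16 elements, duplicates kept
val mathematicalUnion = concatenated.distinct  // duplicates removed
```

This matches the count of 16 in 4.1: 9 + 7 elements before deduplication, 11 distinct values after.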
           
