1. Intersection (intersection)
1.1 Source code
/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did. // the result is de-duplicated
 *
 * @note This method performs a shuffle internally. // this is a shuffle operator
 */
// Both RDDs must have the same element type T, which is also the element type of the returned RDD
def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
    .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
    .keys
}
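Source analysis:
- thisRDD.intersection(otherRDD): computes the intersection of thisRDD and otherRDD. The result contains no duplicate elements, even if an element occurs several times in either RDD;
- intersection is a shuffle operator (it introduces a ShuffleDependency);
- internally it maps each element v to (v, null), calls the cogroup operator, keeps only the keys whose groups are non-empty on both sides, and returns those keys;
- Note: many operators that combine two RDDs by grouping records with the same key are built on cogroup(otherDataset, [numPartitions]), for example join and intersection. This is not universal, though: subtractByKey uses a dedicated SubtractedRDD instead of cogroup.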
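To make the cogroup-based steps concrete, here is a minimal Spark-free sketch that replays them on local Scala collections. `IntersectionSketch` is a hypothetical name, and the naive grouping below only mimics what cogroup does across partitions; it is meant for reading, not for real workloads.

```scala
// Spark-free sketch: replays the steps of RDD.intersection on local Scala collections
object IntersectionSketch {
  def intersection[T](left: Seq[T], right: Seq[T]): Seq[T] = {
    // Step 1: turn every element into a (key, null) pair, like this.map(v => (v, null))
    val leftPairs = left.map(v => (v, null))
    val rightPairs = right.map(v => (v, null))
    // Step 2: "cogroup": for every distinct key, collect its values from both sides
    val keys = (leftPairs.map(_._1) ++ rightPairs.map(_._1)).distinct
    val cogrouped = keys.map { k =>
      (k, (leftPairs.filter(_._1 == k).map(_._2), rightPairs.filter(_._1 == k).map(_._2)))
    }
    // Step 3: keep keys present on both sides, then drop the grouped values (like .keys)
    cogrouped
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .map(_._1)
  }
}
```

With list1 and list2 from the example below, this yields 4, 5, 6, 7, each exactly once, even though 7 occurs twice in list1: grouping by key is what removes the duplicates.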
1.2 Example:
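import org.apache.spark.rdd.RDD
// assumes an existing SparkContext named sc (e.g. in spark-shell)
val list1 = List(1,2,3,4,5,6,7,7,20)
val list2 = List(4,5,6,7,8,9,10)
val rdd1: RDD[Int] = sc.parallelize(list1, 3) // 3 partitions; if omitted, sc.defaultParallelism is used
val rdd2: RDD[Int] = sc.parallelize(list2)
// intersection: rdd1 ∩ rdd2
rdd1.intersection(rdd2).foreach(println)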
Output (the order may differ between runs, because foreach prints from each partition independently):
6
4
7
5
2. Difference (subtract)
2.1 Source code
/**
 * Return an RDD with the elements from `this` that are not in `other`.
 *
 * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
 * RDD will be <= us.
 */
// Default: reuses this RDD's partitioner, or a HashPartitioner with this RDD's number of partitions
def subtract(other: RDD[T]): RDD[T] = withScope {
  subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}
/**
 * Return an RDD with the elements from `this` that are not in `other`.
 */
// Takes the number of partitions for the result; a new HashPartitioner with that many partitions is used
def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
  subtract(other, new HashPartitioner(numPartitions))
}
/**
 * Return an RDD with the elements from `this` that are not in `other`.
 */
// Takes a custom Partitioner
def subtract(
    other: RDD[T],
    p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  if (partitioner == Some(p)) {
    // Our partitioner knows how to handle T (which, since we have a partitioner, is
    // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
    val p2 = new Partitioner() {
      override def numPartitions: Int = p.numPartitions
      override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
    }
    // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
    // anyway, and when calling .keys, will not have a partitioner set, even though
    // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
    // partitioned by the right/real keys (e.g. p).
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
  } else {
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
  }
}
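All three overloads end in `this.map(x => (x, null)).subtractByKey(...).keys`. The sketch below, assuming plain Scala collections in place of RDDs, replays that path and also mimics the "de-tupling" partitioner p2 from the source. `SimplePartitioner`, `SimpleHashPartitioner` and `SubtractSketch` are hypothetical stand-ins, not Spark classes.

```scala
// Hypothetical stand-in for org.apache.spark.Partitioner (not the real class)
trait SimplePartitioner {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

// Non-negative hash partitioning by key (similar in spirit to Spark's HashPartitioner)
final class SimpleHashPartitioner(val numPartitions: Int) extends SimplePartitioner {
  def getPartition(key: Any): Int =
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
    }
}

object SubtractSketch {
  // Same shape as subtract: wrap as (x, null), drop keys present in `other`, keep the keys
  def subtract[T](left: Seq[T], right: Seq[T]): Seq[T] = {
    val rightKeys: Set[T] = right.toSet
    left.map(x => (x, null)).filterNot { case (k, _) => rightKeys.contains(k) }.map(_._1)
  }

  // Mimics p2: partitions a fake (key, null) tuple by its first component only
  def deTuple(p: SimplePartitioner): SimplePartitioner = new SimplePartitioner {
    def numPartitions: Int = p.numPartitions
    def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, Any)]._1)
  }
}
```

One consequence worth noting: unlike intersection, subtract does not de-duplicate. An element that occurs twice in this RDD and not at all in other stays twice in the result, because subtractByKey keeps every pair whose key is absent from other.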
2.2 Examples
2.2.1 The RDDs involved must have exactly the same element type
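import org.apache.spark.rdd.RDD
// Prepare the datasets (assumes an existing SparkContext named sc)
val list1 = List(1,2,3,4,5,6,7,7,20)
val list2 = List(4,5,6,7,8,9,10)
val array = Array("hello huangbo","hello xuzheng","hello huangxiaoming")
val kv = Array(("a",1), ("b",2),("c",3),("a",1),("b",1),("c",1))
val rdd1: RDD[Int] = sc.parallelize(list1, 3)
val rdd2: RDD[Int] = sc.parallelize(list2)
val rdd3: RDD[String] = sc.makeRDD(array)
// a key-value PairRDD
val rdd4: RDD[(String,Int)] = sc.makeRDD(kv) // the first element of each tuple is used as the key
/** Computing the difference
 * subtract(): difference; the RDDs involved must have the same element type;
 * 1. for single-value elements, the difference is computed directly on the values;
 * 2. for (K, V) elements, the difference is still computed on the whole element, not on the key alone.
 */
val subtractRes: RDD[Int] = rdd1.subtract(rdd2)
subtractRes.foreach(x => print(x + "\t")); println() // difference: 3 1 2 20 (order may vary)
// rdd3.subtract(rdd4) // does not compile: the RDDs must have the same element type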
2.2.2 When the elements are tuples, the types of the tuple components must match as well:
// Wrong: both RDDs hold tuples, but the tuple types differ ((String, Int) vs (String, Any)), so the difference cannot be computed
val list01 = Array(("a",1), ("b",2), ("c",3))
val rdd01: RDD[(String, Int)] = sc.parallelize(list01)
val list02 = Array(("a","lily"),("b","lucy"),("c","rose"),("c",3))
val rdd02: RDD[(String, Any)] = sc.makeRDD(list02)
//rdd01.subtract(rdd02).foreach(print) // does not compile: the tuple types differ
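However, you can widen both element types to a common supertype (here (String, Any)) so that they match:
// Correct: the element types now match; the result is (a,1)(b,2)
// explicitly declare the element type as (String, Any) to unify the types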
val list03: Array[(String, Any)] = Array(("a",1), ("b",2), ("c",3))
val rdd03 = sc.parallelize(list03)
val list04: Array[(String, Any)] = Array(("a","lily"),("b","lucy"),("c","rose"),("c",3))
val rdd04 = sc.makeRDD(list04)
rdd03.subtract(rdd04).foreach(print)
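The compile error in the first case comes from RDD[T] being invariant in T: an RDD[(String, Int)] is not an RDD[(String, Any)]. The sketch below reproduces the rule with a hypothetical invariant `LocalBag[T]` whose `subtract` has the same shape as `RDD.subtract(other: RDD[T])`; it is an illustration on local collections, not Spark code. Declaring both sides as `(String, Any)` is what makes the call type-check.

```scala
// Hypothetical invariant container standing in for RDD[T] (RDD is also invariant in T)
final class LocalBag[T](val items: Seq[T]) {
  // Same shape as RDD.subtract(other: RDD[T]): both sides must share exactly the same T
  def subtract(other: LocalBag[T]): LocalBag[T] = {
    val drop: Set[T] = other.items.toSet
    new LocalBag[T](items.filterNot(drop.contains))
  }
}

object TypeUnifySketch {
  // Widen both sides to (String, Any) explicitly, as list03/list04 do above
  val left = new LocalBag[(String, Any)](Seq(("a", 1), ("b", 2), ("c", 3)))
  val right = new LocalBag[(String, Any)](Seq(("a", "lily"), ("b", "lucy"), ("c", "rose"), ("c", 3)))
  // Compiles because both are LocalBag[(String, Any)];
  // a LocalBag[(String, Int)] on either side would be rejected by the compiler
  val result: Seq[(String, Any)] = left.subtract(right).items
}
```

Here ("c", 3) is removed because the identical tuple occurs in right, leaving (a,1) and (b,2), which matches the result stated for rdd03 and rdd04.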
3. Difference by key (subtractByKey)
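thisPairRDD.subtractByKey(otherPairRDD): computes a difference that uses the key alone to identify elements; the value types and the values themselves play no role.
Note: both operands must be PairRDDs (RDDs of key-value tuples) with the same key type.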
Example
/**
 * subtractByKey(otherRDD): computes the difference on keys only, returning the elements of this RDD whose key does not occur in otherRDD;
 * (applies to PairRDDs only)
 */
val rdd10 = sc.makeRDD(Array(("a",1), ("b",2), ("c",3), ("a",5), ("d",5)))
val rdd11 = sc.makeRDD(Array(("a",1), ("b",2), ("c",3)))
// result: (d,5), because "d" is the only key that does not occur in rdd11
rdd10.subtractByKey(rdd11).foreach(print)
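On the same rdd10/rdd11 data, subtract and subtractByKey give different answers, which is exactly the whole-element versus key-only comparison described in 2.2. The sketch below uses plain Scala sequences and hypothetical function names; it illustrates the semantics only, not how Spark distributes the work.

```scala
object ByKeySketch {
  // Like subtract: an element is removed only if the whole (key, value) pair occurs in `right`
  def subtractWhole[K, V](left: Seq[(K, V)], right: Seq[(K, V)]): Seq[(K, V)] = {
    val drop = right.toSet
    left.filterNot(drop.contains)
  }

  // Like subtractByKey: an element is removed if its key occurs in `right`,
  // whatever the values are (the value types V and W may even differ)
  def subtractByKey[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, V)] = {
    val dropKeys = right.map(_._1).toSet
    left.filterNot { case (k, _) => dropKeys.contains(k) }
  }
}
```

With the rdd10/rdd11 data, the whole-element version keeps (a,5) and (d,5), because (a,5) differs from (a,1), while the key-only version keeps just (d,5).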
4. Union
4.1 Concatenation operator: union
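/** Intersection, union, difference
 * union(): plain concatenation; duplicates are not removed (not a union in the mathematical sense)
 * count(): returns the number of elements in the RDD
 */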
/*
rdd1 = {1,2,3,4,5,6,7,7,20}
rdd2 = {4,5,6,7,8,9,10}
*/
println(rdd1.union(rdd2).count()) // 16 elements (9 + 7)
4.2 Set union (union first, then distinct)
// set union: concatenate with union, then remove duplicates with distinct
rdd1.union(rdd2).distinct().foreach(println)
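The counts in this section can be checked locally with plain Scala lists. This is a sketch: `UnionSketch` is a hypothetical name, and Spark does not guarantee the element order that List preserves here.

```scala
object UnionSketch {
  val list1 = List(1, 2, 3, 4, 5, 6, 7, 7, 20)
  val list2 = List(4, 5, 6, 7, 8, 9, 10)
  // Like rdd1.union(rdd2): plain concatenation, duplicates kept
  val concatenated: List[Int] = list1 ++ list2
  // Like rdd1.union(rdd2).distinct(): the mathematical set union
  val setUnion: List[Int] = concatenated.distinct
}
```

concatenated has 16 elements, matching count() above, while setUnion has 11 (1 to 10 plus 20). In Spark, union alone does not shuffle, whereas distinct does, since equal elements have to be brought together before duplicates can be dropped.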