Spark RDD算子(四)
- combineByKey
-
- scala版本
- java版本
- reduceByKey
-
- scala版本
- java版本
- foldByKey
-
- scala版本
- java版本
- sortByKey
-
- scala版本
- java版本
combineByKey
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
各个参数的含义:
- createCombiner:组合器函数,combineByKey()会遍历分区中的所有元素,每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素, combineByKey() 会使用一个叫作 createCombiner() 的函数来创建 那个键对应的累加器的初始值
- mergeValue:合并值函数,在同一个分区中如果该键已经存在,则会使用mergeValue将该键的当前值与新值合并
- mergeCombiners:合并组合器函数,当有多个分区时,可能会存在不同分区内有相同的键,此时会用mergeCombiners合并不同分区的相同键的值
- numPartitions:结果RDD分区数,默认保持原有的分区数
- partitioner:分区函数,默认为HashPartitioner
- mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true
scala版本
以求学生各科分数的平均值为例
// Sample data: one ScoreDetail per (student, subject) pair.
val scores = List(
  ScoreDetail("zhangsan", "Math", 69),
  ScoreDetail("zhangsan", "English", 86),
  ScoreDetail("lisi", "Math", 77),
  ScoreDetail("lisi", "English", 99),
  ScoreDetail("wangwu", "Math", 89),
  ScoreDetail("wangwu", "English", 83),
  ScoreDetail("zhaoliu", "Math", 67),
  ScoreDetail("zhaoliu", "English", 97),
  ScoreDetail("qianqi", "Math", 67),
  ScoreDetail("qianqi", "English", 97),
  ScoreDetail("wanger", "Math", 67),
  ScoreDetail("wanger", "English", 97)
)
val rdd1 = sc.parallelize(scores)
// Key each record by student name, e.g. ("wanger", ScoreDetail("wanger", "Math", 67)).
val scoresDetailRDD = rdd1.map(x => (x.sname, x))
// combineByKey takes three functions, supplied here as anonymous functions.
val combineByKeyRDD = scoresDetailRDD.combineByKey(
  // createCombiner: the first time a key is seen in a partition, turn the
  // value's score into the accumulator (score, 1) — 1 means one course so far.
  (x: ScoreDetail) => (x.score, 1),
  // mergeValue: the key was already seen in this partition — add the new
  // score and bump the course count.
  (acc: (Int, Int), s: ScoreDetail) => (acc._1 + s.score, acc._2 + 1),
  // mergeCombiners: merge the per-partition accumulators of the same key.
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
)
// combineByKeyRDD holds e.g. (zhangsan, (155, 2)). Use floating-point division
// for the average — integer division would truncate 155/2 to 77 instead of 77.5.
val avg = combineByKeyRDD.map { case (name, (total, count)) => (name, total.toDouble / count) }
avg.collect.foreach(println)
java版本
// Sample data: one ScoreDetailsJava per (student, subject) pair.
List<ScoreDetailsJava> scoreDetails = new ArrayList<>();
scoreDetails.add(new ScoreDetailsJava("zhangsan","Math",69));
scoreDetails.add(new ScoreDetailsJava("zhangsan","English",86));
scoreDetails.add(new ScoreDetailsJava("lisi","Math",77));
scoreDetails.add(new ScoreDetailsJava("lisi","English",99));
scoreDetails.add(new ScoreDetailsJava("wangwu","Math",89));
scoreDetails.add(new ScoreDetailsJava("wangwu","English",83));
scoreDetails.add(new ScoreDetailsJava("zhaoliu","Math",67));
scoreDetails.add(new ScoreDetailsJava("zhaoliu","English",97));
scoreDetails.add(new ScoreDetailsJava("qianqi","Math",67));
scoreDetails.add(new ScoreDetailsJava("qianqi","English",97));
scoreDetails.add(new ScoreDetailsJava("wanger","Math",67));
scoreDetails.add(new ScoreDetailsJava("wanger","English",97));
// Build the RDD.
JavaRDD<ScoreDetailsJava> rdd1 = sc.parallelize(scoreDetails);
// Key each record by student name, e.g. ("wanger", ScoreDetailsJava("wanger", "Math", 67)).
JavaPairRDD<String, ScoreDetailsJava> scorePairRDD = rdd1.mapToPair(new PairFunction<ScoreDetailsJava, String, ScoreDetailsJava>() {
    @Override
    public Tuple2<String, ScoreDetailsJava> call(ScoreDetailsJava s) throws Exception {
        return new Tuple2<>(s.sname, s);
    }
});
// createCombiner: first sighting of a key in a partition -> accumulator (score, 1).
Function<ScoreDetailsJava, Tuple2<Integer, Integer>> createCombiner = new Function<ScoreDetailsJava, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(ScoreDetailsJava v1) throws Exception {
        return new Tuple2<Integer, Integer>(v1.score, 1);
    }
};
// mergeValue: key already seen in this partition -> add score, bump course count.
Function2<Tuple2<Integer, Integer>, ScoreDetailsJava, Tuple2<Integer, Integer>> mergeValue = new Function2<Tuple2<Integer, Integer>, ScoreDetailsJava, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, ScoreDetailsJava v2) throws Exception {
        return new Tuple2<>(v1._1 + v2.score, v1._2 + 1);
    }
};
// mergeCombiner: merge per-partition accumulators of the same key.
Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiner = new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
        return new Tuple2<>(v1._1 + v2._1, v1._2 + v2._2);
    }
};
// combineByKey takes the three functions defined above.
JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD = scorePairRDD.combineByKey(createCombiner, mergeValue, mergeCombiner);
List<Tuple2<String, Tuple2<Integer, Integer>>> collect = combineByKeyRDD.collect();
// Compute the average. Cast to double first — pure int division would
// truncate (e.g. 155/2 -> 77 instead of 77.5).
for (Tuple2<String, Tuple2<Integer, Integer>> t : collect) {
    System.out.println(t._1 + " " + ((double) t._2._1 / t._2._2));
}
reduceByKey
参数是一个函数,按照相同的key进行reduce操作,类似于scala的reduce的操作
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
scala版本
// Pair RDD; keys 1 and 7 each occur twice.
val pairs = sc.parallelize(List((1, 2), (1, 3), (5, 6), (7, 8), (7, 9)))
// Sum the values of identical keys; the println traces each merge step.
val summed = pairs.reduceByKey { (x, y) =>
  println("one:" + x, "two" + y)
  x + y
}
summed.collect.foreach(println)
java版本
// Classic word count: three lines of text, count occurrences of each word.
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hello java", "hello scala"));
// Split each line on spaces and emit one (word, 1) pair per token.
JavaPairRDD<String, Integer> wordPairs = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
    @Override
    public Iterator<Tuple2<String, Integer>> call(String line) throws Exception {
        List<Tuple2<String, Integer>> result = new ArrayList<>();
        for (String word : line.split(" ")) {
            result.add(new Tuple2<>(word, 1));
        }
        return result.iterator();
    }
});
// reduceByKey merges the values of identical keys — here it sums the 1s.
JavaPairRDD<String, Integer> wordCounts = wordPairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) throws Exception {
        return a + b;
    }
});
for (Tuple2<String, Integer> entry : wordCounts.collect()) {
    System.out.println(entry);
}
foldByKey
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
该算子通俗的解释就是针对键值对的RDD进行聚合,它与reduceByKey的区别是foldByKey带有初始值,zeroValue表示根据func函数,先用zeroValue的值对原始数据的value值进行初始化,初始化后,再根据key进行func操作
scala版本
// Keys "A" and "B" repeat; foldByKey(10) seeds each key's fold with 10
// before merging values, then sums like reduceByKey.
val pairs = sc.parallelize(List(("A", 2), ("A", 3), ("B", 5), ("B", 8), ("A", 9)))
val folded = pairs.foldByKey(10) { (x, y) =>
  println("one:" + x, "two:" + y)
  x + y
}
folded.collect.foreach(println)
java版本
// Key "a" appears twice; "b" and "c" once each.
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<String, Integer>("a", 1));
data.add(new Tuple2<String, Integer>("a", 2));
data.add(new Tuple2<String, Integer>("b", 1));
data.add(new Tuple2<String, Integer>("c", 1));
JavaRDD<Tuple2<String, Integer>> tupleRDD = sc.parallelize(data);
// Convert the RDD of tuples into a pair RDD.
JavaPairRDD<String, Integer> pairRDD = JavaPairRDD.fromJavaRDD(tupleRDD);
// foldByKey with zero value 10: each key's fold starts at 10, then sums.
JavaPairRDD<String, Integer> foldByKeyRDD = pairRDD.foldByKey(10, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) throws Exception {
        return a + b;
    }
});
for (Tuple2<String, Integer> entry : foldByKeyRDD.collect()) {
    System.out.println(entry);
}
sortByKey
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
SortByKey用于对pairRDD按照key进行排序,第一个参数可以设置true(正序)或者false(倒序),默认是true
scala版本
// Pair RDD with repeated keys; sortByKey orders by key only.
val pairs = sc.parallelize(List(("A", 5), ("A", 3), ("B", 4), ("C", 8), ("A", 1), ("B", 6)))
// Descending order.
pairs.sortByKey(ascending = false).collect()
// Ascending order (the default).
pairs.sortByKey(ascending = true).collect()
java版本
与scala版本的思想一致,区别是函数功能要自己实现,直接看代码
// Integer-keyed tuples in unsorted order.
List<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<>(5, "hello"));
data.add(new Tuple2<>(4, "world"));
data.add(new Tuple2<>(6, "scala"));
data.add(new Tuple2<>(3, "java"));
JavaRDD<Tuple2<Integer, String>> tupleRDD = sc.parallelize(data);
// Identity mapToPair: each tuple already is a (key, value) pair; this just
// converts JavaRDD<Tuple2<...>> into a JavaPairRDD so sortByKey is available.
JavaPairRDD<Integer, String> sortedRDD = tupleRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, Integer, String>() {
    @Override
    public Tuple2<Integer, String> call(Tuple2<Integer, String> t) throws Exception {
        return t;
    }
}).sortByKey(false); // false = descending order
for (Tuple2<Integer, String> entry : sortedRDD.collect()) {
    System.out.println(entry);
}