
Spark RDD Operators (4): combineByKey, reduceByKey, foldByKey, sortByKey

  • combineByKey
    • Scala version
    • Java version
  • reduceByKey
    • Scala version
    • Java version
  • foldByKey
    • Scala version
    • Java version
  • sortByKey
    • Scala version
    • Java version

combineByKey

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

The parameters mean the following (a minimal sketch of how the three functions interact follows the list):

  • createCombiner: the combiner-creation function. combineByKey() walks through the elements of each partition; an element's key is either one that has not been met before or the same as some earlier element's key. When the key is new, combineByKey() calls createCombiner() to create the initial value of the accumulator for that key
  • mergeValue: the value-merging function. If the key already exists within the same partition, mergeValue merges the key's current accumulator with the new value
  • mergeCombiners: the combiner-merging function. With several partitions, the same key may occur in different partitions; mergeCombiners merges that key's accumulators from the different partitions
  • numPartitions: number of partitions of the result RDD; by default the original number of partitions is kept
  • partitioner: the partitioning function; defaults to HashPartitioner
  • mapSideCombine: whether to combine on the map side, similar to the combiner in MapReduce; defaults to true
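
To see which of the three functions fires when, here is a minimal sketch (assuming a running SparkContext named sc, as in the examples below) that collects each key's values into a list:

// two partitions, so partial results for the same key must be merged at the end
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)), 2)
val grouped = pairs.combineByKey(
  (v: Int) => List(v),                     // createCombiner: first time a key appears in a partition
  (acc: List[Int], v: Int) => acc :+ v,    // mergeValue: further values of that key in the same partition
  (a: List[Int], b: List[Int]) => a ::: b  // mergeCombiners: partial lists from different partitions
)
grouped.collect() // e.g. Array((a,List(1, 2)), (b,List(3)))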

Scala version

Example: computing each student's average score across subjects.

// a simple case class for one score record, defined here so the example is self-contained
case class ScoreDetail(sname: String, subject: String, score: Int)

// student scores
val scores = List(
      ScoreDetail("zhangsan", "Math", 69),
      ScoreDetail("zhangsan", "English", 86),
      ScoreDetail("lisi", "Math", 77),
      ScoreDetail("lisi", "English", 99),
      ScoreDetail("wangwu", "Math", 89),
      ScoreDetail("wangwu", "English", 83),
      ScoreDetail("zhaoliu", "Math", 67),
      ScoreDetail("zhaoliu", "English", 97),
      ScoreDetail("qianqi", "Math", 67),
      ScoreDetail("qianqi", "English", 97),
      ScoreDetail("wanger", "Math", 67),
      ScoreDetail("wanger", "English", 97)
    )

val rdd1 = sc.parallelize(scores)
// turn rdd1 into <k,v> pairs: k is the student name, v is the ScoreDetail record,
// e.g. <wanger, ScoreDetail("wanger", "Math", 67)>
val scoresDetailRDD = rdd1.map(x => (x.sname, x))
// combineByKey takes three functions; here they are given as three anonymous functions
val combineByKeyRDD = scoresDetailRDD.combineByKey(
  // createCombiner: the first time combineByKey meets a key, take the score out of the value's
  // ScoreDetail object and turn it into (score, 1); the 1 is the initial count, i.e. one subject so far
  (x: ScoreDetail) => (x.score, 1),
  // mergeValue: when the key has already been seen in the current partition,
  // add the score and increase the count (number of subjects) by 1
  (x: (Int, Int), y: ScoreDetail) => (x._1 + y.score, x._2 + 1),
  // mergeCombiners: merge partial results of the same key from different partitions
  // by adding the scores and adding the counts
  (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
)
// combineByKeyRDD holds pairs such as (zhangsan,(155,2)); the average is 155 / 2
// (convert to Double first, otherwise integer division truncates 155 / 2 to 77)
val avg = combineByKeyRDD.map(x => (x._1, x._2._1.toDouble / x._2._2))
avg.collect.foreach(println)
           

Java version

// ScoreDetailsJava is assumed to be a simple bean with public fields sname, subject and score
List<ScoreDetailsJava> scoreDetails = new ArrayList<>();
scoreDetails.add(new ScoreDetailsJava("zhangsan","Math",69));
scoreDetails.add(new ScoreDetailsJava("zhangsan","English",86));
scoreDetails.add(new ScoreDetailsJava("lisi","Math",77));
scoreDetails.add(new ScoreDetailsJava("lisi","English",99));
scoreDetails.add(new ScoreDetailsJava("wangwu","Math",89));
scoreDetails.add(new ScoreDetailsJava("wangwu","English",83));
scoreDetails.add(new ScoreDetailsJava("zhaoliu","Math",67));
scoreDetails.add(new ScoreDetailsJava("zhaoliu","English",97));
scoreDetails.add(new ScoreDetailsJava("qianqi","Math",67));
scoreDetails.add(new ScoreDetailsJava("qianqi","English",97));
scoreDetails.add(new ScoreDetailsJava("wanger","Math",67));
scoreDetails.add(new ScoreDetailsJava("wanger","English",97));

// create the RDD
JavaRDD<ScoreDetailsJava> rdd1 = sc.parallelize(scoreDetails);
// turn the RDD into key-value pairs, e.g. <wanger, ScoreDetailsJava("wanger", "Math", 67)>
JavaPairRDD<String, ScoreDetailsJava> scorePairRDD = rdd1.mapToPair(new PairFunction<ScoreDetailsJava, String, ScoreDetailsJava>() {
    @Override
    public Tuple2<String, ScoreDetailsJava> call(ScoreDetailsJava s) throws Exception {
        return new Tuple2<>(s.sname, s);
    }
});
// createCombiner
Function<ScoreDetailsJava, Tuple2<Integer, Integer>> createCombiner = new Function<ScoreDetailsJava, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(ScoreDetailsJava v1) throws Exception {
        return new Tuple2<Integer, Integer>(v1.score, 1);
    }
};

// mergeValue
Function2<Tuple2<Integer, Integer>, ScoreDetailsJava, Tuple2<Integer, Integer>> mergeValue = new Function2<Tuple2<Integer, Integer>, ScoreDetailsJava, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, ScoreDetailsJava v2) throws Exception {
        return new Tuple2<>(v1._1 + v2.score, v1._2 + 1);
    }
};

// mergeCombiner
Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiner = new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
        return new Tuple2<>(v1._1 + v2._1, v1._2 + v2._2);
    }
};
// combineByKey takes the three functions above as its parameters
JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD = scorePairRDD.combineByKey(createCombiner, mergeValue, mergeCombiner);
List<Tuple2<String, Tuple2<Integer, Integer>>> collect = combineByKeyRDD.collect();
// compute the averages; divide as double so the result is not truncated
for (Tuple2<String, Tuple2<Integer, Integer>> t : collect) {
    System.out.println(t._1 + " " + t._2._1.doubleValue() / t._2._2);
}
           

reduceByKey

Its parameter is a single function: values that share the same key are reduced with it, much like Scala's own reduce operation (see the equivalence sketch after the signatures).

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
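
reduceByKey can in fact be expressed with combineByKey from above: the combiner type C is simply the value type V itself. A minimal sketch, assuming a pair RDD of Int values such as rdd1 from the Scala example below:

// equivalent to rdd1.reduceByKey(_ + _)
val viaCombine = rdd1.combineByKey(
  (v: Int) => v,              // createCombiner: the first value is the initial accumulator
  (a: Int, v: Int) => a + v,  // mergeValue
  (a: Int, b: Int) => a + b   // mergeCombiners
)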

Scala version

val rdd1 = sc.parallelize(List((1,2),(1,3),(5,6),(7,8),(7,9)))
// print the two operands each time the function is called, then add them
val rdd2 = rdd1.reduceByKey((x, y) => { println("one: " + x + ", two: " + y); x + y })
rdd2.collect.foreach(println) // (1,5), (5,6), (7,17), in some order
           

Java version

JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("hello world", "hello java", "hello scala"));
// split each line into words and turn the RDD into (word, 1) pairs
PairFlatMapFunction<String, String, Integer> pairFlatMapFunction = new PairFlatMapFunction<String, String, Integer>() {
    @Override
    public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        String[] strings = s.split(" ");
        for (String s1 : strings) {
            Tuple2<String, Integer> t = new Tuple2<>(s1, 1);
            list.add(t);
        }
        return list.iterator();
    }
};
JavaPairRDD<String, Integer> rdd2 = rdd1.flatMapToPair(pairFlatMapFunction);
// reduceByKey takes a function that adds up the values of identical keys
Function2<Integer, Integer, Integer> function2 = new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
};
JavaPairRDD<String, Integer> reduceByKeyRDD = rdd2.reduceByKey(function2);
List<Tuple2<String, Integer>> collect = reduceByKeyRDD.collect();
for (Tuple2<String, Integer> tuple2 : collect) {
    System.out.println(tuple2);
}
           

foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

In plain terms, this operator aggregates a pair RDD just like reduceByKey, except that foldByKey carries an initial value: inside each partition, a key's values are folded starting from zeroValue using func, and the per-partition results are then merged, again with func. Note that zeroValue is therefore applied once per partition that contains the key, not once per key overall, as the sketch below shows.
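
A minimal sketch of that caveat (assuming a running SparkContext sc; exactly which elements land in which partition may vary):

// force two partitions so the key "A" is spread across both
val pairs = sc.parallelize(List(("A", 2), ("A", 3), ("A", 9)), 2)
pairs.foldByKey(10)(_ + _).collect()
// one partition folds to 10 + 2 = 12, the other to 10 + 3 + 9 = 22;
// merging them gives (A,34), i.e. the zero value 10 was added once per partition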

Scala version

val rdd4 = sc.parallelize(List(("A",2),("A",3),("B",5),("B",8),("A",9)))
// fold each key's values starting from the zero value 10, printing the operands on each call
rdd4.foldByKey(10)((x, y) => { println("one: " + x + ", two: " + y); x + y }).collect.foreach(println)
           

Java version

List<Tuple2<String,Integer>> list = new ArrayList<>();
list.add(new Tuple2<String,Integer>("a",1));
list.add(new Tuple2<String,Integer>("a",2));
list.add(new Tuple2<String,Integer>("b",1));
list.add(new Tuple2<String,Integer>("c",1));
JavaRDD<Tuple2<String, Integer>> rdd3 = sc.parallelize(list);
// convert rdd3 (an RDD of tuples) into a JavaPairRDD
JavaPairRDD<String, Integer> rdd4 = JavaPairRDD.fromJavaRDD(rdd3);
// the function passed to foldByKey
Function2<Integer, Integer, Integer> function2 = new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
};
// foldByKey with 10 as the zero value
JavaPairRDD<String, Integer> foldByKeyRDD = rdd4.foldByKey(10, function2);
List<Tuple2<String, Integer>> collect2 = foldByKeyRDD.collect();
for (Tuple2<String, Integer> s : collect2) {
    System.out.println(s);
}
           

sortByKey

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

sortByKey sorts a pair RDD by its keys. The first parameter selects the direction: true for ascending (the default) or false for descending. To sort by value instead, swap the pairs first, as sketched below.
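
Since sortByKey only looks at keys, a common trick for sorting by value is to swap each pair before and after the sort. A minimal sketch, assuming the pair RDD rdd from the Scala example below:

// make the value the key, sort descending, then swap back
val byValue = rdd.map(_.swap).sortByKey(ascending = false).map(_.swap)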

Scala version

val rdd = sc.parallelize(List(("A",5),("A",3),("B",4),("C",8),("A",1),("B",6)))
// descending by key
rdd.sortByKey(false).collect()
// ascending by key (the default)
rdd.sortByKey(true).collect()
           

Java version

The idea is the same as in the Scala version; the difference is that the function objects have to be implemented by hand. Straight to the code:

List<Tuple2<Integer, String>> list = new ArrayList<>();
list.add(new Tuple2<>(5, "hello"));
list.add(new Tuple2<>(4, "world"));
list.add(new Tuple2<>(6, "scala"));
list.add(new Tuple2<>(3, "java"));
JavaRDD<Tuple2<Integer, String>> rdd1 = sc.parallelize(list);
// identity pair function: each tuple already is a (key, value) pair
PairFunction<Tuple2<Integer, String>, Integer, String> pairFunction = new PairFunction<Tuple2<Integer, String>, Integer, String>() {
    @Override
    public Tuple2<Integer, String> call(Tuple2<Integer, String> tuple2) throws Exception {
        return tuple2;
    }
};
// turn the RDD into a JavaPairRDD, then sort by key in descending order
JavaPairRDD<Integer, String> sortByKeyRDD = rdd1.mapToPair(pairFunction).sortByKey(false);
List<Tuple2<Integer, String>> collect = sortByKeyRDD.collect();
for (Tuple2<Integer, String> tuple2 : collect) {
    System.out.println(tuple2);
}
           
