Spark RDD Operators (4): combineByKey, reduceByKey, foldByKey, sortByKey

  • combineByKey
    • Scala version
    • Java version
  • reduceByKey
    • Scala version
    • Java version
  • foldByKey
    • Scala version
    • Java version
  • sortByKey
    • Scala version
    • Java version

combineByKey

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

The parameters mean the following (a minimal sketch of when each function fires follows the list):

  • createCombiner: the combiner-creation function. combineByKey() walks the elements of each partition; each element's key either has not been seen yet or matches the key of an earlier element. For a new key, combineByKey() calls createCombiner() to create the initial value of that key's accumulator.
  • mergeValue: the value-merging function. If the key has already been seen in the same partition, mergeValue merges that key's current accumulator with the new value.
  • mergeCombiners: the combiner-merging function. With multiple partitions, the same key can appear in several of them; mergeCombiners merges the accumulators of the same key across partitions.
  • numPartitions: the number of partitions of the result RDD; by default the original number of partitions is kept.
  • partitioner: the partitioning function; defaults to HashPartitioner.
  • mapSideCombine: whether to combine on the map side, similar to the combiner in MapReduce; defaults to true.
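
To make the call order concrete, here is a minimal sketch (assuming a SparkContext named sc, as in the examples below) that groups values into lists, so each function's role is visible:

// two partitions, so mergeCombiners has work to do
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 4)), 2)
val grouped = pairs.combineByKey(
  (v: Int) => List(v),                         // createCombiner: first value of a key in a partition
  (c: List[Int], v: Int) => v :: c,            // mergeValue: key already seen in this partition
  (c1: List[Int], c2: List[Int]) => c1 ::: c2  // mergeCombiners: merge the per-partition results
)
grouped.collect.foreach(println)               // e.g. (a,List(2, 1, 3)) and (b,List(4)); ordering may vary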

Scala version

Example: computing each student's average score across subjects.
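
The example assumes a ScoreDetail case class that the original does not show; a minimal definition consistent with the code would be:

case class ScoreDetail(sname: String, subject: String, score: Int)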

// student scores
val scores = List(
      ScoreDetail("zhangsan", "Math", 69),
      ScoreDetail("zhangsan", "English", 86),
      ScoreDetail("lisi", "Math", 77),
      ScoreDetail("lisi", "English", 99),
      ScoreDetail("wangwu", "Math", 89),
      ScoreDetail("wangwu", "English", 83),
      ScoreDetail("zhaoliu", "Math", 67),
      ScoreDetail("zhaoliu", "English", 97),
      ScoreDetail("qianqi", "Math", 67),
      ScoreDetail("qianqi", "English", 97),
      ScoreDetail("wanger", "Math", 67),
      ScoreDetail("wanger", "English", 97)
    )

val rdd1 = sc.parallelize(scores)
// 将rdd1轉換成<k,v>形式,k為姓名,v為對應的ScoreDetail類型資料,如:<wanger,ScoreDetail("wanger", "Math", 67)>
val scoresDetailRDD = rdd1.map(x=>(x.sname,x))
// combineByKey,參數為三個函數,用三個匿名函數
val combineByKeyRDD = scoresDetailRDD.combineByKey(
  // createCombiner,combineByKey周遊元素,當第一次遇見某個key,則取出其alue對應的ScoreDetail對象中的score,轉化成(score,1)的形式
  (x: ScoreDetail) => (x.score, 1),// 1代表累計值的初始值,即代表初始有1門課程
  // mergeValue,當combineByKey在目前分區周遊的元素之前已經遇到時,則将分數相加,累計值(課程數)加1
  (x: (Int, Int), y: ScoreDetail) => (x._1 + y.score, x._2 + 1),
  // mergeCombiner,用于合并不同分區,相同key的情況,将分數相加,累計值(課程數)相加
  (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
)
// combineByKeyRDD的形式,如:(zhangsan,(155,2)),求平均值隻需用155/2即可
val avg = combineByKeyRDD.map(x=>(x._1,x._2._1/x._2._2))
avg.collect.foreach(println)
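
With integer division, this prints one (name, average) pair per student, e.g. (zhangsan,77) from 155/2 and (lisi,88) from 176/2; the order of collect's output may vary between runs.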
           

Java version
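
As with the Scala example, this code assumes a ScoreDetailsJava class (not shown in the original) with public sname, subject, and score fields.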

List<ScoreDetailsJava> scoreDetails = new ArrayList<>();
scoreDetails.add(new ScoreDetailsJava("zhangsan","Math",69));
scoreDetails.add(new ScoreDetailsJava("zhangsan","English",86));
scoreDetails.add(new ScoreDetailsJava("lisi","Math",77));
scoreDetails.add(new ScoreDetailsJava("lisi","English",99));
scoreDetails.add(new ScoreDetailsJava("wangwu","Math",89));
scoreDetails.add(new ScoreDetailsJava("wangwu","English",83));
scoreDetails.add(new ScoreDetailsJava("zhaoliu","Math",67));
scoreDetails.add(new ScoreDetailsJava("zhaoliu","English",97));
scoreDetails.add(new ScoreDetailsJava("qianqi","Math",67));
scoreDetails.add(new ScoreDetailsJava("qianqi","English",97));
scoreDetails.add(new ScoreDetailsJava("wanger","Math",67));
scoreDetails.add(new ScoreDetailsJava("wanger","English",97));

// build the RDD
JavaRDD<ScoreDetailsJava> rdd1 = sc.parallelize(scoreDetails);
// 将rdd轉換成鍵值對,<wanger,ScoreDetail("wanger", "Math", 67)>
JavaPairRDD<String, ScoreDetailsJava> scorePairRDD = rdd1.mapToPair(new PairFunction<ScoreDetailsJava, String, ScoreDetailsJava>() {
    @Override
    public Tuple2<String, ScoreDetailsJava> call(ScoreDetailsJava s) throws Exception {
        return new Tuple2<>(s.sname, s);
    }
});
// createCombiner
Function<ScoreDetailsJava, Tuple2<Integer, Integer>> createCombiner = new Function<ScoreDetailsJava, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(ScoreDetailsJava v1) throws Exception {
        return new Tuple2<Integer, Integer>(v1.score, 1);
    }
};

// mergeValue
Function2<Tuple2<Integer, Integer>, ScoreDetailsJava, Tuple2<Integer, Integer>> mergeValue = new Function2<Tuple2<Integer, Integer>, ScoreDetailsJava, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, ScoreDetailsJava v2) throws Exception {
        return new Tuple2<>(v1._1 + v2.score, v1._2 + 1);
    }
};

// mergeCombiner
Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiner = new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
        return new Tuple2<>(v1._1 + v2._1, v1._2 + v2._2);
    }
};
// combineByKey takes the three functions above as parameters
JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD = scorePairRDD.combineByKey(createCombiner, mergeValue, mergeCombiner);
List<Tuple2<String, Tuple2<Integer, Integer>>> collect = combineByKeyRDD.collect();
// compute the averages
for (Tuple2<String, Tuple2<Integer, Integer>> t : collect) {
    System.out.println(t._1+" "+t._2._1/t._2._2);
}
           

reduceByKey

The parameter is a single function: values with the same key are reduced with it, similar to Scala's reduce operation.

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

Scala version

val rdd1 = sc.parallelize(List((1,2),(1,3),(5,6),(7,8),(7,9)))
val rdd2 = rdd1.reduceByKey((x,y)=>{println("one:"+x,"two"+y);x+y})
rdd2.collect.foreach(println)
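
Only keys with at least two values invoke the function: for this data the result is (1,5), (5,6), and (7,17), with (5,6) passing through untouched; output order may vary.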
           

Java version

JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("hello world", "hello java", "hello scala"));
// 将rdd轉為鍵值對形式
PairFlatMapFunction<String, String, Integer> pairFlatMapFunction = new PairFlatMapFunction<String, String, Integer>() {
    @Override
    public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        String[] strings = s.split(" ");
        for (String s1 : strings) {
            Tuple2<String, Integer> t = new Tuple2<>(s1, 1);
            list.add(t);
        }
        return list.iterator();
    }
};
JavaPairRDD<String, Integer> rdd2 = rdd1.flatMapToPair(pairFlatMapFunction);
// reduceByKey takes a function that adds the values of elements sharing the same key
Function2<Integer, Integer, Integer> function2 = new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
};
JavaPairRDD<String, Integer> reduceByKeyRDD = rdd2.reduceByKey(function2);
List<Tuple2<String, Integer>> collect = reduceByKeyRDD.collect();
for (Tuple2<String, Integer> tuple2 : collect) {
    System.out.println(tuple2);
}
           

foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

In plain terms, this operator aggregates a pair RDD by key. The difference from reduceByKey is that foldByKey carries an initial value: the fold for each key starts from zeroValue, after which values with the same key are combined with func. Note that zeroValue seeds the fold once per key per partition, as the sketch after the Scala example shows.

Scala version

val rdd4= sc.parallelize(List(("A",2),("A",3),("B",5),("B",8),("A",9)))
rdd4.foldByKey(10)((x,y)=>{println("one:"+x,"two:"+y);x+y}).collect.foreach(println)
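
Because zeroValue is applied once per key per partition, the result depends on the partitioning; a minimal sketch (assuming the same sc) makes this visible:

// one partition: zeroValue is added once for key A
sc.parallelize(List(("A",2),("A",3),("A",9)), 1).foldByKey(10)(_ + _).collect
// Array((A,24))  -> 10 + 2 + 3 + 9

// three partitions: each partition seeds its fold with zeroValue
sc.parallelize(List(("A",2),("A",3),("A",9)), 3).foldByKey(10)(_ + _).collect
// Array((A,44))  -> (10+2) + (10+3) + (10+9)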
           

Java version

List<Tuple2<String,Integer>> list = new ArrayList<>();
list.add(new Tuple2<String,Integer>("a",1));
list.add(new Tuple2<String,Integer>("a",2));
list.add(new Tuple2<String,Integer>("b",1));
list.add(new Tuple2<String,Integer>("c",1));
JavaRDD<Tuple2<String, Integer>> rdd3 = sc.parallelize(list);
// 将rdd3轉成pariRDD
JavaPairRDD<String, Integer> rdd4 = JavaPairRDD.fromJavaRDD(rdd3);
// the function passed to foldByKey
Function2<Integer, Integer, Integer> function2 = new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
};
// foldByKey, with 10 as the initial value
JavaPairRDD<String, Integer> foldByKeyRDD = rdd4.foldByKey(10, function2);
List<Tuple2<String, Integer>> collect2 = foldByKeyRDD.collect();
for (Tuple2<String, Integer> s : collect2) {
    System.out.println(s);
}
           

sortByKey

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

sortByKey sorts a pair RDD by key. The first parameter can be set to true (ascending) or false (descending); the default is true.

Scala version

val rdd = sc.parallelize(List(("A",5),("A",3),("B",4),("C",8),("A",1),("B",6)))
// descending
rdd.sortByKey(false).collect()
// ascending
rdd.sortByKey(true).collect()
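
For this data, sortByKey(false) orders the keys C, B, B, A, A, A; the relative order of pairs that share a key is not guaranteed.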
           

Java version

The idea matches the Scala version; the difference is that the function objects have to be written out by hand. The code:

List<Tuple2<Integer, String>> list = new ArrayList<>();
list.add(new Tuple2<>(5, "hello"));
list.add(new Tuple2<>(4, "world"));
list.add(new Tuple2<>(6, "scala"));
list.add(new Tuple2<>(3, "java"));
JavaRDD<Tuple2<Integer, String>> rdd1 = sc.parallelize(list);
PairFunction<Tuple2<Integer, String>, Integer, String> pairFunction = new PairFunction<Tuple2<Integer, String>, Integer, String>() {
    @Override
    public Tuple2<Integer, String> call(Tuple2<Integer, String> tuple2) throws Exception {
        return tuple2;
    }
};
JavaPairRDD<Integer, String> sortByKeyRDD = rdd1.mapToPair(pairFunction).sortByKey(false);
List<Tuple2<Integer, String>> collect = sortByKeyRDD.collect();
for (Tuple2<Integer, String> tuple2 : collect) {
    System.out.println(tuple2);
}
           
