Spark RDD算子(四)
- combineByKey
-
- scala版本
- java版本
- reduceByKey
-
- scala版本
- java版本
- foldByKey
-
- scala版本
- java版本
- sortByKey
-
- scala版本
- java版本
combineByKey
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
各個參數的含義:
- createCombiner:組合器函數,combineByKey()會周遊分區中的所有元素,每個元素的鍵要麼還沒有遇到過,要麼就和之前的某個元素的鍵相同。如果這是一個新的元素, combineByKey() 會使用一個叫作 createCombiner() 的函數來建立 那個鍵對應的累加器的初始值
- mergeValue:合并值函數,在同一個分區中如果該鍵已經存在,則會使用mergeValue将該鍵的目前值與新值合并
- mergeCombiners:合并組合器函數,當有多個分區時,可能會存在不同分區内有相同的鍵,此式會用mergeCombiners合并不同分區的相同鍵的值
- numPartitions:結果RDD分區數,預設保持原有的分區數
- partitioner:分區函數,預設為HashPartitioner
- mapSideCombine:是否需要在Map端進行combine操作,類似于MapReduce中的combine,預設為true
scala版本
以求學生各科分數的平均值為例
// 學生分數
val scores = List(
ScoreDetail("zhangsan", "Math", 69),
ScoreDetail("zhangsan", "English", 86),
ScoreDetail("lisi", "Math", 77),
ScoreDetail("lisi", "English", 99),
ScoreDetail("wangwu", "Math", 89),
ScoreDetail("wangwu", "English", 83),
ScoreDetail("zhaoliu", "Math", 67),
ScoreDetail("zhaoliu", "English", 97),
ScoreDetail("qianqi", "Math", 67),
ScoreDetail("qianqi", "English", 97),
ScoreDetail("wanger", "Math", 67),
ScoreDetail("wanger", "English", 97)
)
val rdd1 = sc.parallelize(scores)
// 将rdd1轉換成<k,v>形式,k為姓名,v為對應的ScoreDetail類型資料,如:<wanger,ScoreDetail("wanger", "Math", 67)>
val scoresDetailRDD = rdd1.map(x=>(x.sname,x))
// combineByKey,參數為三個函數,用三個匿名函數
val combineByKeyRDD = scoresDetailRDD.combineByKey(
// createCombiner,combineByKey周遊元素,當第一次遇見某個key,則取出其alue對應的ScoreDetail對象中的score,轉化成(score,1)的形式
(x: ScoreDetail) => (x.score, 1),// 1代表累計值的初始值,即代表初始有1門課程
// mergeValue,當combineByKey在目前分區周遊的元素之前已經遇到時,則将分數相加,累計值(課程數)加1
(x: (Int, Int), y: ScoreDetail) => (x._1 + y.score, x._2 + 1),
// mergeCombiner,用于合并不同分區,相同key的情況,将分數相加,累計值(課程數)相加
(x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
)
// combineByKeyRDD的形式,如:(zhangsan,(155,2)),求平均值隻需用155/2即可
val avg = combineByKeyRDD.map(x=>(x._1,x._2._1/x._2._2))
avg.collect.foreach(println)
java版本
List<ScoreDetailsJava> scoreDetails = new ArrayList<>();
scoreDetails.add(new ScoreDetailsJava("zhangsan","Math",69));
scoreDetails.add(new ScoreDetailsJava("zhangsan","English",86));
scoreDetails.add(new ScoreDetailsJava("lisi","Math",77));
scoreDetails.add(new ScoreDetailsJava("lisi","English",99));
scoreDetails.add(new ScoreDetailsJava("wangwu","Math",89));
scoreDetails.add(new ScoreDetailsJava("wangwu","English",83));
scoreDetails.add(new ScoreDetailsJava("zhaoliu","Math",67));
scoreDetails.add(new ScoreDetailsJava("zhaoliu","English",97));
scoreDetails.add(new ScoreDetailsJava("qianqi","Math",67));
scoreDetails.add(new ScoreDetailsJava("qianqi","English",97));
scoreDetails.add(new ScoreDetailsJava("wanger","Math",67));
scoreDetails.add(new ScoreDetailsJava("wanger","English",97));
// 擷取rdd
JavaRDD<ScoreDetailsJava> rdd1 = sc.parallelize(scoreDetails);
// 将rdd轉換成鍵值對,<wanger,ScoreDetail("wanger", "Math", 67)>
JavaPairRDD<String, ScoreDetailsJava> scorePairRDD = rdd1.mapToPair(new PairFunction<ScoreDetailsJava, String, ScoreDetailsJava>() {
@Override
public Tuple2<String, ScoreDetailsJava> call(ScoreDetailsJava s) throws Exception {
return new Tuple2<>(s.sname, s);
}
});
// createCombiner
Function<ScoreDetailsJava, Tuple2<Integer, Integer>> createCombiner = new Function<ScoreDetailsJava, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(ScoreDetailsJava v1) throws Exception {
return new Tuple2<Integer, Integer>(v1.score, 1);
}
};
// mergeValue
Function2<Tuple2<Integer, Integer>, ScoreDetailsJava, Tuple2<Integer, Integer>> mergeValue = new Function2<Tuple2<Integer, Integer>, ScoreDetailsJava, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, ScoreDetailsJava v2) throws Exception {
return new Tuple2<>(v1._1 + v2.score, v1._2 + 1);
}
};
// mergeCombiner
Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiner = new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
return new Tuple2<>(v1._1 + v2._1, v1._2 + v2._2);
}
};
// combineByKey,需要上述三個函數做參數
JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD = scorePairRDD.combineByKey(createCombiner, mergeValue, mergeCombiner);
List<Tuple2<String, Tuple2<Integer, Integer>>> collect = combineByKeyRDD.collect();
// 求平均
for (Tuple2<String, Tuple2<Integer, Integer>> t : collect) {
System.out.println(t._1+" "+t._2._1/t._2._2);
}
reduceByKey
參數時一個函數,按照相同的key進行reduce操作,類似于scala的reduce的操作
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
scala版本
val rdd1 = sc.parallelize(List((1,2),(1,3),(5,6),(7,8),(7,9)))
val rdd2 = rdd1.reduceByKey((x,y)=>{println("one:"+x,"two"+y);x+y})
rdd2.collect.foreach(println)
java版本
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("hello world", "hello java", "hello scala"));
// 将rdd轉為鍵值對形式
PairFlatMapFunction<String, String, Integer> pairFlatMapFunction = new PairFlatMapFunction<String, String, Integer>() {
@Override
public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
List<Tuple2<String, Integer>> list = new ArrayList<>();
String[] strings = s.split(" ");
for (String s1 : strings) {
Tuple2<String, Integer> t = new Tuple2<>(s1, 1);
list.add(t);
}
return list.iterator();
}
};
JavaPairRDD<String, Integer> rdd2 = rdd1.flatMapToPair(pairFlatMapFunction);
// reduceByKey需要傳入一個函數,該函數作用是key相同的元素值相加
Function2<Integer, Integer, Integer> function2 = new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
};
JavaPairRDD<String, Integer> reduceByKeyRDD = rdd2.reduceByKey(function2);
List<Tuple2<String, Integer>> collect = reduceByKeyRDD.collect();
for (Tuple2<String, Integer> tuple2 : collect) {
System.out.println(tuple2);
}
foldByKey
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
該算子通俗的解釋就是針對鍵值對的RDD進行聚合,它與reduceByKey的差別是foldByKey帶有初始值,zeroValue表示根據func函數,先用zeroValue的值對原始資料的value值進行初始化,初始化後,再根據key進行func操作
scala版本
val rdd4= sc.parallelize(List(("A",2),("A",3),("B",5),("B",8),("A",9)))
rdd4.foldByKey(10)((x,y)=>{println("one:"+x,"two:"+y);x+y}).collect.foreach(println)
java版本
List<Tuple2<String,Integer>> list = new ArrayList<>();
list.add(new Tuple2<String,Integer>("a",1));
list.add(new Tuple2<String,Integer>("a",2));
list.add(new Tuple2<String,Integer>("b",1));
list.add(new Tuple2<String,Integer>("c",1));
JavaRDD<Tuple2<String, Integer>> rdd3 = sc.parallelize(list);
// 将rdd3轉成pariRDD
JavaPairRDD<String, Integer> rdd4 = JavaPairRDD.fromJavaRDD(rdd3);
// foldByKey中傳入的函數
Function2<Integer, Integer, Integer> function2 = new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
};
// foldByKey,10為初始值
JavaPairRDD<String, Integer> foldByKeyRDD = rdd4.foldByKey(10, function2);
List<Tuple2<String, Integer>> collect2 = foldByKeyRDD.collect();
for (Tuple2<String, Integer> s : collect2) {
System.out.println(s);
}
sortByKey
def sortByKey(ascending: Boolean,numPartitions: Int): org.apache.spark.rdd.RDD[(Int, Int)]
SortByKey用于對pairRDD按照key進行排序,第一個參數可以設定true(正序)或者false(倒序),預設是true
scala版本
val rdd = sc.parallelize(List(("A",5),("A",3),("B",4),("C",8),("A",1),("B",6)))
// 倒序
rdd.sortByKey(false).collect()
// 正序
rdd.sortByKey(true).collect()
java版本
與scala版本的思想一緻,差別是函數功能要自己實作,直接看代碼
List<Tuple2<Integer, String>> list = new ArrayList<>();
list.add(new Tuple2<>(5, "hello"));
list.add(new Tuple2<>(4, "world"));
list.add(new Tuple2<>(6, "scala"));
list.add(new Tuple2<>(3, "java"));
JavaRDD<Tuple2<Integer, String>> rdd1 = sc.parallelize(list);
PairFunction<Tuple2<Integer, String>, Integer, String> pairFunction = new PairFunction<Tuple2<Integer, String>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<Integer, String> tuple2) throws Exception {
return tuple2;
}
};
JavaPairRDD<Integer, String> sortByKeyRDD = rdd1.mapToPair(pairFunction).sortByKey(false);
List<Tuple2<Integer, String>> collect = sortByKeyRDD.collect();
for (Tuple2<Integer, String> tuple2 : collect) {
System.out.println(tuple2);
}