Java code:
package com.netcloud.spark.sparkcore.transformation;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
* Transformation operators in practice
* map: multiply every element in the collection by 2
* filter: keep only the even numbers in the collection
* flatMap: split each text line into words
* groupByKey: group the scores by class
* reduceByKey: compute the total score of each class
* sortByKey: sort the student scores; the flag defaults to true (ascending)
* join: print each student's score
* cogroup: print each student's scores
*
* @author yangshaojun
* #date 2019/3/9 17:10
* @version 1.0
*/
public class TransFormationPractice {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("TransFormationPractice").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
//map(sc);
//filter(sc);
//flatMap(sc);
//groupByKey(sc);
//reduceByKey(sc);
//sortByKey(sc);
//join(sc);
cogroup(sc);
sc.stop();
}
/**
* Using the map operator
*
* @param sc
*/
private static void map(JavaSparkContext sc) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
/**
* Use the map operator to multiply every element in the collection by 2.
* map can be called on an RDD of any element type.
* In Java, map takes a Function object.
* When creating the Function you must specify its second generic parameter,
* which is the type of the new elements that are returned;
* the return type of call() must match that second generic type.
* Inside call() you can transform each element of the original RDD however
* you like and return a new element; the new elements make up a new RDD.
*/
JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
/**
* call() receives 1, 2, 3, 4, 5
* and returns 2, 4, 6, 8, 10
*
* @param v1
* @return
* @throws Exception
*/
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
// Print the new RDD
multipleNumberRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
System.out.println(integer);
}
});
}
/**
* filter: keep only the even numbers in the collection
*/
private static void filter(JavaSparkContext sc) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);
JavaRDD<Integer> evennumRDD = numbersRDD.filter(new Function<Integer, Boolean>() {
@Override
public Boolean call(Integer v1) throws Exception {
return v1 % 2 == 0;
}
});
evennumRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
System.out.println(integer);
}
});
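/**
 * Expected output (line order may vary across partitions): 2 4 6 8 10
 */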
}
/**
* flatMap: split each text line into words
*/
private static void flatMap(JavaSparkContext sc) {
List<String> lines = Arrays.asList("hello you", "hello me", "hello world");
JavaRDD<String> linesRDD = sc.parallelize(lines);
/**
* In Java, flatMap takes a FlatMapFunction.
* You must specify FlatMapFunction's second generic parameter yourself; it is
* the type of the new elements that are returned.
* call() returns not U but Iterator<U> (Iterable<U> before Spark 2.0), where
* U is that same second generic type.
*
*/
JavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
wordsRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
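/**
 * Expected output, one word per line (order may vary across partitions):
 * hello, you, hello, me, hello, world
 */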
}
/**
* groupByKey: group the scores by class
* groupByKey still returns a JavaPairRDD, but its second generic type
* becomes Iterable<Integer>
*/
private static void groupByKey(JavaSparkContext sc) {
List<Tuple2<String, Integer>> scoresList = Arrays.asList(new Tuple2<String, Integer>("class1", 80),
new Tuple2<String, Integer>("class2", 87),
new Tuple2<String, Integer>("class1", 90),
new Tuple2<String, Integer>("class1", 65));
// Parallelize the collection into a JavaPairRDD whose elements are Tuple2s
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoresList);
JavaPairRDD<String, Iterable<Integer>> groupedscore = scores.groupByKey();
groupedscore.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
@Override
public void call(Tuple2<String, Iterable<Integer>> t1) throws Exception {
Iterator<Integer> ite = t1._2.iterator();
while (ite.hasNext()) {
System.out.println(ite.next());
}
System.out.println("=============================");
}
}
);
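/**
 * Expected output (group order may vary): each class's scores followed by
 * the separator line, e.g. 80, 90, 65 for class1 and 87 for class2.
 */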
}
/**
* reduceByKey: compute the total score of each class
*
* @param sc
*/
public static void reduceByKey(JavaSparkContext sc) {
List<Tuple2<String, Integer>> scoresList = Arrays.asList(new Tuple2<String, Integer>("class1", 80),
new Tuple2<String, Integer>("class2", 87),
new Tuple2<String, Integer>("class2", 90),
new Tuple2<String, Integer>("class1", 65));
// Parallelize the collection into a JavaPairRDD whose elements are Tuple2s
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoresList);
/**
* reduceByKey takes a Function2, which has three generic parameters.
* The first two are the value type of the original RDD: for each key, the
* reduce first passes the first and second values into call(), then the
* result together with the third value, and so on.
* These two generic types describe the two parameters of call().
* The third generic type is the type each reduce step returns, normally the
* same as the original RDD's value type.
* The RDD returned by reduceByKey is still a JavaPairRDD<Key, Value>.
*/
JavaPairRDD<String, Integer> reduceRDD = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
reduceRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> t1) throws Exception {
System.out.println("class:" + t1._1 + " " + "totalscore:" + t1._2);
}
});
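/**
 * Expected totals (line order may vary across partitions):
 * class:class1 totalscore:145
 * class:class2 totalscore:177
 */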
}
/**
* sortByKey: sort by score
*
* @param sc
*/
public static void sortByKey(JavaSparkContext sc) {
List<Tuple2<Integer, String>> scoresList =
Arrays.asList(
new Tuple2<Integer, String>(65, "leo"),
new Tuple2<Integer, String>(50, "leo1"),
new Tuple2<Integer, String>(100, "leo2")
);
JavaPairRDD<Integer, String> scoreRDD = sc.parallelizePairs(scoresList, 1);
/**
* In Java, sortByKey takes a boolean flag; it defaults to true (ascending)
*/
JavaPairRDD<Integer, String> retRDD = scoreRDD.sortByKey();
retRDD.foreach(new VoidFunction<Tuple2<Integer, String>>() {
@Override
public void call(Tuple2<Integer, String> t1) throws Exception {
System.out.println(t1._1 + ":" + t1._2);
}
});
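/**
 * With a single partition the output is printed in ascending key order:
 * 50:leo1, 65:leo, 100:leo2
 */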
}
/**
* join: print each student's score
*
* @param sc
*/
public static void join(JavaSparkContext sc) {
List<Tuple2<Integer, String>> students = Arrays.asList(
new Tuple2<>(1, "tom"),
new Tuple2<>(2, "jack"),
new Tuple2<>(3, "marry"),
new Tuple2<>(4, "ff"));
List<Tuple2<Integer, Integer>> scores = Arrays.asList(
new Tuple2<>(1, 80),
new Tuple2<>(2, 90),
new Tuple2<>(3, 100),
new Tuple2<>(4, 60));
JavaPairRDD<Integer, String> studentRDD = sc.parallelizePairs(students);
JavaPairRDD<Integer, Integer> scoreRDD = sc.parallelizePairs(scores);
JavaPairRDD<Integer, Tuple2<String, Integer>> retRDD = studentRDD.join(scoreRDD);
/**
* join performs an equi-join of two RDDs on their keys and returns a
* JavaPairRDD whose first generic type is the key type of the two input RDDs
* and whose second is Tuple2<v1, v2>, where the Tuple2's generic types are
* the value types of the original RDDs.
* For example, joining an RDD of (1,1) (1,2) (1,3) with an RDD of
* (1,4) (2,1) (2,2) yields (1,(1,4)) (1,(2,4)) (1,(3,4)).
*/
retRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
@Override
public void call(Tuple2<Integer, Tuple2<String, Integer>> t1) throws Exception {
System.out.println(t1);
}
});
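/**
 * Expected output (order may vary):
 * (1,(tom,80)) (2,(jack,90)) (3,(marry,100)) (4,(ff,60))
 */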
}
/**
* cogroup: print each student's scores
*
* @param sc
*/
public static void cogroup(JavaSparkContext sc) {
List<Tuple2<Integer, String>> students = Arrays.asList(
new Tuple2<>(1, "tom"),
new Tuple2<>(2, "jack"),
new Tuple2<>(3, "marry"),
new Tuple2<>(4, "ff"));
List<Tuple2<Integer, Integer>> scores = Arrays.asList(
new Tuple2<>(1, 80),
new Tuple2<>(2, 90),
new Tuple2<>(3, 100),
new Tuple2<>(1, 60),
new Tuple2<>(2, 80));
JavaPairRDD<Integer, String> studentRDD = sc.parallelizePairs(students);
JavaPairRDD<Integer, Integer> scoreRDD = sc.parallelizePairs(scores);
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> retRDD = studentRDD.cogroup(scoreRDD);
/**
* cogroup differs from join: for each key, all of that key's values from
* each RDD are gathered into a single Iterable.
* cogroup is hard to explain in words; write and run a small example to
* get a feel for it.
*/
retRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
@Override
public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t1) throws Exception {
System.out.println(t1);
}
});
/**
 * Example output (order may vary):
 * (1,([tom],[80, 60]))
 * (2,([jack],[90, 80]))
 * (3,([marry],[100]))
 * (4,([ff],[]))
 */
}
}
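Since the Java function interfaces used above (Function, Function2, VoidFunction, FlatMapFunction) each declare a single abstract method, the anonymous classes can be replaced with Java 8 lambdas. A minimal sketch of the reduceByKey example in lambda form, assuming the same JavaSparkContext sc and imports as in the listing:

// Lambda-based sketch of the reduceByKey example (Java 8+); the behavior is
// unchanged, only the anonymous inner classes are replaced.
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("class1", 80),
        new Tuple2<>("class2", 87),
        new Tuple2<>("class2", 90),
        new Tuple2<>("class1", 65)));
JavaPairRDD<String, Integer> totals = scores.reduceByKey((v1, v2) -> v1 + v2);
totals.foreach(t -> System.out.println("class:" + t._1 + " totalscore:" + t._2));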
Scala code:
package com.netcloud.bigdata.spark_core.basiclearning.transform
import org.apache.spark.{SparkConf, SparkContext}
/**
* Transformation operators in practice
* map: multiply every element in the collection by 2
* filter: keep only the even numbers in the collection
* flatMap: split each text line into words
* groupByKey: group the scores by class
* reduceByKey: compute the total score of each class
* sortByKey: sort the class scores; defaults to ascending
* join: print each student's score
* cogroup: print each student's scores
*
* @author yangshaojun
* #date 2019/3/9 17:08
* @version 1.0
*/
object Transform_0020_TransformPractice {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("TransFormationPractice").setMaster("local[2]")
val sc = new SparkContext(conf)
// map(sc)
// filter(sc)
// flatMap(sc)
// groupByKey(sc)
// reduceByKey(sc)
// sortByKey(sc)
// join(sc)
cogroup(sc)
sc.stop()
}
/**
* map: multiply every element in the collection by 2
*
* @param sc
*/
def map(sc: SparkContext): Unit = {
val numlist = List(1, 2, 3, 4, 5)
val numberRDD = sc.parallelize(numlist)
/**
* Use the map operator to multiply every element by 2
*/
val multipleNumberRDD = numberRDD.map(_ * 2)
multipleNumberRDD.foreach(println)
}
/**
* filter: keep only the even numbers in the collection
*
* @param sc
*/
def filter(sc: SparkContext): Unit = {
val numlist = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val numberRDD = sc.parallelize(numlist)
/**
 * Use the filter operator to keep only the even numbers
 */
val evenNumberRDD = numberRDD.filter(_ % 2 == 0)
evenNumberRDD.foreach(println)
}
/**
* flatMap: split each text line into words
*/
def flatMap(sc: SparkContext): Unit = {
val numlist = List("hello you", "hello me", "hello world");
val numberRDD = sc.parallelize(numlist)
/**
 * Use the flatMap operator to split each line into words
 */
val wordsRDD = numberRDD.flatMap(_.split(" "))
wordsRDD.foreach(println)
}
/**
* groupByKey: group the scores by class
*/
def groupByKey(sc: SparkContext): Unit = {
val numlist = List(Tuple2("class1", 90), Tuple2("class1", 92), Tuple2("class2", 50), Tuple2("class2", 60));
val numberRDD = sc.parallelize(numlist)
/**
 * Use the groupByKey operator to group the scores by class
 */
val groupedRDD = numberRDD.groupByKey()
groupedRDD.foreach(println)
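/**
 * Example output (order may vary):
 * (class1,CompactBuffer(90, 92))
 * (class2,CompactBuffer(50, 60))
 */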
}
/**
* reduceByKey: compute the total score of each class
*
* @param sc
*/
def reduceByKey(sc: SparkContext): Unit = {
val numlist = List(Tuple2("class1", 90), Tuple2("class1", 92), Tuple2("class2", 50), Tuple2("class2", 60));
val numberRDD = sc.parallelize(numlist)
val retRDD = numberRDD.reduceByKey(_ + _)
retRDD.foreach(println)
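/**
 * Example output (order may vary):
 * (class1,182)
 * (class2,110)
 */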
}
/**
* sortByKey: sort the class scores
*/
def sortByKey(sc: SparkContext): Unit = {
val numlist = Array(Tuple2(65, "Tom"), Tuple2(50, "jack"), Tuple2(100, "luce"), Tuple2(85, "ff"))
val scoreRDD = sc.parallelize(numlist)
val retValuRDD = scoreRDD.sortByKey(false)
retValuRDD.collect.foreach(println)
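/**
 * sortByKey(false) sorts in descending key order; collect brings the result
 * to the driver, so it prints deterministically:
 * (100,luce)
 * (85,ff)
 * (65,Tom)
 * (50,jack)
 */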
}
/**
* join: print each student's score
*/
def join(sc: SparkContext): Unit = {
val students = Array(
Tuple2(1, "tom"),
Tuple2(2, "jack"),
Tuple2(3, "marry"),
Tuple2(4, "ff"))
val scores = Array(
Tuple2(1, 80),
Tuple2(2, 90),
Tuple2(3, 100),
Tuple2(1, 60),
Tuple2(2, 80))
val studentRDD = sc.parallelize(students)
val scoreRDD = sc.parallelize(scores)
val retRDD = studentRDD.join(scoreRDD)
retRDD.foreach(println)
/**
* (1,(tom,80))
* (2,(jack,90))
* (1,(tom,60))
* (2,(jack,80))
* (3,(marry,100))
*/
}
/**
 * cogroup: print each student's scores
 */
def cogroup(sc: SparkContext): Unit = {
val students = Array(
Tuple2(1, "tom"),
Tuple2(2, "jack"),
Tuple2(3, "marry"),
Tuple2(4, "ff"))
val scores = Array(
Tuple2(1, 80),
Tuple2(2, 90),
Tuple2(3, 100),
Tuple2(1, 60),
Tuple2(2, 80))
val studentRDD = sc.parallelize(students)
val scoreRDD = sc.parallelize(scores)
val retRDD = studentRDD.cogroup(scoreRDD)
retRDD.foreach(println)
/**
* (4,(CompactBuffer(ff),CompactBuffer()))
* (1,(CompactBuffer(tom),CompactBuffer(80, 60)))
* (2,(CompactBuffer(jack),CompactBuffer(90, 80)))
* (3,(CompactBuffer(marry),CompactBuffer(100)))
*/
}
}