Scala版本的wordCount
object ScalaWordCount {
def main(args: Array[String]): Unit = {
if(args.length != ){
println("Usage:cn.edu360.spark31.day01.ScalaWordCount <input><output>")
sys.exit()
}
// 接收参数
val Array(input,output)= args
// 创建一个配置对象
val conf:SparkConf = new SparkConf()
// 创建一个SparkContext的对象
val sc:SparkContext = new SparkContext(conf)
// 读取文件
val files: RDD[String] = sc.textFile(input)
// 切分并压平
val splitedData: RDD[String] = files.flatMap(_.split(" "))
// 组装
val wordsAndOne: RDD[(String, Int)] = splitedData.map((_,))
// 进行分组聚合
val result: RDD[(String, Int)] = wordsAndOne.reduceByKey(_+_)
// 如果需要排序
val sorted = result.sortBy(-_._2)
// 存储结果数据
sorted.saveAsTextFile(output)
// 释放资源
sc.stop()
/// 区别 spark中,rdd上的方法 名称和 scala本地方法的名称,可能是一致的
// spark中调用的是rdd上的flatMap 方法, scala 中的flatMap 是本地集合的方法
}
}
java版本的wordCount
public class JavaWordCount {
public static void main(String[] args) {
//提示输入参数个数,若不是两个就会提示
if (args.length != ) {
System.out.println("Usage:cn.edu360.ScalaWordCount <input><output>");
System.exit();
}
//获取配置对象
SparkConf conf = new SparkConf();
//要是不在集群中测试,在本地测试 则设置一下参数
// conf.setMaster("local");
// conf.setAppName(JavaWordCount.class.getSimpleName());
// 利用java程序来编程 spark程序的时候new JavaSparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);
// 读文件
JavaRDD<String> lines = jsc.textFile(args[]);
// 切分并压平 flatMap方法中,参数的类型是一个接口 两个参数 一个输入参数 一个输出参数
JavaRDD<String> flatData = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
// 方法的返回值类型是迭代器 需要把 数组类型的返回值 转换成 迭代器类型
return Arrays.asList(line.split(" ")).iterator();
}
});
// 和1 组装成元组 第一个参数 是输入数据类型 第二个是返回值类型(元组)的key,第三个是元组的value
JavaPairRDD<String, Integer> wordAndOne = flatData.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, );
}
});
// 分组聚合 三个参数 reduceByKey((a,b)=>a+b) 最后一个Iinteger 表达 返回值的类型
JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 先在本地测试一下
/* result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> line) throws Exception {
System.out.println(line);
}
});
*/
// 可选,排序 <String,Integer> ---> <Integer ,String>
JavaPairRDD<Integer, String> beforeSort = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
// 把元组的key 和 value 交换 位置
return tp.swap();
}
});
// 默认是升序,如果需要降序
JavaPairRDD<Integer, String> sortedData = beforeSort.sortByKey(false);
// 把key 和 value的顺序 交换回来
JavaPairRDD<String, Integer> finalRes = sortedData.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
return tp.swap();
}
});
// 把排序后的结果写入文件中
finalRes.saveAsTextFile(args[]);
// 释放资源
jsc.stop(); // close()
}
.Javalamda版本的wordCount
public class JavaLambdaWordCount {
public static void main(String[] args) {
if (args.length != ) {
System.out.println("Usage:cn.edu360.spark31.day02.JavaLambdaWordCount <input><output>");
System.exit();
}
//同上
SparkConf conf = new SparkConf();
// conf.setMaster("local");
// conf.setAppName(JavaWordCount.class.getSimpleName());
// 同上
JavaSparkContext jsc = new JavaSparkContext(conf);
// 读文件
JavaRDD<String> lines = jsc.textFile(args[]);
// 切分并压平
JavaRDD<String> flatData = lines.flatMap(t -> Arrays.asList(t.split(" ")).iterator());
// 和1 组装
JavaPairRDD<String, Integer> wordAndOne = flatData.mapToPair(line -> new Tuple2<>(line, ));
// 分组聚合
JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey((a, b) -> a + b);
JavaPairRDD<Integer, String> beforeSort = result.mapToPair(tp -> tp.swap());
JavaPairRDD<Integer, String> sortedData = beforeSort.sortByKey(false);
// 把元素交换回来
JavaPairRDD<String, Integer> finalResu = sortedData.mapToPair(tp -> tp.swap());
// 保存结果
finalResu.saveAsTextFile(args[]);
// 释放资源
jsc.stop(); // close()
}
}
SparkFrame版本的wordCount
object DataFrameWC {
def main(args: Array[String]): Unit = {
//new配置实例
val conf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[*]")
//入口
val sc = new SparkContext(conf)
//sparkSQL的入口
val sqlContext: SQLContext = new SQLContext(sc)
//导入SQLContext实例上的隐式转换
import sqlContext.implicits._
//读文件
val words: RDD[String] = sc.textFile("wc.txt")
//切分
val wordsRdd: RDD[String] = words.flatMap(_.split(" "))
//将RDD转化成DATAFrame toDF()!!!!!!当然下面的set也可以啦
val worddf: DataFrame = wordsRdd.map(Word(_)).toDF()
//不想转DF也可以 自己搞个类型嘛。。。下文有个样例类(不用new 方便的一批)
// wordsRdd.map(t=>Word(t))
//注册一个临时表 ----要用sql没有表怎么行,搞个临时的嘛
worddf.registerTempTable("t_word")
//执行sql语句,
val result: DataFrame = sqlContext.sql("select name ,count(*) as cnts from t_word group by name order by cnts desc ")
//展示结果
result.show()
//关闭资源
sc.stop()
}
}
// 定义一个字段 name //好像是被我注释掉了
case class Word(name:String)
DataSet SQL语法的wordCount
object DataSetWC {
def main(args: Array[String]): Unit = {
// 统一的API入口
// session 或者使用 spark 如果当期上下文环境中有,直接拿来使用,否则再创建
val session: SparkSession = SparkSession.builder()
.master("local[*]")
.appName(this.getClass.getSimpleName)
.getOrCreate()
// 如何从sparkSession中获取到sparkContext和sqlContext
// val sc: SparkContext = session.sparkContext
// val sqlContext = session.sqlContext
// 导入sparksession实例上的隐式转换
import session.implicits._
// sessionApi 获取到的数据类型就是Dataset
val file: Dataset[String] = session.read.textFile("wc.txt")
val wDs: Dataset[String] = file.flatMap(_.split(" "))
// Dataset 默认有schema信息
wDs.printSchema()
// |-- value: string (nullable = true) // 默认shema的名称是value
wDs.schema
// SQL DSL
// 注册临时表
wDs.createTempView("v_word")
val result: DataFrame = session.sql("select value ,count(*) cnts from v_word group by value order by cnts desc ")
result.show()
session.stop()
// session.close()
}
}
Dataset DSL 语法版本的wordCount
object DatasetDSLDemo {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder()
.master("local[*]")
.appName(this.getClass.getSimpleName)
.getOrCreate()
// 导入sparksession实例上的隐式转换
import session.implicits._
// sessionApi 获取到的数据类型就是Dataset
val file: Dataset[String] = session.read.textFile("person.txt")
val pdf: DataFrame = file.map(_.split(" ")).map(t=>(t(),t().toInt,t().toInt)).toDF("name","age","fv")
// DSL 语法统计
// select where group by order by count sum max min
// 选择
pdf.select("name","age")
// .show()
// pdf.select(pdf.col("name"),pdf.col("age")).show()
// pdf.select(pdf("name"),pdf("age")).show()
// where 条件 过滤 where 调用的是filterAPI
// pdf.where("fv > 93 ")
pdf.filter("fv > 93") // .show()
// 排序 order by Sort orderBy 调用的就是sort
pdf.orderBy("age").show()
// 多个条件的排序
pdf.sort($"age" ,$"fv" desc) //.show()
// 分组之后还需要进行聚合统计 sum max min avg
pdf.groupBy($"age").sum("fv") //.show()
val result1: DataFrame = pdf.groupBy($"age").count()
// result1.show()
// 聚合条件 需要导入函数
import org.apache.spark.sql.functions._
// 聚合时,指定别名
pdf.groupBy($"age").agg(count("*") as "cnts")// .show()
pdf.groupBy($"age").agg(max("fv"))//.show()
// 重命名列名
pdf.withColumnRenamed("age","myage").show()
// pdf
session.stop()
}
}
Streaming版本的wordcount
object StreamingWordCount {
// 设置日志级别为 Warn
val logger = Logger.getLogger("org").setLevel(Level.WARN)
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
// 本地运行的 配置
.setMaster("local[5]") // 至少是2个cores
.setAppName(this.getClass.getSimpleName)
// conf: SparkConf, batchDuration: Duration
// 时间间隔 需要根据我们的业务, 不是随便定义的
val ssc: StreamingContext = new StreamingContext(conf, Seconds())
// 从 socket中获取数据创建一个将要连接到 hostname:port 的 DStream,
// 这里读取socket数据的Receiver,需要一个core
val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hdp-03",)
val textStream2: ReceiverInputDStream[String] = ssc.socketTextStream("hdp-03",)
textStream2.print()
// 1 个 receiver
// 获取到数据之后的业务逻辑
val result: DStream[(String, Int)] = textStream.flatMap(_.split(" ")).map((_,)).reduceByKey(_+_)
// 打印结果 默认打印 10行
result.print()
// prefix-TIME_IN_MS.suffix
result.saveAsTextFiles("stream","log")
// 启动主程序 需要去连接socket
ssc.start()
// 阻塞 等待程序被关闭
ssc.awaitTermination()
}
}
Streaming SQL 版本的wordcount
object StreamingSQL {
// 设置日志级别为 Warn
val logger = Logger.getLogger("org").setLevel(Level.WARN)
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
// 本地运行的 配置
.setMaster("local[2]") // 至少是2个cores
.setAppName(this.getClass.getSimpleName)
// conf: SparkConf, batchDuration: Duration
// 时间间隔 需要根据我们的业务, 不是随便定义的
val ssc: StreamingContext = new StreamingContext(conf, Seconds())
// 从 socket中获取数据创建一个将要连接到 hostname:port 的 DStream,
// 这里读取socket数据的Receiver,需要一个core
val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hdp-03", )
// DStream = rdd + time
// 如何使用SQL 语法来操作数据
textStream.foreachRDD(rdd => {
val session = SparkSession.builder()
.config(conf) // 使用已经存在的conf对象
.getOrCreate()
import session.implicits._
// 按照空格切分 所有的空格
val wordRDD: RDD[String] = rdd.flatMap(_.split(" +"))
val wdf: DataFrame = wordRDD.toDF("word")
// 注册临时视图 createOrReplaceTempView
wdf.createOrReplaceTempView("v_word")
val dfRes = session.sql("select word,count(1) cnts from v_word group by word order by cnts desc")
dfRes.show()
})
// 启动主程序 需要去连接socket
ssc.start()
// 阻塞 等待程序被关闭
ssc.awaitTermination()
}
}
Kafka版本的wordCount
object StreamingKafkaWC {
Logger.getLogger("org").setLevel(Level.WARN)
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName(this.getClass.getSimpleName)
val ssc= new StreamingContext(conf,Seconds())
// kafka的参数配置
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "hdp-02:9092,hdp-03:9092,hdp-04:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "hello_topic_group",
// 从哪个位置开始读取数据
"auto.offset.reset" -> "earliest",
// 是否可以自动提交偏移量 自定义
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// streaming 支持 读取kafka的多个主题
val topics = Array("helloTopic6", "topicB")
// 指定泛型约定
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
// 我们的broker和executor在同一台机器上
// LocationStrategies.PreferBrokers
// 资源会平均分配
LocationStrategies.PreferConsistent,
// PreferConsistent,
// 消费者 订阅哪些主题
Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD(rdd=>{
// rdd中的数据类型是 ConsumerRecord[String, String] 拿到这里的value
val result = rdd.map(_.value()).map((_,)).reduceByKey(_+_)
result.foreach(println)
})
ssc.start()
ssc.awaitTermination()
}
}