天天看点

9种版本的wordCount——scala、java、javaLamda、DataFrame、DataSet、Streaming、Streaming-SQL、Kafka

Scala版本的wordCount

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    if(args.length != ){
      println("Usage:cn.edu360.spark31.day01.ScalaWordCount <input><output>")
      sys.exit()
    }
    // 接收参数
    val  Array(input,output)= args
    // 创建一个配置对象
    val conf:SparkConf  = new SparkConf()
    // 创建一个SparkContext的对象
    val sc:SparkContext = new SparkContext(conf)
    // 读取文件
    val files: RDD[String] = sc.textFile(input)
    // 切分并压平
    val splitedData: RDD[String] = files.flatMap(_.split(" "))
    // 组装
    val wordsAndOne: RDD[(String, Int)] = splitedData.map((_,))
    // 进行分组聚合
    val result: RDD[(String, Int)] = wordsAndOne.reduceByKey(_+_)
    // 如果需要排序
    val sorted = result.sortBy(-_._2)
    // 存储结果数据
    sorted.saveAsTextFile(output)
    // 释放资源
    sc.stop()
    ///  区别 spark中,rdd上的方法 名称和 scala本地方法的名称,可能是一致的
    // spark中调用的是rdd上的flatMap 方法, scala  中的flatMap 是本地集合的方法

  }
}
           

java版本的wordCount

public class JavaWordCount {
    public static void main(String[] args) {
        //提示输入参数个数,若不是两个就会提示
        if (args.length != ) {
            System.out.println("Usage:cn.edu360.ScalaWordCount <input><output>");
            System.exit();
        }
        //获取配置对象
        SparkConf conf = new SparkConf();
        //要是不在集群中测试,在本地测试 则设置一下参数
//        conf.setMaster("local");
//        conf.setAppName(JavaWordCount.class.getSimpleName());

        // 利用java程序来编程 spark程序的时候new JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // 读文件
        JavaRDD<String> lines = jsc.textFile(args[]);

        // 切分并压平  flatMap方法中,参数的类型是一个接口    两个参数  一个输入参数  一个输出参数
        JavaRDD<String> flatData = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                // 方法的返回值类型是迭代器 需要把 数组类型的返回值  转换成 迭代器类型
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // 和1 组装成元组 第一个参数  是输入数据类型   第二个是返回值类型(元组)的key,第三个是元组的value
        JavaPairRDD<String, Integer> wordAndOne = flatData.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, );
            }
        });
        // 分组聚合  三个参数  reduceByKey((a,b)=>a+b)    最后一个Iinteger 表达 返回值的类型
        JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // 先在本地测试一下
     /*   result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> line) throws Exception {
                System.out.println(line);
            }
        });
*/
        // 可选,排序   <String,Integer>   ---> <Integer ,String>
        JavaPairRDD<Integer, String> beforeSort = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                // 把元组的key  和 value  交换 位置
                return tp.swap();
            }
        });

        // 默认是升序,如果需要降序
        JavaPairRDD<Integer, String> sortedData = beforeSort.sortByKey(false);

        // 把key  和  value的顺序 交换回来
        JavaPairRDD<String, Integer> finalRes = sortedData.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });

        // 把排序后的结果写入文件中
        finalRes.saveAsTextFile(args[]);

        // 释放资源
        jsc.stop(); // close()

    }
           

.Javalamda版本的wordCount

public class JavaLambdaWordCount {
    public static void main(String[] args) {
        if (args.length != ) {
            System.out.println("Usage:cn.edu360.spark31.day02.JavaLambdaWordCount <input><output>");
            System.exit();
        }
        //同上
        SparkConf conf = new SparkConf();
//        conf.setMaster("local");
//        conf.setAppName(JavaWordCount.class.getSimpleName());

        // 同上
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // 读文件
        JavaRDD<String> lines = jsc.textFile(args[]);

        // 切分并压平
        JavaRDD<String> flatData = lines.flatMap(t -> Arrays.asList(t.split(" ")).iterator());

        // 和1 组装
        JavaPairRDD<String, Integer> wordAndOne = flatData.mapToPair(line -> new Tuple2<>(line, ));

        // 分组聚合

        JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey((a, b) -> a + b);
        JavaPairRDD<Integer, String> beforeSort = result.mapToPair(tp -> tp.swap());
        JavaPairRDD<Integer, String> sortedData = beforeSort.sortByKey(false);
        // 把元素交换回来
        JavaPairRDD<String, Integer> finalResu = sortedData.mapToPair(tp -> tp.swap());
        // 保存结果
        finalResu.saveAsTextFile(args[]);
        // 释放资源
        jsc.stop(); // close()
    }
}
           

SparkFrame版本的wordCount

object DataFrameWC {

  def main(args: Array[String]): Unit = {
    //new配置实例
    val conf = new SparkConf()
    .setAppName(this.getClass.getSimpleName)
    .setMaster("local[*]")
    //入口
    val sc = new SparkContext(conf)
    //sparkSQL的入口
    val sqlContext: SQLContext = new SQLContext(sc)
    //导入SQLContext实例上的隐式转换
    import sqlContext.implicits._
    //读文件
    val words: RDD[String] = sc.textFile("wc.txt")
    //切分
    val wordsRdd: RDD[String] = words.flatMap(_.split(" "))
    //将RDD转化成DATAFrame toDF()!!!!!!当然下面的set也可以啦
    val worddf: DataFrame = wordsRdd.map(Word(_)).toDF()
    //不想转DF也可以 自己搞个类型嘛。。。下文有个样例类(不用new 方便的一批)
//    wordsRdd.map(t=>Word(t))
    //注册一个临时表 ----要用sql没有表怎么行,搞个临时的嘛
    worddf.registerTempTable("t_word")
    //执行sql语句, 
    val result: DataFrame = sqlContext.sql("select name ,count(*) as cnts from t_word group by name order by cnts desc ")
    //展示结果
    result.show()
    //关闭资源
    sc.stop()
  }
}
// 定义一个字段  name //好像是被我注释掉了
case class Word(name:String)
           

DataSet SQL语法的wordCount

object DataSetWC {

  def main(args: Array[String]): Unit = {
    // 统一的API入口
    // session  或者使用 spark  如果当期上下文环境中有,直接拿来使用,否则再创建
    val session: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()

    // 如何从sparkSession中获取到sparkContext和sqlContext
    //    val sc: SparkContext = session.sparkContext
    //    val sqlContext = session.sqlContext

    // 导入sparksession实例上的隐式转换
    import session.implicits._

    // sessionApi 获取到的数据类型就是Dataset
    val file: Dataset[String] = session.read.textFile("wc.txt")

    val wDs: Dataset[String] = file.flatMap(_.split(" "))

    // Dataset 默认有schema信息
    wDs.printSchema()
    //    |-- value: string (nullable = true)  // 默认shema的名称是value
    wDs.schema


    // SQL  DSL
    // 注册临时表
    wDs.createTempView("v_word")

    val result: DataFrame = session.sql("select value ,count(*) cnts from v_word group by value order by cnts desc ")

    result.show()

    session.stop()
    //    session.close()

  }
}
           

Dataset DSL 语法版本的wordCount

object DatasetDSLDemo {

  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()

    // 导入sparksession实例上的隐式转换
    import session.implicits._

    // sessionApi 获取到的数据类型就是Dataset
    val file: Dataset[String] = session.read.textFile("person.txt")

    val pdf: DataFrame = file.map(_.split(" ")).map(t=>(t(),t().toInt,t().toInt)).toDF("name","age","fv")

    // DSL 语法统计
    // select where group by  order by count sum max min
    // 选择
    pdf.select("name","age")
//      .show()
//    pdf.select(pdf.col("name"),pdf.col("age")).show()
//    pdf.select(pdf("name"),pdf("age")).show()

    // where 条件  过滤  where 调用的是filterAPI
//    pdf.where("fv > 93 ")
    pdf.filter("fv > 93") // .show()

    // 排序 order by  Sort  orderBy 调用的就是sort
    pdf.orderBy("age").show()

    // 多个条件的排序
    pdf.sort($"age" ,$"fv" desc) //.show()

    // 分组之后还需要进行聚合统计  sum max min  avg
     pdf.groupBy($"age").sum("fv") //.show()

    val result1: DataFrame = pdf.groupBy($"age").count()

//    result1.show()

    // 聚合条件 需要导入函数
    import org.apache.spark.sql.functions._
    // 聚合时,指定别名
    pdf.groupBy($"age").agg(count("*") as "cnts")// .show()

    pdf.groupBy($"age").agg(max("fv"))//.show()

    // 重命名列名
    pdf.withColumnRenamed("age","myage").show()

    // pdf

    session.stop()
  }
}
           

Streaming版本的wordcount

object StreamingWordCount {

  // 设置日志级别为 Warn
  val logger = Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      // 本地运行的 配置
    .setMaster("local[5]") // 至少是2个cores
    .setAppName(this.getClass.getSimpleName)

    //    conf: SparkConf, batchDuration: Duration
    // 时间间隔  需要根据我们的业务,  不是随便定义的
    val ssc: StreamingContext = new StreamingContext(conf, Seconds())

    //  从 socket中获取数据创建一个将要连接到 hostname:port 的 DStream,
    // 这里读取socket数据的Receiver,需要一个core
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hdp-03",)


    val textStream2: ReceiverInputDStream[String] = ssc.socketTextStream("hdp-03",)


    textStream2.print()
    // 1 个  receiver
    // 获取到数据之后的业务逻辑
    val result: DStream[(String, Int)] = textStream.flatMap(_.split(" ")).map((_,)).reduceByKey(_+_)

    // 打印结果 默认打印 10行
    result.print()

    // prefix-TIME_IN_MS.suffix
    result.saveAsTextFiles("stream","log")

    // 启动主程序  需要去连接socket
    ssc.start()

    // 阻塞 等待程序被关闭
    ssc.awaitTermination()
  }
}
           

Streaming SQL 版本的wordcount

object StreamingSQL {

  // 设置日志级别为 Warn
  val logger = Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      // 本地运行的 配置
      .setMaster("local[2]") // 至少是2个cores
      .setAppName(this.getClass.getSimpleName)

    //    conf: SparkConf, batchDuration: Duration
    // 时间间隔  需要根据我们的业务,  不是随便定义的
    val ssc: StreamingContext = new StreamingContext(conf, Seconds())

    //  从 socket中获取数据创建一个将要连接到 hostname:port 的 DStream,
    // 这里读取socket数据的Receiver,需要一个core
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hdp-03", )


    // DStream   = rdd  +  time
    // 如何使用SQL 语法来操作数据
    textStream.foreachRDD(rdd => {
      val session = SparkSession.builder()
        .config(conf) // 使用已经存在的conf对象
        .getOrCreate()

      import session.implicits._

      // 按照空格切分 所有的空格
      val wordRDD: RDD[String] = rdd.flatMap(_.split(" +"))
      val wdf: DataFrame = wordRDD.toDF("word")
      // 注册临时视图 createOrReplaceTempView
      wdf.createOrReplaceTempView("v_word")

      val dfRes = session.sql("select word,count(1) cnts from v_word group by word order by cnts desc")

      dfRes.show()

    })


    // 启动主程序  需要去连接socket
    ssc.start()
    // 阻塞 等待程序被关闭
    ssc.awaitTermination()
  }
}
           

Kafka版本的wordCount

object StreamingKafkaWC {

  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName(this.getClass.getSimpleName)

    val ssc=  new StreamingContext(conf,Seconds())

    // kafka的参数配置
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hdp-02:9092,hdp-03:9092,hdp-04:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "hello_topic_group",
      // 从哪个位置开始读取数据
      "auto.offset.reset" -> "earliest",
      // 是否可以自动提交偏移量   自定义
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // streaming 支持 读取kafka的多个主题
    val topics = Array("helloTopic6", "topicB")
    // 指定泛型约定
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      // 我们的broker和executor在同一台机器上
//      LocationStrategies.PreferBrokers
      // 资源会平均分配
      LocationStrategies.PreferConsistent,
//      PreferConsistent,
      // 消费者 订阅哪些主题
      Subscribe[String, String](topics, kafkaParams)
    )

  stream.foreachRDD(rdd=>{
    // rdd中的数据类型是 ConsumerRecord[String, String]   拿到这里的value
    val result = rdd.map(_.value()).map((_,)).reduceByKey(_+_)
    result.foreach(println)
  })


    ssc.start()
    ssc.awaitTermination()

  }
}
           

继续阅读