天天看點

9種版本的wordCount——scala、java、javaLamda、DataFrame、DataSet、Streaming、Streaming-SQL、Kafka

Scala版本的wordCount

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    if(args.length != ){
      println("Usage:cn.edu360.spark31.day01.ScalaWordCount <input><output>")
      sys.exit()
    }
    // 接收參數
    val  Array(input,output)= args
    // 建立一個配置對象
    val conf:SparkConf  = new SparkConf()
    // 建立一個SparkContext的對象
    val sc:SparkContext = new SparkContext(conf)
    // 讀取檔案
    val files: RDD[String] = sc.textFile(input)
    // 切分并壓平
    val splitedData: RDD[String] = files.flatMap(_.split(" "))
    // 組裝
    val wordsAndOne: RDD[(String, Int)] = splitedData.map((_,))
    // 進行分組聚合
    val result: RDD[(String, Int)] = wordsAndOne.reduceByKey(_+_)
    // 如果需要排序
    val sorted = result.sortBy(-_._2)
    // 存儲結果資料
    sorted.saveAsTextFile(output)
    // 釋放資源
    sc.stop()
    ///  差別 spark中,rdd上的方法 名稱和 scala本地方法的名稱,可能是一緻的
    // spark中調用的是rdd上的flatMap 方法, scala  中的flatMap 是本地集合的方法

  }
}
           

java版本的wordCount

public class JavaWordCount {
    public static void main(String[] args) {
        //提示輸入參數個數,若不是兩個就會提示
        if (args.length != ) {
            System.out.println("Usage:cn.edu360.ScalaWordCount <input><output>");
            System.exit();
        }
        //擷取配置對象
        SparkConf conf = new SparkConf();
        //要是不在叢集中測試,在本地測試 則設定一下參數
//        conf.setMaster("local");
//        conf.setAppName(JavaWordCount.class.getSimpleName());

        // 利用java程式來程式設計 spark程式的時候new JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // 讀檔案
        JavaRDD<String> lines = jsc.textFile(args[]);

        // 切分并壓平  flatMap方法中,參數的類型是一個接口    兩個參數  一個輸入參數  一個輸出參數
        JavaRDD<String> flatData = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                // 方法的傳回值類型是疊代器 需要把 數組類型的傳回值  轉換成 疊代器類型
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // 和1 組裝成元組 第一個參數  是輸入資料類型   第二個是傳回值類型(元組)的key,第三個是元組的value
        JavaPairRDD<String, Integer> wordAndOne = flatData.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, );
            }
        });
        // 分組聚合  三個參數  reduceByKey((a,b)=>a+b)    最後一個Iinteger 表達 傳回值的類型
        JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // 先在本地測試一下
     /*   result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> line) throws Exception {
                System.out.println(line);
            }
        });
*/
        // 可選,排序   <String,Integer>   ---> <Integer ,String>
        JavaPairRDD<Integer, String> beforeSort = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                // 把元組的key  和 value  交換 位置
                return tp.swap();
            }
        });

        // 預設是升序,如果需要降序
        JavaPairRDD<Integer, String> sortedData = beforeSort.sortByKey(false);

        // 把key  和  value的順序 交換回來
        JavaPairRDD<String, Integer> finalRes = sortedData.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });

        // 把排序後的結果寫入檔案中
        finalRes.saveAsTextFile(args[]);

        // 釋放資源
        jsc.stop(); // close()

    }
           

.Javalamda版本的wordCount

public class JavaLambdaWordCount {
    public static void main(String[] args) {
        if (args.length != ) {
            System.out.println("Usage:cn.edu360.spark31.day02.JavaLambdaWordCount <input><output>");
            System.exit();
        }
        //同上
        SparkConf conf = new SparkConf();
//        conf.setMaster("local");
//        conf.setAppName(JavaWordCount.class.getSimpleName());

        // 同上
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // 讀檔案
        JavaRDD<String> lines = jsc.textFile(args[]);

        // 切分并壓平
        JavaRDD<String> flatData = lines.flatMap(t -> Arrays.asList(t.split(" ")).iterator());

        // 和1 組裝
        JavaPairRDD<String, Integer> wordAndOne = flatData.mapToPair(line -> new Tuple2<>(line, ));

        // 分組聚合

        JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey((a, b) -> a + b);
        JavaPairRDD<Integer, String> beforeSort = result.mapToPair(tp -> tp.swap());
        JavaPairRDD<Integer, String> sortedData = beforeSort.sortByKey(false);
        // 把元素交換回來
        JavaPairRDD<String, Integer> finalResu = sortedData.mapToPair(tp -> tp.swap());
        // 儲存結果
        finalResu.saveAsTextFile(args[]);
        // 釋放資源
        jsc.stop(); // close()
    }
}
           

SparkFrame版本的wordCount

object DataFrameWC {

  def main(args: Array[String]): Unit = {
    //new配置執行個體
    val conf = new SparkConf()
    .setAppName(this.getClass.getSimpleName)
    .setMaster("local[*]")
    //入口
    val sc = new SparkContext(conf)
    //sparkSQL的入口
    val sqlContext: SQLContext = new SQLContext(sc)
    //導入SQLContext執行個體上的隐式轉換
    import sqlContext.implicits._
    //讀檔案
    val words: RDD[String] = sc.textFile("wc.txt")
    //切分
    val wordsRdd: RDD[String] = words.flatMap(_.split(" "))
    //将RDD轉化成DATAFrame toDF()!!!!!!當然下面的set也可以啦
    val worddf: DataFrame = wordsRdd.map(Word(_)).toDF()
    //不想轉DF也可以 自己搞個類型嘛。。。下文有個樣例類(不用new 友善的一批)
//    wordsRdd.map(t=>Word(t))
    //注冊一個臨時表 ----要用sql沒有表怎麼行,搞個臨時的嘛
    worddf.registerTempTable("t_word")
    //執行sql語句, 
    val result: DataFrame = sqlContext.sql("select name ,count(*) as cnts from t_word group by name order by cnts desc ")
    //展示結果
    result.show()
    //關閉資源
    sc.stop()
  }
}
// 定義一個字段  name //好像是被我注釋掉了
case class Word(name:String)
           

DataSet SQL文法的wordCount

object DataSetWC {

  def main(args: Array[String]): Unit = {
    // 統一的API入口
    // session  或者使用 spark  如果當期上下文環境中有,直接拿來使用,否則再建立
    val session: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()

    // 如何從sparkSession中擷取到sparkContext和sqlContext
    //    val sc: SparkContext = session.sparkContext
    //    val sqlContext = session.sqlContext

    // 導入sparksession執行個體上的隐式轉換
    import session.implicits._

    // sessionApi 擷取到的資料類型就是Dataset
    val file: Dataset[String] = session.read.textFile("wc.txt")

    val wDs: Dataset[String] = file.flatMap(_.split(" "))

    // Dataset 預設有schema資訊
    wDs.printSchema()
    //    |-- value: string (nullable = true)  // 預設shema的名稱是value
    wDs.schema


    // SQL  DSL
    // 注冊臨時表
    wDs.createTempView("v_word")

    val result: DataFrame = session.sql("select value ,count(*) cnts from v_word group by value order by cnts desc ")

    result.show()

    session.stop()
    //    session.close()

  }
}
           

Dataset DSL 文法版本的wordCount

object DatasetDSLDemo {

  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()

    // 導入sparksession執行個體上的隐式轉換
    import session.implicits._

    // sessionApi 擷取到的資料類型就是Dataset
    val file: Dataset[String] = session.read.textFile("person.txt")

    val pdf: DataFrame = file.map(_.split(" ")).map(t=>(t(),t().toInt,t().toInt)).toDF("name","age","fv")

    // DSL 文法統計
    // select where group by  order by count sum max min
    // 選擇
    pdf.select("name","age")
//      .show()
//    pdf.select(pdf.col("name"),pdf.col("age")).show()
//    pdf.select(pdf("name"),pdf("age")).show()

    // where 條件  過濾  where 調用的是filterAPI
//    pdf.where("fv > 93 ")
    pdf.filter("fv > 93") // .show()

    // 排序 order by  Sort  orderBy 調用的就是sort
    pdf.orderBy("age").show()

    // 多個條件的排序
    pdf.sort($"age" ,$"fv" desc) //.show()

    // 分組之後還需要進行聚合統計  sum max min  avg
     pdf.groupBy($"age").sum("fv") //.show()

    val result1: DataFrame = pdf.groupBy($"age").count()

//    result1.show()

    // 聚合條件 需要導入函數
    import org.apache.spark.sql.functions._
    // 聚合時,指定别名
    pdf.groupBy($"age").agg(count("*") as "cnts")// .show()

    pdf.groupBy($"age").agg(max("fv"))//.show()

    // 重命名列名
    pdf.withColumnRenamed("age","myage").show()

    // pdf

    session.stop()
  }
}
           

Streaming版本的wordcount

object StreamingWordCount {

  // 設定日志級别為 Warn
  val logger = Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      // 本地運作的 配置
    .setMaster("local[5]") // 至少是2個cores
    .setAppName(this.getClass.getSimpleName)

    //    conf: SparkConf, batchDuration: Duration
    // 時間間隔  需要根據我們的業務,  不是随便定義的
    val ssc: StreamingContext = new StreamingContext(conf, Seconds())

    //  從 socket中擷取資料建立一個将要連接配接到 hostname:port 的 DStream,
    // 這裡讀取socket資料的Receiver,需要一個core
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hdp-03",)


    val textStream2: ReceiverInputDStream[String] = ssc.socketTextStream("hdp-03",)


    textStream2.print()
    // 1 個  receiver
    // 擷取到資料之後的業務邏輯
    val result: DStream[(String, Int)] = textStream.flatMap(_.split(" ")).map((_,)).reduceByKey(_+_)

    // 列印結果 預設列印 10行
    result.print()

    // prefix-TIME_IN_MS.suffix
    result.saveAsTextFiles("stream","log")

    // 啟動主程式  需要去連接配接socket
    ssc.start()

    // 阻塞 等待程式被關閉
    ssc.awaitTermination()
  }
}
           

Streaming SQL 版本的wordcount

object StreamingSQL {

  // 設定日志級别為 Warn
  val logger = Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      // 本地運作的 配置
      .setMaster("local[2]") // 至少是2個cores
      .setAppName(this.getClass.getSimpleName)

    //    conf: SparkConf, batchDuration: Duration
    // 時間間隔  需要根據我們的業務,  不是随便定義的
    val ssc: StreamingContext = new StreamingContext(conf, Seconds())

    //  從 socket中擷取資料建立一個将要連接配接到 hostname:port 的 DStream,
    // 這裡讀取socket資料的Receiver,需要一個core
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hdp-03", )


    // DStream   = rdd  +  time
    // 如何使用SQL 文法來操作資料
    textStream.foreachRDD(rdd => {
      val session = SparkSession.builder()
        .config(conf) // 使用已經存在的conf對象
        .getOrCreate()

      import session.implicits._

      // 按照空格切分 所有的空格
      val wordRDD: RDD[String] = rdd.flatMap(_.split(" +"))
      val wdf: DataFrame = wordRDD.toDF("word")
      // 注冊臨時視圖 createOrReplaceTempView
      wdf.createOrReplaceTempView("v_word")

      val dfRes = session.sql("select word,count(1) cnts from v_word group by word order by cnts desc")

      dfRes.show()

    })


    // 啟動主程式  需要去連接配接socket
    ssc.start()
    // 阻塞 等待程式被關閉
    ssc.awaitTermination()
  }
}
           

Kafka版本的wordCount

object StreamingKafkaWC {

  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName(this.getClass.getSimpleName)

    val ssc=  new StreamingContext(conf,Seconds())

    // kafka的參數配置
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hdp-02:9092,hdp-03:9092,hdp-04:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "hello_topic_group",
      // 從哪個位置開始讀取資料
      "auto.offset.reset" -> "earliest",
      // 是否可以自動送出偏移量   自定義
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // streaming 支援 讀取kafka的多個主題
    val topics = Array("helloTopic6", "topicB")
    // 指定泛型約定
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      // 我們的broker和executor在同一台機器上
//      LocationStrategies.PreferBrokers
      // 資源會平均配置設定
      LocationStrategies.PreferConsistent,
//      PreferConsistent,
      // 消費者 訂閱哪些主題
      Subscribe[String, String](topics, kafkaParams)
    )

  stream.foreachRDD(rdd=>{
    // rdd中的資料類型是 ConsumerRecord[String, String]   拿到這裡的value
    val result = rdd.map(_.value()).map((_,)).reduceByKey(_+_)
    result.foreach(println)
  })


    ssc.start()
    ssc.awaitTermination()

  }
}
           

繼續閱讀