天天看點

spark wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SprakWordCount {
  def main(args: Array[String]): Unit = {
    //參數檢查
    if (args.length < 2) {
      System.err.println("Usage: myWordCount <input> <output>")
      System.exit(1)
    }

    //設定spark配置
    val conf: SparkConf = new SparkConf().setAppName("myWordCount") //.setMaster("local[*]")
    //
    val sc:SparkContext=new SparkContext(conf)
    //讀取輸入資料 s
    val  lines=sc.textFile(args(0))
    //處理資料
    //切分壓平
    val words: RDD[String] = lines.flatMap(_.split(","))
    //将單詞和1組合在一起 讓每個單詞1都出現一次
    val wordone: RDD[(String, Int)] = words.map((_, 1))
    //聚合 單詞計數
    //在一個(K,V)的RDD上調用,傳回一個(K,V)的RDD,使用指定的reduce函數,
    // 将相同key的值聚合到一起,與groupByKey類似,
    // reduce任務的個數可以通過第二個可選的參數來設定
    val reduced: RDD[(String, Int)] = wordone.reduceByKey(_ + _)
    //排序 按照單詞出現的次數 降序排序
    val ans = reduced.sortBy(_._2, false)

    
      
   
      ans.saveAsTextFile(args(1))
      //    println(ans.collect().toBuffer)

    

    //關閉
    sc.stop()

  }

}

      

繼續閱讀