天天看點

[Spark streaming舉例]-- 版本的單詞統計

package com.scala.my


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Durations
/**
 * spark streaming 版本的單詞統計(通過監聽端口)
 * 1\在 hh15上執行 :yum install nc 安裝工具nc
 * 第一種本地測試的步驟:
 *     2\在hh15上啟動端口:#nc -lk 8888
 *     3\在本地eclipse上run程式
 *     4\在hh15上的dos界面中輸入單詞,輸入的時間要在Durations.seconds(10)的範圍内
 *     5\在eclipse界面檢視是否成功
 * 第二種伺服器上測試的步驟:
 *     2\開啟spark叢集:(1)啟動zookeeeper  (2)在hh15上的spark的sbin下啟動spark叢集---》#sh start-all.sh
 *     3\在hh15上開啟端口:#nc -lk 8888
 *     4\将WordCount.java類打成jar包(sparkStreamWordCount.jar),并且放到hh15上
 *     5\使用standalone模式在hh15上執行:(一般是背景執行)
 *     # nohup sh 
 *     6\在剛才開啟端口的視窗輸入單詞
 *     
 *     
 *     測試結果:第一種方式測試通過,第二種方式未測試
 */
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    //擷取context  至少并行度是2
    val sc=new StreamingContext(new SparkConf().setAppName("StreamCount").setMaster("local[2]"),Durations.seconds(8))
    //擷取端口輸入的文本資訊
    val lines=sc.socketTextStream("192.168.142.115", 8888)
    //壓扁
    val paris=lines.flatMap(x=>x.split(" ")) 
      //map
    val words=paris.map((_,1))
    //reduce
    val result=words.reduceByKey((x,y)=>x+y)
    //列印前10個數
    result.print()
    //開啟start
    sc.start()
    //等待
    sc.awaitTermination()
    //關閉資源
    sc.stop()
  }
}      

繼續閱讀