package com.scala.my
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Durations
/**
* spark streaming 版本的單詞統計(通過監聽端口)
* 1\在 hh15上執行 :yum install nc 安裝工具nc
* 第一種本地測試的步驟:
* 2\在hh15上啟動端口:#nc -lk 8888
* 3\在本地eclipse上run程式
* 4\在hh15上的dos界面中輸入單詞,輸入的時間要在Durations.seconds(10)的範圍内
* 5\在eclipse界面檢視是否成功
* 第二種伺服器上測試的步驟:
* 2\開啟spark叢集:(1)啟動zookeeeper (2)在hh15上的spark的sbin下啟動spark叢集---》#sh start-all.sh
* 3\在hh15上開啟端口:#nc -lk 8888
* 4\将WordCount.java類打成jar包(sparkStreamWordCount.jar),并且放到hh15上
* 5\使用standalone模式在hh15上執行:(一般是背景執行)
* # nohup sh
* 6\在剛才開啟端口的視窗輸入單詞
*
*
* 測試結果:第一種方式測試通過,第二種方式未測試
*/
object StreamWordCount {
def main(args: Array[String]): Unit = {
//擷取context 至少并行度是2
val sc=new StreamingContext(new SparkConf().setAppName("StreamCount").setMaster("local[2]"),Durations.seconds(8))
//擷取端口輸入的文本資訊
val lines=sc.socketTextStream("192.168.142.115", 8888)
//壓扁
val paris=lines.flatMap(x=>x.split(" "))
//map
val words=paris.map((_,1))
//reduce
val result=words.reduceByKey((x,y)=>x+y)
//列印前10個數
result.print()
//開啟start
sc.start()
//等待
sc.awaitTermination()
//關閉資源
sc.stop()
}
}