package com.caimh
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 對源源不斷的流式資料進行統計
* Created by caimh on 2019/11/3.
*/
object StreamingWordCount {
def main(args: Array[String]) {
//建立SparkStreamingContext
val sc: SparkConf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")
val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
//擷取資料(監聽服務端口,端口自己設定)
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("master-node", 8888)
//DStream[String]
val words: DStream[String] = lines.flatMap(_.split(" "))
//DStream[(String,1)]
val wordMap: DStream[(String, Int)] = words.map((_, 1))
//DStream[(String,sum)]
val res: DStream[(String, Int)] = wordMap.reduceByKey(_ + _)
res.print()
ssc.start()
ssc.awaitTermination()
}
}
測試
先啟動服務 nc -lk 8888
[[email protected] ~]$ sudo yum install nc.x86_64
[[email protected] ~]$ nc -lk 8888
啟動編寫的StreamingWordCount應用程式,在linux服務端輸入單詞,測試結果如下
