package com.caimh

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Count words over a continuously arriving data stream.
  * Created by caimh on 2019/11/3.
  */
object StreamingWordCount {

  def main(args: Array[String]): Unit = {
    // Create the StreamingContext with a 5-second batch interval
    val conf: SparkConf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))

    // Receive data by listening on a socket (host and port are whatever you configure)
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("master-node", 8888)

    // DStream[String]: split each line into words
    val words: DStream[String] = lines.flatMap(_.split(" "))

    // DStream[(String, 1)]: pair each word with an initial count of 1
    val wordMap: DStream[(String, Int)] = words.map((_, 1))

    // DStream[(String, sum)]: sum the counts for each word within the batch
    val res: DStream[(String, Int)] = wordMap.reduceByKey(_ + _)
    res.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
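Note that reduceByKey only aggregates within a single 5-second batch, so each print shows the counts for that batch alone. If a running total across batches is wanted, Spark Streaming's updateStateByKey can carry state forward; it requires a checkpoint directory. A minimal sketch, to be placed before ssc.start() in the main method above (the checkpoint path is only an illustrative assumption):

    // Sketch: accumulate word counts across batches with updateStateByKey.
    // The checkpoint directory below is a hypothetical example path.
    ssc.checkpoint("hdfs://master-node:9000/streaming-checkpoint")

    val totalCounts: DStream[(String, Int)] = wordMap.updateStateByKey(
      (newValues: Seq[Int], runningCount: Option[Int]) =>
        Some(newValues.sum + runningCount.getOrElse(0))
    )
    totalCounts.print()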
Testing
First start the listening service with nc -lk 8888:
$ sudo yum install nc.x86_64
$ nc -lk 8888
Then start the StreamingWordCount application, type some words in the nc session on the Linux server, and the test results are as follows:
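Since setMaster("local[*]") is hard-coded, the application can be started straight from the IDE. If it is packaged into a jar instead, it could be launched with spark-submit; a rough sketch, where the jar name is only a placeholder:

$ spark-submit \
    --class com.caimh.StreamingWordCount \
    streaming-wordcount.jar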
