天天看点

SparkStreaming案例测试

package com.caimh

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 对源源不断的流式数据进行统计
  * Created by caimh on 2019/11/3.
  */
object StreamingWordCount {

  def main(args: Array[String]) {

    //创建SparkStreamingContext
    val sc: SparkConf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))

    //获取数据(监听服务端口,端口自己设置)
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("master-node", 8888)

    //DStream[String]
    val words: DStream[String] = lines.flatMap(_.split(" "))
    //DStream[(String,1)]
    val wordMap: DStream[(String, Int)] = words.map((_, 1))
    //DStream[(String,sum)]
    val res: DStream[(String, Int)] = wordMap.reduceByKey(_ + _)

    res.print()

    ssc.start()
    ssc.awaitTermination()
  }

}
           

测试

先启动服务 nc -lk 8888

[[email protected] ~]$ sudo yum install nc.x86_64
[[email protected] ~]$ nc -lk 8888
           

启动编写的StreamingWordCount应用程序,在linux服务端输入单词,测试结果如下 

SparkStreaming案例测试

继续阅读