天天看點

SparkStreaming案例測試

package com.caimh

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 對源源不斷的流式資料進行統計
  * Created by caimh on 2019/11/3.
  */
/**
  * Word count over an unbounded stream of text lines.
  *
  * Listens on a TCP socket ("master-node":8888), splits each incoming line on
  * spaces, and prints per-batch word counts every 5 seconds. Note that
  * `reduceByKey` here aggregates only within a single batch; counts are not
  * carried across batches (that would require `updateStateByKey`/`mapWithState`).
  *
  * Created by caimh on 2019/11/3.
  */
object StreamingWordCount {

  // NOTE: ": Unit =" replaces the deprecated procedure syntax "def main(...) { }".
  def main(args: Array[String]): Unit = {

    // Build the streaming context with a 5-second batch interval.
    // "local[*]" uses all local cores; a receiver occupies one core, so at
    // least 2 are required for output to appear.
    val conf: SparkConf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))

    // Receive text data from the socket (start the server side first, e.g. `nc -lk 8888`).
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("master-node", 8888)

    // Split each line into words: DStream[String]
    val words: DStream[String] = lines.flatMap(_.split(" "))
    // Pair each word with an initial count of 1: DStream[(String, Int)]
    val wordMap: DStream[(String, Int)] = words.map((_, 1))
    // Sum counts per word within the current batch: DStream[(String, sum)]
    val res: DStream[(String, Int)] = wordMap.reduceByKey(_ + _)

    // Print the first elements of each batch's result to stdout.
    res.print()

    // Start the computation and block until it is terminated (Ctrl-C or error).
    ssc.start()
    ssc.awaitTermination()
  }

}
           

測試

先啟動服務 nc -lk 8888

[caimh@master-node ~]$ sudo yum install nc.x86_64
[caimh@master-node ~]$ nc -lk 8888
           

啟動編寫的StreamingWordCount應用程式,在linux服務端輸入單詞,測試結果如下 

SparkStreaming案例測試

繼續閱讀