天天看點

Spark Streaming自定義Receiver類

Spark Streaming自定義Receiver類

1. 自定義 Receiver:CustomReveicer 類(程式碼中的類名拼寫為 CustomReveicer)

/**
 * A custom Spark Streaming receiver that connects to a TCP socket at `host:port`
 * and stores every received text line (decoded as UTF-8) into Spark,
 * using storage level `StorageLevels.MEMORY_AND_DISK_2`.
 *
 * When the server closes the connection or an error occurs while the receiver
 * is still active, the receiver asks Spark to restart it (`restart(...)`).
 *
 * @param host hostname of the socket server to read from
 * @param port TCP port of the socket server
 */
class CustomReveicer(host: String, port: Int) extends Receiver[String](StorageLevels.MEMORY_AND_DISK_2) with Logging {

  // Socket currently used by the receiving thread. Kept as a field so that onStop()
  // can close it and thereby unblock a pending readLine(). Guarded by `synchronized`
  // because it is written by the receiving thread and closed from onStop().
  private var socket: Socket = _

  /** Starts a background thread that reads from the socket; onStart() itself must not block. */
  override def onStart(): Unit = {
    new Thread("Custom Receive") {
      // Daemon thread, so a lingering receive loop never keeps the JVM alive by itself.
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  /** Closes the socket (if any), which makes a blocked readLine() in receive() return or fail. */
  override def onStop(): Unit = closeSocket()

  /** Closes and clears the current socket; safe to call more than once. */
  private def closeSocket(): Unit = synchronized {
    if (socket != null) {
      try socket.close()
      catch {
        case scala.util.control.NonFatal(e) => logWarning(s"Error closing socket to $host : $port", e)
      }
      socket = null
    }
  }

  /**
   * Connects to `host:port`, reads lines until the stream ends or the receiver is
   * stopped, and stores each line. Always releases the socket before returning.
   */
  private def receive(): Unit = {
    try {
      logInfo(s"Connecting to $host : $port")
      val connected = new Socket(host, port)
      synchronized { socket = connected }
      logInfo(s"Connected to $host : $port")

      val reader = new BufferedReader(new InputStreamReader(connected.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      // Read until the server closes the stream (readLine() returns null) or we are stopped.
      while (!isStopped() && line != null) {
        // Each line becomes one record; Spark groups stored records into blocks.
        store(line)
        line = reader.readLine()
      }
      logInfo("Stopped receiving")
      // Only reconnect if the stream ended on its own, not because we were stopped.
      if (!isStopped()) restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        restart(s"Error connecting to $host : $port", e)
      case scala.util.control.NonFatal(t) =>
        // A read failure after onStop() closed the socket is expected; don't restart then.
        if (!isStopped()) restart("Error receiving data", t)
    } finally {
      // Closing the socket also closes its input stream (and thus the reader).
      closeSocket()
    }
  }
}
           

2. 使用自定義的 CustomReveicer 接收資料並進行單詞計數(WordCount)

object CustomReveicer {
  /**
   * Runs a streaming word count over text lines received by the custom receiver.
   *
   * Usage: `CustomReveicer <host> <port>`. Exits with status 1 on bad arguments.
   *
   * @param args `args(0)` is the host to connect to, `args(1)` the TCP port (1-65535)
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage <host> <port>")
      sys.exit(1)
    }
    val host = args(0)
    // Reject a malformed port up front instead of failing later with NumberFormatException.
    val port = scala.util.Try(args(1).toInt) match {
      case scala.util.Success(p) if p > 0 && p <= 65535 => p
      case _ =>
        System.err.println(s"Invalid port: ${args(1)}")
        sys.exit(1)
    }

    // Lower log verbosity to WARN, but only when no log4j configuration is already present.
    StreamingLogger.setLoggerLevel()

    // local[2]: one core is occupied by the receiver, at least one more is needed to process batches.
    val sparkConf = new SparkConf().setAppName("CustomReveicer").setMaster("local[2]")
    // Batch interval of 5 seconds; adjust as needed.
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // Input DStream fed by the custom receiver.
    val lines = ssc.receiverStream(new CustomReveicer(host = host, port = port))

    // Word count per batch: split lines into words, pair each with 1, sum by word.
    val wordCounts = lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
           

3. StreamingLogger:設定 Logger 級別的類(僅在尚未載入 log4j 設定時,將級別調為 WARN)

/** Helper for quieting Spark's console output in streaming examples. */
object StreamingLogger extends Logging {

  /**
   * Sets the log4j root logger to WARN, but only if log4j has no appenders yet
   * (i.e. the user has not supplied their own logging configuration).
   *
   * The info message is logged *before* changing the level on purpose: logging
   * through Spark's `Logging` first lets it perform its own default initialization,
   * and the WARN level set afterwards then overrides that default.
   */
  def setLoggerLevel(): Unit = {
    val log4jConfigured: Boolean = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (log4jConfigured) {
      () // respect an existing log4j configuration
    } else {
      logInfo("set logger level warn")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
           

4. 啟動 nc 指令發送資料,並執行 Spark Streaming 程式

4.1 啟動 nc 指令,監聽 9999 埠,之後在此終端輸入的每一行文字都會發送給 Receiver(建議改用 nc -lk 9999,讓 nc 在連線斷開後繼續監聽,以便 Receiver 自動重連)

nc -l 9999
[圖片:Spark Streaming自定義Receiver類]

4.2 添加程式參數 <host> <port>(例如 localhost 9999),執行 Spark Streaming 程式

[圖片:Spark Streaming自定義Receiver類]

4.3 結果輸出

[圖片:Spark Streaming自定義Receiver類]

繼續閱讀