Spark Streaming自定義Receiver類
1.自定義CustomReceiver
/**
 * Custom Spark Streaming receiver that connects to a TCP endpoint, reads UTF-8 text
 * line by line, and hands every line to Spark via `store`.
 *
 * Received blocks use `StorageLevels.MEMORY_AND_DISK_2`.
 *
 * @param host host name to connect to
 * @param port TCP port to connect to
 */
class CustomReveicer(host: String, port: Int)
  extends Receiver[String](StorageLevels.MEMORY_AND_DISK_2) with Logging {

  /** Launches the receive loop on its own thread so that `onStart` returns right away. */
  override def onStart(): Unit = {
    new Thread("Custom Receive") {
      override def run(): Unit = {
        receive()
      }
    }.start()
  }

  /** Nothing to release here: the receive loop checks `isStopped()` and cleans up itself. */
  override def onStop(): Unit = {
  }

  /**
   * Connects to `host:port` and stores each received line until the receiver is stopped
   * or the stream ends, then asks for a restart.
   *
   * The reader and socket are always closed, including when connecting or reading fails,
   * so a failed attempt does not leak the connection before the restart.
   */
  private def receive(): Unit = {
    var socket: Socket = null
    var reader: BufferedReader = null
    try {
      try {
        logInfo(s"Connecting to $host : $port")
        socket = new Socket(host, port)
        logInfo(s"Connected to $host : $port")
        reader = new BufferedReader(
          new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
        var line = reader.readLine()
        // readLine() returns null at end of stream, which also ends the loop.
        while (!isStopped() && line != null) {
          store(line)
          line = reader.readLine()
        }
      } finally {
        // Either reference may still be null if the failure happened before it was assigned.
        try {
          if (reader != null) reader.close()
        } finally {
          if (socket != null) socket.close()
        }
      }
      logInfo("Stopped receiving")
      // Reached only after a clean end of the loop; try to reconnect.
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException => restart(s"Error connecting to $host : $port", e)
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}
2.使用CustomReceiver接收資料
/**
 * Word-count driver that consumes text lines through [[CustomReveicer]].
 *
 * Expects exactly two arguments: `<host> <port>`.
 */
object CustomReveicer {
  def main(args: Array[String]): Unit = {
    // Exactly two arguments are required: host and port.
    if (args.length != 2) {
      System.err.println("Usage <host> <port>")
      System.exit(1)
    }
    // Adjust logging verbosity before the streaming context is created.
    StreamingLogger.setLoggerLevel()
    // local[2]: one thread is occupied by the receiver, the other processes the batches.
    val sparkConf = new SparkConf().setAppName("CustomReveicer").setMaster("local[2]")
    // NOTE(review): the original batch-interval literal was missing from the source;
    // 1 second follows Spark's own custom-receiver example. Confirm the intended value.
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // Register the custom receiver as the input stream.
    val line = ssc.receiverStream(new CustomReveicer(host = args(0), port = args(1).toInt))
    // Split each line on single spaces and count the occurrences of every word per batch.
    val word = line.flatMap(_.split(" "))
    val wordCount = word.map((_, 1))
    val wordCounts = wordCount.reduceByKey(_ + _)
    // Print the per-batch counts.
    wordCounts.print()
    // Start the computation and block until it terminates.
    ssc.start()
    ssc.awaitTermination()
  }
}
3.StreamingLogger設定Logger級别的類
/** Helper for adjusting the root logger level of the streaming application. */
object StreamingLogger extends Logging {

  /**
   * Sets the root logger level to WARN, but only when the root logger has no appenders
   * attached; otherwise the logging setup is left untouched.
   */
  def setLoggerLevel(): Unit = {
    Logger.getRootLogger.getAllAppenders.hasMoreElements match {
      case true =>
        // Appenders already exist: leave the current level alone.
        ()
      case false =>
        logInfo("set logger level warn")
        Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
4.啟動nc指令發送資料,并執行Spark Streaming程式
4.1 啟動nc指令
nc -l 9999
4.2 添加程式參數 <host> <port>(例如:localhost 9999),執行Spark Streaming程式
4.3 結果輸出