天天看點

Spark學習筆記(三):SparkStreaming實作對檔案夾和socket的監聽對檔案夾的監聽對socket端口的監聽

SparkStreaming是Spark的一個流式計算架構,它支援對許多資料源進行實時監聽,例如Kafka, Flume, Kinesis, TCP sockets,甚至檔案夾,并實作實時計算的能力。

對檔案夾的監聽

def fileStreaming(): Unit ={
        /**
          * 監聽檔案夾的新增檔案内容
          */
        // 至少要啟動2個線程以上,1個用于監聽,1個用于處理資料
        val conf = new SparkConf().setMaster("local[8]").setAppName("SparkSql")
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        // 這裡是每隔3秒進行一次資料處理
        val ssc = new StreamingContext(sc, Seconds(3))
        // 隻會讀取在監聽期間傳入監聽檔案夾的檔案
        // 并且該檔案還必須在開始監聽之後進行修改過
        val lines = ssc.textFileStream("resources/data/SparkStreaming.SparkStreaming/wordCount.txt")
        val words = lines.flatMap(_.split(","))
        val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCount.print()
        ssc.start()
        ssc.awaitTermination()
    }
           

這裡有些地方是需要注意:

1.SparkContext至少要啟動2個線程以上,1個用于監聽,1個用于處理資料

2.對檔案監聽我個人認為不是很人性化:隻會讀取在監聽期間傳入監聽檔案夾的檔案;并且該檔案還必須在開始監聽之後進行修改過

對socket端口的監聽

首先,我們需要先建立一個scoket服務端口,定時往裡面寫入内容,我們的SparkStreaming才能進行監聽并實時接受資料

import java.io.PrintWriter
import java.net.ServerSocket

import scala.io.Source

/**
  * 建立一個socket服務,間隔一定時間從檔案中随機讀取一行内容
  */
object SocketServer {
    private val rd = new java.util.Random()

    def rdInt(max: Int): Int ={
        rd.nextInt(max)
    }

    def main(args: Array[String]): Unit = {
        val fileName = args(0) // 讀取的檔案路徑
        val port = args(1).toInt // socket端口号
        val interval = args(2).toLong // 讀取檔案内容的時間間隔:毫秒

        val reader = Source.fromFile(fileName)
        val lines = reader.getLines().toList
        reader.close()
        val length = lines.length
        val listener = new ServerSocket(port)
        while (true){ // 一直監聽該socket端口
            val socket = listener.accept()
            new Thread(){
                override def run = {
                    val out = new PrintWriter(socket.getOutputStream, true)
                    while (true){
                        Thread.sleep(interval)
                        val content = lines(rdInt(length))
                        println(content)
                        out.write(content + "\n")
                        out.flush()
                    }
                    socket.close()
                }
            }.start()
        }
    }
}
           

接下來,就是我們SparkStreaming的監聽代碼

def socketStreaming(): Unit ={
        /**
          * 監聽socket端口的寫入内容
          */
        val conf = new SparkConf().setMaster("local[4]").setAppName("SparkSql")
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        // 這裡是每隔3秒進行一次資料處理
        val ssc = new StreamingContext(sc, Seconds(3))
        val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(_.split(","))
        val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCount.print()
        ssc.start()
        ssc.awaitTermination()
    }
           

完整的代碼我已經上傳至GitHub

歡迎關注同名公衆号:“我就算餓死也不做程式員”。

交個朋友,一起交流,一起學習,一起進步。

Spark學習筆記(三):SparkStreaming實作對檔案夾和socket的監聽對檔案夾的監聽對socket端口的監聽

繼續閱讀