SparkStreaming是Spark的一個流式計算架構,它支援對許多資料源進行實時監聽,例如Kafka, Flume, Kinesis, TCP sockets,甚至檔案夾,并實作實時計算的能力。
對檔案夾的監聽
def fileStreaming(): Unit ={
/**
* 監聽檔案夾的新增檔案内容
*/
// 至少要啟動2個線程以上,1個用于監聽,1個用于處理資料
val conf = new SparkConf().setMaster("local[8]").setAppName("SparkSql")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
// 這裡是每隔3秒進行一次資料處理
val ssc = new StreamingContext(sc, Seconds(3))
// 隻會讀取在監聽期間傳入監聽檔案夾的檔案
// 并且該檔案還必須在開始監聽之後進行修改過
val lines = ssc.textFileStream("resources/data/SparkStreaming.SparkStreaming/wordCount.txt")
val words = lines.flatMap(_.split(","))
val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCount.print()
ssc.start()
ssc.awaitTermination()
}
這裡有些地方是需要注意:
1.SparkContext至少要啟動2個線程以上,1個用于監聽,1個用于處理資料
2.對檔案監聽我個人認為不是很人性化:隻會讀取在監聽期間傳入監聽檔案夾的檔案;并且該檔案還必須在開始監聽之後進行修改過
對socket端口的監聽
首先,我們需要先建立一個scoket服務端口,定時往裡面寫入内容,我們的SparkStreaming才能進行監聽并實時接受資料
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source
/**
* 建立一個socket服務,間隔一定時間從檔案中随機讀取一行内容
*/
object SocketServer {
private val rd = new java.util.Random()
def rdInt(max: Int): Int ={
rd.nextInt(max)
}
def main(args: Array[String]): Unit = {
val fileName = args(0) // 讀取的檔案路徑
val port = args(1).toInt // socket端口号
val interval = args(2).toLong // 讀取檔案内容的時間間隔:毫秒
val reader = Source.fromFile(fileName)
val lines = reader.getLines().toList
reader.close()
val length = lines.length
val listener = new ServerSocket(port)
while (true){ // 一直監聽該socket端口
val socket = listener.accept()
new Thread(){
override def run = {
val out = new PrintWriter(socket.getOutputStream, true)
while (true){
Thread.sleep(interval)
val content = lines(rdInt(length))
println(content)
out.write(content + "\n")
out.flush()
}
socket.close()
}
}.start()
}
}
}
接下來,就是我們SparkStreaming的監聽代碼
def socketStreaming(): Unit ={
/**
* 監聽socket端口的寫入内容
*/
val conf = new SparkConf().setMaster("local[4]").setAppName("SparkSql")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
// 這裡是每隔3秒進行一次資料處理
val ssc = new StreamingContext(sc, Seconds(3))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(","))
val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCount.print()
ssc.start()
ssc.awaitTermination()
}
完整的代碼我已經上傳至GitHub
歡迎關注同名公衆号:“我就算餓死也不做程式員”。
交個朋友,一起交流,一起學習,一起進步。