1. Write a streaming application that continuously receives messages from an external system
2. Count the frequency of each word in the messages
3. Aggregate the counts globally across all data received so far
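Before looking at the streaming version, the core word-frequency transformation can be sketched with plain Scala collections (no Spark involved); this is exactly the logic the streaming job will apply to each batch of lines:

```scala
// Minimal sketch of the word-count transformation using plain Scala
// collections instead of a streaming Dataset.
object WordCountSketch {
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))   // split every line into words
      .groupBy(identity)       // group occurrences of the same word together
      .map { case (word, occurrences) => (word, occurrences.size) }
}
```

For example, `WordCountSketch.count(Seq("hello world", "hello spark"))` yields `Map("hello" -> 2, "world" -> 1, "spark" -> 1)`.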
Steps
- The Socket Server waits for the Structured Streaming program to connect
- The Structured Streaming program starts, connects to the Socket Server, and waits for it to send data
- The Socket Server sends data, and the Structured Streaming program receives it
- After receiving the data, the Structured Streaming program processes it
- After processing, the corresponding result set is generated and printed to the console
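The handshake in these steps can be sketched with plain JVM sockets (no Spark): a background thread plays the Socket Server, and the main thread plays the streaming program. All names here are illustrative only; in the real demo the server role is played by `nc -lk 10087` on the host the program connects to.

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.{ServerSocket, Socket}

object HandshakeSketch extends App {
  // 1. The "Socket Server" starts listening and waits for a connection.
  val server = new ServerSocket(0) // port 0 = pick any free port

  new Thread(() => {
    val conn = server.accept()                // 2. accept the program's connection
    val out  = new PrintWriter(conn.getOutputStream, true)
    out.println("hello structured streaming") // 3. send a line of data
    conn.close()
  }).start()

  // The "program" connects to the server, then receives the data...
  val socket = new Socket("localhost", server.getLocalPort)
  val in     = new BufferedReader(new InputStreamReader(socket.getInputStream))

  // ...and after processing, prints the result to the console.
  println(in.readLine())
  socket.close()
  server.close()
}
```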
Code
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}

object StructDemo extends App {
  private val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("test")
    .getOrCreate()

  import spark.implicits._

  // Receive data sent via nc (netcat).
  // readStream returns a DataStreamReader that reads streaming data in as a DataFrame.
  private val ds: Dataset[String] = spark.readStream
    .format("socket")
    .option("host", "mypc01")
    .option("port", 10087)
    .load()
    .as[String]

  // Split each line into words, pair each word with 1, and group by the word.
  private val grouped: KeyValueGroupedDataset[String, (String, Int)] =
    ds.flatMap(_.split(" ")).map((_, 1)).groupByKey(_._1)

  // count() tallies the rows per key, yielding Dataset[(word, count)].
  private val counts: Dataset[(String, Long)] = grouped.count()

  counts.writeStream
    .outputMode(OutputMode.Complete()) // emit the full updated result table on each trigger
    .format("console")
    .start()
    .awaitTermination()
}
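A note on this pipeline: `KeyValueGroupedDataset.count()` counts the rows per key, so the intermediate `map((_, 1))` step is not actually needed; grouping the words directly produces the same result. A slightly shorter equivalent (assuming the same session, imports, and `ds` as above):

```scala
// Equivalent word count without the (word, 1) pairing step:
// group the words themselves and let count() tally rows per key.
private val wordCounts: Dataset[(String, Long)] =
  ds.flatMap(_.split(" "))
    .groupByKey(identity)
    .count()
```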
An alternative version, written in SQL style:
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object StructDemo2 extends App {
  private val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("test")
    .getOrCreate()

  import spark.implicits._

  // Receive data sent via nc (netcat).
  // readStream returns a DataStreamReader that reads streaming data in as a DataFrame.
  private val ds: Dataset[String] = spark.readStream
    .format("socket")
    .option("host", "mypc01")
    .option("port", 10087)
    .load()
    .as[String]

  // Register the word stream as a temporary view so it can be queried with SQL.
  ds.flatMap(_.split(" ")).map((_, 1)).toDF("word", "num").createTempView("tmp")

  val sql =
    """
      |select word, count(1)
      |from tmp
      |group by word
      |""".stripMargin

  private val frame: DataFrame = spark.sql(sql)

  frame.writeStream
    .outputMode(OutputMode.Complete())
    .format("console")
    .start()
    .awaitTermination()
}
Summary
1. The programming steps in Structured Streaming are still: read first, then process, then write out.
2. The programming model in Structured Streaming is still DataFrame and Dataset.
3. Structured Streaming still has an external data source read/write framework, namely readStream and writeStream.
4. Structured Streaming and SparkSQL are almost identical; the only difference is that readStream reads data in as a stream and writeStream writes the stream out, whereas batch processing in SparkSQL uses read and write.
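The contrast in point 4 can be made concrete with the batch counterpart of the same word count. This is only a sketch: the input path `words.txt` and output path `word_counts` are made-up examples; the point is that the transformation logic is unchanged while read/write replace readStream/writeStream.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Batch version of the same word count: read/write instead of
// readStream/writeStream. The file paths are hypothetical.
object BatchWordCount extends App {
  private val spark = SparkSession.builder()
    .master("local[2]")
    .appName("batch")
    .getOrCreate()

  import spark.implicits._

  private val lines: Dataset[String] = spark.read.textFile("words.txt") // batch read

  private val counts: Dataset[(String, Long)] =
    lines.flatMap(_.split(" ")).groupByKey(identity).count()

  counts.write.format("csv").save("word_counts") // batch write, runs once and finishes
  spark.stop()
}
```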