
Getting Started with Structured Streaming: A WordCount Example

1. Write a streaming application that continuously receives messages from an external system
2. Count the frequency of each word in those messages
3. Report a global (cumulative) result over everything received so far

Steps

  1. The Socket Server waits for the Structured Streaming program to connect (for local testing, see the netcat command after this list)
  2. The Structured Streaming program starts, connects to the Socket Server, and waits for it to send data
  3. The Socket Server sends data, and the Structured Streaming program receives it
  4. After receiving the data, the Structured Streaming program processes it
  5. After processing, the result set is generated and printed to the console
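For local testing, the Socket Server in step 1 can simply be netcat, started before the Spark program (assuming nc is available; the host and port must match the options used in the code below):

nc -lk 10087

Every line typed into this nc session is then pushed to the connected Structured Streaming job.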

Code

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}

object StructDemo extends App {
  private val spark: SparkSession = SparkSession.builder().master("local[2]").appName("test")
    .getOrCreate()

  import spark.implicits._

  // readStream returns a DataStreamReader; the socket source reads each line
  // sent by the socket server (e.g. nc) as a row of a single-column DataFrame
  private val ds: Dataset[String] = spark.readStream.format("socket")
    .option("host", "mypc01")
    .option("port", 10087)
    .load()
    .as[String]

  // split each line into words, pair every word with 1, and group by the word itself
  private val grouped: KeyValueGroupedDataset[String, (String, Int)] =
    ds.flatMap(_.split(" ")).map((_, 1)).groupByKey(_._1)
  // count the elements of each group, yielding (word, count) pairs
  private val counts: Dataset[(String, Long)] = grouped.count()
  counts.writeStream
    .outputMode(OutputMode.Complete())
    .format("console")
    .start()
    .awaitTermination()
}
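Because the query uses OutputMode.Complete(), each trigger reprints the entire result table, so the counts are cumulative across batches. As an illustration (not captured from a real run), typing hello world hello into the nc session should yield console output roughly like the following; the key and count(1) column names come from groupByKey(...).count() and may differ slightly across Spark versions:

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+--------+
|  key|count(1)|
+-----+--------+
|hello|       2|
|world|       1|
+-----+--------+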

Another way to write it: SQL style

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object StructDemo2 extends App {
  private val spark: SparkSession = SparkSession.builder().master("local[2]").appName("test")
    .getOrCreate()

  import spark.implicits._

  // readStream returns a DataStreamReader; the socket source reads each line
  // sent by the socket server (e.g. nc)
  private val ds: Dataset[String] = spark.readStream.format("socket")
    .option("host", "mypc01")
    .option("port", 10087)
    .load()
    .as[String]

  // split lines into words and register the result as a temp view for SQL
  ds.flatMap(_.split(" ")).map((_, 1)).toDF("word", "num").createTempView("tmp")
  val sql =
    """
      |select word, count(1)
      |from tmp
      |group by word
      |""".stripMargin
  private val frame: DataFrame = spark.sql(sql)
  frame.writeStream
    .outputMode(OutputMode.Complete())
    .format("console")
    .start()
    .awaitTermination()
}
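For comparison, the same aggregation can also be expressed with the untyped DataFrame DSL, with no temp view at all. This is a sketch using the same socket options; note that the socket source's single column is named value:

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.SparkSession

object StructDemo3 extends App {
  private val spark: SparkSession = SparkSession.builder().master("local[2]").appName("test")
    .getOrCreate()

  import spark.implicits._

  spark.readStream.format("socket")
    .option("host", "mypc01")
    .option("port", 10087)
    .load()
    .as[String]
    .flatMap(_.split(" ")) // Dataset[String]; its single column is named "value"
    .groupBy("value")      // untyped DSL: group on the column name
    .count()               // DataFrame with columns value, count
    .writeStream
    .outputMode(OutputMode.Complete())
    .format("console")
    .start()
    .awaitTermination()
}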

Summary

1. Programming in Structured Streaming still follows the same sequence: read first, then process, and finally write the result out

2. The programming model in Structured Streaming is still DataFrame and Dataset

3. Structured Streaming still provides an external data source read/write framework, exposed as readStream and writeStream

4. Structured Streaming is almost identical to SparkSQL; the only difference is that readStream reads a stream in and writeStream writes a stream out, whereas batch processing in SparkSQL uses read and write (a batch counterpart is sketched below)
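To make point 4 concrete, here is a minimal batch counterpart of the same WordCount (a sketch: words.txt is a placeholder input path). Only the read/write pair changes; the transformations are identical:

import org.apache.spark.sql.SparkSession

object BatchWordCount extends App {
  private val spark: SparkSession = SparkSession.builder().master("local[2]").appName("batch")
    .getOrCreate()

  import spark.implicits._

  // read (batch) instead of readStream (streaming)
  val counts = spark.read.textFile("words.txt")
    .flatMap(_.split(" "))
    .groupBy("value").count()

  counts.show() // print once to the console; no output mode needed in batch

  // write (batch) instead of writeStream (streaming)
  counts.write.mode("overwrite").format("csv").save("word_counts")
}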
