
Structured Streaming Starter Example: WordCount

1. Write a streaming application that continuously receives messages from an external system
2. Count the frequency of each word in the messages
3. Maintain a running global count

Steps

  1. The Socket Server starts and waits for the Structured Streaming program to connect
  2. The Structured Streaming program starts, connects to the Socket Server, and waits for it to send data
  3. The Socket Server sends data, and the Structured Streaming program receives it
  4. The Structured Streaming program processes the data it receives
  5. After processing, the result set is generated and printed to the console
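The Socket Server in the steps above is usually just netcat. A minimal session (using port 10087, the port the code below connects to) looks like:

```shell
# Start a socket server on port 10087.
# -l: listen for incoming connections; -k: keep listening after a client disconnects.
nc -lk 10087
# Then type lines of space-separated words and press Enter to send each one,
# e.g.:  hello spark hello streaming
```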

Code

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}

object StructDemo extends App {
  private val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("test")
    .getOrCreate()

  import spark.implicits._

  // Read the data sent by nc: readStream returns a DataStreamReader,
  // which reads streaming data in as a DataFrame
  private val ds: Dataset[String] = spark.readStream.format("socket")
    .option("host", "mypc01")
    .option("port", 10087)
    .load()
    .as[String]

  // Split each line into words, attach a count of 1, and group by word
  private val grouped: KeyValueGroupedDataset[String, (String, Int)] =
    ds.flatMap(_.split(" ")).map((_, 1)).groupByKey(_._1)

  // Count the tuples in each group
  private val counts: Dataset[(String, Long)] = grouped.count()

  counts.writeStream
    .outputMode(OutputMode.Complete()) // print the full result table on every trigger
    .format("console")
    .start()
    .awaitTermination()
}
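With the console sink and Complete output mode, the full counts table is reprinted for every micro-batch. The exact column names depend on the query, but the output looks roughly like this (illustrative, not captured from a real run):

```
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+--------+
|  key|count(1)|
+-----+--------+
|hello|       2|
|spark|       1|
+-----+--------+
```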

An alternative: SQL style

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object StructDemo2 extends App {
  private val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("test")
    .getOrCreate()

  import spark.implicits._

  // Read the data sent by nc, same as above
  private val ds: Dataset[String] = spark.readStream.format("socket")
    .option("host", "mypc01")
    .option("port", 10087)
    .load()
    .as[String]

  // Register the word stream as a temporary view so it can be queried with SQL
  ds.flatMap(_.split(" ")).map((_, 1)).toDF("word", "num").createTempView("tmp")

  val sql =
    """
      |select word, count(1)
      |from tmp
      |group by word
      |""".stripMargin
  private val frame: DataFrame = spark.sql(sql)

  frame.writeStream
    .outputMode(OutputMode.Complete())
    .format("console")
    .start()
    .awaitTermination()
}

Summary

1. The programming steps in Structured Streaming are still read first, then process, then write out

2. The programming model is still DataFrame and Dataset

3. Structured Streaming still has a framework for reading and writing external data sources, namely readStream and writeStream

4. Structured Streaming is almost identical to SparkSQL; the only difference is that readStream reads a stream in and writeStream writes a stream out, whereas batch processing in SparkSQL uses read and write
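The symmetry in point 4 can be sketched side by side. This is a minimal illustration, not from the original post: the people.json input file and the output path are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadWriteSymmetry extends App {
  val spark: SparkSession = SparkSession.builder()
    .master("local[2]").appName("symmetry").getOrCreate()

  // Batch (SparkSQL): read and write operate on bounded data
  val batchDf: DataFrame = spark.read.json("people.json") // hypothetical input file
  batchDf.write.parquet("people.parquet")                 // hypothetical output path

  // Streaming: readStream and writeStream operate on an unbounded stream,
  // but the DataFrame API in between is the same
  val streamDf: DataFrame = spark.readStream.format("socket")
    .option("host", "mypc01")
    .option("port", 10087)
    .load()
  streamDf.writeStream
    .format("console")
    .outputMode("append") // no aggregation here, so Append mode is fine
    .start()
    .awaitTermination()
}
```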
