天天看点

Structured Streaming系列——输入与输出

一、输入数据源

1. 文件输入数据源(FIie)

file数据源提供了很多种内置的格式,如csv、parquet、orc、json等等,就以csv为例:

import spark.implicits._
    val userSchema = new StructType()
       .add("name", "string").add("age", "integer")
    val lines = spark.readStream
      .option("sep", ";")
      .schema(userSchema)
      .csv("file:///data/*")

    val query = lines.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()      

在对应的目录下新建文件时,就可以在控制台看到对应的数据了。

还有一些其他可以控制的参数:

maxFilesPerTrigger  每个batch最多的文件数,默认是没有限制。比如我设置了这个值为1,那么同时增加了5个文件,这5个文件会每个文件作为一波数据,更新streaming dataframe。
latestFirst  是否优先处理最新的文件,默认是false。如果设置为true,那么最近被更新的会优先处理。这种场景一般是在监听日志文件的时候使用。
fileNameOnly  是否只监听固定名称的文件

2.网络输入数据源(socket)

一般都是基于这个socket来做测试。首先开启一个socket服务器(

nc -lk 9999

)

,然后streaming这边连接进行处理。

spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()      

3. 输入数据源(kafka)

// Subscribe to 1 topic
val df= spark                                                                                                                
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribe","topic1")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]

// Subscribe to multiple topics
val df= spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribe","topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]

// Subscribe to a pattern
val df= spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribePattern","topic.*")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]      

以批的形式查询

关于Kafka的offset,structured streaming默认提供了几种方式:

//设置每个分区的起始和结束值
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

//配置起始和结束的offset值(默认)
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]      

Schema信息

读取后的数据的Schema是固定的,包含的列如下:

Column Type 说明
key binary 信息key
value 信息的value(我们自己的数据)
topic string 主题
partition int 分区
offset long 偏移值
timestamp 时间戳
timestampType 类型

source相关的配置

无论是流的形式,还是批的形式,都需要一些必要的参数:

  • kafka.bootstrap.servers kafka的服务器配置,host:post形式,用逗号进行分割,如host1:9000,host2:9000
  • assign,以json的形式指定topic信息
  • subscribe,通过逗号分隔,指定topic信息
  • subscribePattern,通过java的正则指定多个topic

    assign、subscribe、subscribePattern同时之中能使用一个。

其他比较重要的参数有:

    • startingOffsets, offset开始的值,如果是earliest,则从最早的数据开始读;如果是latest,则从最新的数据开始读。默认流是latest,批是earliest
    • endingOffsets,最大的offset,只在批处理的时候设置,如果是latest则为最新的数据
    • failOnDataLoss,在流处理时,当数据丢失时(比如topic被删除了,offset在指定的范围之外),查询是否报错,默认为true。这个功能可以当做是一种告警机制,如果对丢失数据不感兴趣,可以设置为false。在批处理时,这个值总是为true。
    • kafkaConsumer.pollTimeoutMs,excutor连接kafka的超时时间,默认是512ms
    • fetchOffset.numRetries,获取kafka的offset信息时,尝试的次数;默认是3次
    • fetchOffset.retryIntervalMs,尝试重新读取kafka offset信息时等待的时间,默认是10ms
    • maxOffsetsPerTrigger,trigger暂时不会用,不太明白什么意思。Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.

二、输出数据源

目前Structed Streaming有四种方式:

1.File sink。写入到文件中。

2.Foreach sink。对输出的记录进行任意计算。比如保存到mysql中。目前spark不支持直接写入外部数据库,只提供了Foreach接收器自己来实现,而且官网也没有示例代码。

3.Console sink。输出到控制台,仅用于测试。

4.Memory sink。以表的形式输出到内存,spark可以读取内存中的表,仅用于测试。

5.Kafka sink。spark2.2.1更新了kafka sink,所以可以直接使用,如果你的版本低于2.2.1,那就只能使用第二个方法foreach sink来实现。

在配置完输入,并针对DataFrame或者DataSet做了一些操作后,想要把结果保存起来。就可以使用DataSet.writeStream()方法,配置输出需要配置下面的内容:

  • format : 配置输出的格式
  • output mode:输出的格式
  • query name:查询的名称,类似tempview的名字
  • trigger interval:触发的间隔时间,如果前一个batch处理超时了,那么不会立即执行下一个batch,而是等下一个trigger时间在执行。
  • checkpoint location:为保证数据的可靠性,可以设置检查点保存输出的结果。

1. output Mode

只有三种类型

  • complete,把所有的DataFrame的内容输出,这种模式只能在做agg聚合操作的时候使用,比如ds.group.count,之后可以使用它
  • append,普通的dataframe在做完map或者filter之后可以使用。这种模式会把新的batch的数据输出出来,
  • update,把此次新增的数据输出,并更新整个dataframe。有点类似之前的streaming的state处理。

2. 输出的类型

2.1)file:保存成csv或者parquet

DF.writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()      

2.2)console:直接输出到控制台。一般做测试的时候用这个比较方便(测试用)

DF.writeStream
  .format("console")
  .start()      

2.3)memory:可以保存在内容,供后面的代码使用(测试用)

DF.writeStream
  .queryName("aggregates")
  .outputMode("complete")
  .format("memory")
  .start()
spark.sql("select * from aggregates").show()        

2.4) kafka: 输出到kafka, 在spark 2.2.1以前用自定义实现写入。在spark2.2.1后提供了方法。

spark 2.2.1之前写入kafka的方法

自定义一个类KafkaSink继承ForeachWriter

import java.util.Properties
 
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{ForeachWriter, Row}
 
class KafkaSink(topic: String, servers: String) extends ForeachWriter[Row]{
  val kafkaProperties = new Properties()
  kafkaProperties.put("bootstrap.servers", servers)
  kafkaProperties.put("key.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
  kafkaProperties.put("value.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
 
  val results = new scala.collection.mutable.HashMap
  var producer: KafkaProducer[String, String] = _
 
  override def open(partitionId: Long, version: Long): Boolean = {
    producer = new KafkaProducer(kafkaProperties)
    return true
  }
 
  override def process(value: Row): Unit = {
    val word = value.getAs[String]("word")
    val count = value.getAs[String]("count")
    producer.send(new ProducerRecord(topic, word, count))
  }
 
  override def close(errorOrNull: Throwable): Unit = {
    producer.close()
  }
}      

spark 2.2.1以后写入kafka的方法

// spark 2.2.1以后
wordcount.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "wordcount")
.start()      

2.5)foreach:参数是一个foreach的方法,用户可以实现这个方法实现写入mysql自定义的功能。

import java.sql._
 
import org.apache.spark.sql.{ForeachWriter, Row}
 
class JDBCSink(url: String, userName: String, password: String) extends ForeachWriter[Row]{
 
  var statement: Statement = _
  var resultSet: ResultSet = _
  var connection: Connection = _
  // 初始化信息
  override def open(partitionId: Long, version: Long): Boolean = {
    
    Class.forName("com.mysql.jdbc.Driver")
    connection = DriverManager.getConnection(url, userName, password)
    statement = connection.createStatement()
    return true
  }
   // 执行操作
  override def process(value: Row): Unit = {
 
    val word= value.getAs[String]("word")
    val count = value.getAs[Integer]("count")
 
 
    val insertSql = "insert into webCount(word,count)" +
      "values('" + word + "'," + count + ")"
 
    statement.execute(insertSql)
  }
  // 结束操作
  override def close(errorOrNull: Throwable): Unit = {
      connection.close()
  }
}      
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
 
object KafkaStructedStreaming {
 
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[2]").appName("streaming").getOrCreate()
 
    val df = sparkSession
        .readStream
        .format("socket")
        .option("host", "hadoop102")
        .option("port", "9999")
        .load()
 
    import sparkSession.implicits._
    val lines = df.selectExpr("CAST(value as STRING)").as[String]
    val weblog = lines.as[String].flatMap(_.split(" "))
 
    val wordCount = weblog.groupBy("value").count().toDF("word", "count")
 
    val url ="jdbc:mysql://hadoop102:3306/test"
    val username="root"
    val password="000000"
 
    val writer = new JDBCSink(url, username, password)
 
    val query = wordCount.writeStream
        .foreach(writer)
        .outputMode("update")
        .trigger(ProcessingTime("10 seconds"))
        .start()
    query.awaitTermination()
}      

参考原文链接:https://blog.csdn.net/a790439710/article/details/103027602

我不生产知识 我只是知识的搬运工