一、输入数据源
1. 文件输入数据源(FIie)
file数据源提供了很多种内置的格式,如csv、parquet、orc、json等等,就以csv为例:
import spark.implicits._
val userSchema = new StructType()
.add("name", "string").add("age", "integer")
val lines = spark.readStream
.option("sep", ";")
.schema(userSchema)
.csv("file:///data/*")
val query = lines.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
在对应的目录下新建文件时,就可以在控制台看到对应的数据了。
还有一些其他可以控制的参数:
maxFilesPerTrigger | 每个batch最多的文件数,默认是没有限制。比如我设置了这个值为1,那么同时增加了5个文件,这5个文件会每个文件作为一波数据,更新streaming dataframe。 |
latestFirst | 是否优先处理最新的文件,默认是false。如果设置为true,那么最近被更新的会优先处理。这种场景一般是在监听日志文件的时候使用。 |
fileNameOnly | 是否只监听固定名称的文件 |
2.网络输入数据源(socket)
一般都是基于这个socket来做测试。首先开启一个socket服务器(
nc -lk 9999
)
,然后streaming这边连接进行处理。
spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
3. 输入数据源(kafka)
// Subscribe to 1 topic
val df= spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribe","topic1")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]
// Subscribe to multiple topics
val df= spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribe","topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]
// Subscribe to a pattern
val df= spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribePattern","topic.*")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]
以批的形式查询
关于Kafka的offset,structured streaming默认提供了几种方式:
//设置每个分区的起始和结束值
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
//配置起始和结束的offset值(默认)
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
Schema信息
读取后的数据的Schema是固定的,包含的列如下:
Column | Type | 说明 |
key | binary | 信息key |
value | 信息的value(我们自己的数据) | |
topic | string | 主题 |
partition | int | 分区 |
offset | long | 偏移值 |
timestamp | 时间戳 | |
timestampType | 类型 |
source相关的配置
无论是流的形式,还是批的形式,都需要一些必要的参数:
- kafka.bootstrap.servers kafka的服务器配置,host:post形式,用逗号进行分割,如host1:9000,host2:9000
- assign,以json的形式指定topic信息
- subscribe,通过逗号分隔,指定topic信息
-
subscribePattern,通过java的正则指定多个topic
assign、subscribe、subscribePattern同时之中能使用一个。
其他比较重要的参数有:
- startingOffsets, offset开始的值,如果是earliest,则从最早的数据开始读;如果是latest,则从最新的数据开始读。默认流是latest,批是earliest
- endingOffsets,最大的offset,只在批处理的时候设置,如果是latest则为最新的数据
- failOnDataLoss,在流处理时,当数据丢失时(比如topic被删除了,offset在指定的范围之外),查询是否报错,默认为true。这个功能可以当做是一种告警机制,如果对丢失数据不感兴趣,可以设置为false。在批处理时,这个值总是为true。
- kafkaConsumer.pollTimeoutMs,excutor连接kafka的超时时间,默认是512ms
- fetchOffset.numRetries,获取kafka的offset信息时,尝试的次数;默认是3次
- fetchOffset.retryIntervalMs,尝试重新读取kafka offset信息时等待的时间,默认是10ms
- maxOffsetsPerTrigger,trigger暂时不会用,不太明白什么意思。Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.
二、输出数据源
目前Structed Streaming有四种方式:
1.File sink。写入到文件中。
2.Foreach sink。对输出的记录进行任意计算。比如保存到mysql中。目前spark不支持直接写入外部数据库,只提供了Foreach接收器自己来实现,而且官网也没有示例代码。
3.Console sink。输出到控制台,仅用于测试。
4.Memory sink。以表的形式输出到内存,spark可以读取内存中的表,仅用于测试。
5.Kafka sink。spark2.2.1更新了kafka sink,所以可以直接使用,如果你的版本低于2.2.1,那就只能使用第二个方法foreach sink来实现。
在配置完输入,并针对DataFrame或者DataSet做了一些操作后,想要把结果保存起来。就可以使用DataSet.writeStream()方法,配置输出需要配置下面的内容:
- format : 配置输出的格式
- output mode:输出的格式
- query name:查询的名称,类似tempview的名字
- trigger interval:触发的间隔时间,如果前一个batch处理超时了,那么不会立即执行下一个batch,而是等下一个trigger时间在执行。
- checkpoint location:为保证数据的可靠性,可以设置检查点保存输出的结果。
1. output Mode
只有三种类型
- complete,把所有的DataFrame的内容输出,这种模式只能在做agg聚合操作的时候使用,比如ds.group.count,之后可以使用它
- append,普通的dataframe在做完map或者filter之后可以使用。这种模式会把新的batch的数据输出出来,
- update,把此次新增的数据输出,并更新整个dataframe。有点类似之前的streaming的state处理。
2. 输出的类型
2.1)file:保存成csv或者parquet
DF.writeStream
.format("parquet")
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start()
2.2)console:直接输出到控制台。一般做测试的时候用这个比较方便(测试用)
DF.writeStream
.format("console")
.start()
2.3)memory:可以保存在内容,供后面的代码使用(测试用)
DF.writeStream
.queryName("aggregates")
.outputMode("complete")
.format("memory")
.start()
spark.sql("select * from aggregates").show()
2.4) kafka: 输出到kafka, 在spark 2.2.1以前用自定义实现写入。在spark2.2.1后提供了方法。
spark 2.2.1之前写入kafka的方法
自定义一个类KafkaSink继承ForeachWriter
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{ForeachWriter, Row}
class KafkaSink(topic: String, servers: String) extends ForeachWriter[Row]{
val kafkaProperties = new Properties()
kafkaProperties.put("bootstrap.servers", servers)
kafkaProperties.put("key.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
kafkaProperties.put("value.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
val results = new scala.collection.mutable.HashMap
var producer: KafkaProducer[String, String] = _
override def open(partitionId: Long, version: Long): Boolean = {
producer = new KafkaProducer(kafkaProperties)
return true
}
override def process(value: Row): Unit = {
val word = value.getAs[String]("word")
val count = value.getAs[String]("count")
producer.send(new ProducerRecord(topic, word, count))
}
override def close(errorOrNull: Throwable): Unit = {
producer.close()
}
}
spark 2.2.1以后写入kafka的方法
// spark 2.2.1以后
wordcount.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "wordcount")
.start()
2.5)foreach:参数是一个foreach的方法,用户可以实现这个方法实现写入mysql自定义的功能。
import java.sql._
import org.apache.spark.sql.{ForeachWriter, Row}
class JDBCSink(url: String, userName: String, password: String) extends ForeachWriter[Row]{
var statement: Statement = _
var resultSet: ResultSet = _
var connection: Connection = _
// 初始化信息
override def open(partitionId: Long, version: Long): Boolean = {
Class.forName("com.mysql.jdbc.Driver")
connection = DriverManager.getConnection(url, userName, password)
statement = connection.createStatement()
return true
}
// 执行操作
override def process(value: Row): Unit = {
val word= value.getAs[String]("word")
val count = value.getAs[Integer]("count")
val insertSql = "insert into webCount(word,count)" +
"values('" + word + "'," + count + ")"
statement.execute(insertSql)
}
// 结束操作
override def close(errorOrNull: Throwable): Unit = {
connection.close()
}
}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
object KafkaStructedStreaming {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().master("local[2]").appName("streaming").getOrCreate()
val df = sparkSession
.readStream
.format("socket")
.option("host", "hadoop102")
.option("port", "9999")
.load()
import sparkSession.implicits._
val lines = df.selectExpr("CAST(value as STRING)").as[String]
val weblog = lines.as[String].flatMap(_.split(" "))
val wordCount = weblog.groupBy("value").count().toDF("word", "count")
val url ="jdbc:mysql://hadoop102:3306/test"
val username="root"
val password="000000"
val writer = new JDBCSink(url, username, password)
val query = wordCount.writeStream
.foreach(writer)
.outputMode("update")
.trigger(ProcessingTime("10 seconds"))
.start()
query.awaitTermination()
}
参考原文链接:https://blog.csdn.net/a790439710/article/details/103027602
我不生产知识 我只是知识的搬运工