Spark Streaming是构建在Spark上的实时流计算框架,可以进行实时流数据处理。本文简要介绍了Spark Streaming的基本概念和基本算子的使用。
1、Spark Streaming介绍
Spark是一个类似于MapReduce的分布式计算框架,其核心是弹性分布式数据集,提供了比MapReduce更丰富的模型,可以在快速在内存中对数据集进行多次迭代,以支持复杂的数据挖掘算法和图形计算算法。Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。Spark Streaming的优势在于:
- 能运行在100+的结点上,并达到秒级延迟。
- 使用基于内存的Spark作为执行引擎,具有高效和容错的特性。
- 能集成Spark的批处理和交互查询。
- 为实现复杂的算法提供和批处理类似的简单接口。
Spark Streaming使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。Spark Streaming从各种输入源中读取数据,并把数据分组为小的批次,新的批次按均匀的时间间隔创建出来。在每个时间区间开始的时候,一个新的批次就创建出来,在该区间内收到的数据都会被添加到这个批次中。在时间区间结束时,批次停止增长。时间区间的大小是由批次间隔这个参数决定的,批次间隔一般设在500毫秒到几秒之间,由应用开发者配置。每个输入批次都形成一个RDD,以Spark作业的方式处理并生成其他的RDD。处理的结果可以以批处理的方式传给外部系统。
从原理上看,把传统的spark批处理程序变成streaming程序,spark需要构建四个内容:
- 一个静态的RDD DAG的模板,来表示处理逻辑;
- 一个动态的工作控制器,将连续的streaming data切分数据片段,并按照模板复制出新的RDD DAG的实例,对数据片段进行处理;
- Receiver进行原始数据的产生和导入;Receiver将接收到的数据合并为数据块并存到内存或硬盘中,供后续batch RDD进行消费;
- 对长时运行任务的保障,包括输入数据的失效后的重构,处理任务的失败后的重调。
下图提供了Spark的driver、workers、流数据源和目标之间的数据流:
从Spark Streaming Context开始,由图中的ssc.start()表示:
- 当Spark Streaming Context启动时,driver分配给executor(即workers)一个长期任务
- Executor(本图中的Executor1)上的接收者从流式源接收数据流。随着输入的数据流,接收器将流分成块并将这些块保存在内存中
- 这些数据块也被复制到另一个Executor,以避免数据丢失
- 数据块的 ID 信息 被传送到 driver 中的 Block Management Master上
- 对于Spark Streaming Context中配置的每个批处理间隔(通常这是每1秒),driver将启动Spark任务处理块。然后将这些块保存到任意数量的目标数据存储中,包括云存储(例如S3/WASB等),关系数据存储(例如MySQL,PostgreSQL等),以及NoSQL存储
2、Sample程序-Wordcount
1)Import StreamingContext,创建本地StreamingContext,包括2个线程,批量间隔为1s
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
2)创建DStream,TCP源数据作为数据流,指定hostname和port
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
3)对DStream中的记录进行分词处理
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
- flatMap:一对多的DStream操作,从源DStream记录中分成多个DStream记录
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
- map(lambda word: (word, 1)):生成(word, 1)的DStream对
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
4)开始处理操作
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
5)开两个终端,其中一个输入网络命令输入词组:
[[email protected] ipynotebook]# nc -lk 9999
6)另一个终端执行脚本如下:
[[email protected] spark-2.3.0]# spark-submit /usr/local/spark/ipynotebook/02-Streaming-01.py localhost 9999
7)输出结果如下:
3、Spark Streaming操作
3.1 初始化StreamingContext
从SparkContext中创建StreamingContext:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
- Master是Spark、Mesos或YARN或“local[*]”运行在local mode
- Batch interval是根据应用的延迟设置的
3.2 Input DStream和Receivers
Spark Streaming有两种内置的Streaming源:
- Basic source:StreamingContext API可用的源,比如文件系统、socket连接
- Advanced source:比如kafka、flume等
1)File system:HDFS API兼容的任何文件系统,通过以下方式创建DStream:
StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]
注:Python API中不支持filesystem,只支持textFileStream
streamingContext.textFileStream(dataDirectory)
2)Advanced source:需要和外部的non-spark库进行交互,其中像kafka和flume有很复杂的依赖包,如果希望在Spark Shell中使用这些源,则需要将依赖包添加到classpath
3.3 DStream的转换操作
- UpdateStateByKey操作
UpdateStateByKey允许维护任意一种状态,同时不断更新新的信息,需要完成两步:
- 定义状态,状态可以是任意的数据类型
- 定义状态更新函数,指定函数使用之前的状态和新输入的值更新状态
def updateFunction(newValues, runningCount):
if runningCount is None:
runningCount = 0
return sum(newValues, runningCount) # add the new values with the previous running count to get the new count
runningCounts = pairs.updateStateByKey(updateFunction)
- Transform操作
允许任意的RDD-RDD函数应用于DStream,例如,可以通过将输入数据流与预先计算的垃圾信息(也可以使用Spark生成)进行实时数据清理,然后基于此进行过滤。
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
- Window操作
Spark Streaming支持窗口计算,这样允许在滑动创建进行转换,如下图所示:
当窗口滑过源DStream,源RDDs被组合产生窗口DStream的RDDs。Windows需要2个参数:
- 窗口长度:窗口持续的时间
- 滑动间隔:窗口操作的间隔
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
- Join操作
1)Stream-stream join
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
2)Stream-datasets join
dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
3.4 DStream的Output操作
- foreachRDD设计模式
最通用的输出操作,将函数func应用于从流中生成的每个RDD。此功能应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。注意,函数func在流应用程序的驱动程序进程中执行,并且通常会在其中执行RDD操作,强制执行流RDD的计算。通过维护一个静态的对象连接池,在多个RDDs/batches之间重用连接,降低消耗:
def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
注:连接池中的连接按需创建,并且一段时间不使用时会超时,实现了将数据发送到外部系统的最有效方法。
3.5 DataFrame和SQL操作
Streaming data也可以使用DataFrame和SQL操作,首先需要通过StreamingContext正在使用的SparkContext创建SparkSession,只有这样在driver failure时候才能完成重启。
# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
globals()["sparkSessionSingletonInstance"] = SparkSession \
.builder \
.config(conf=sparkConf) \
.getOrCreate()
return globals()["sparkSessionSingletonInstance"]
...
# DataFrame operations inside your streaming program
words = ... # DStream of strings
def process(time, rdd):
print("========= %s =========" % str(time))
try:
# Get the singleton instance of SparkSession
spark = getSparkSessionInstance(rdd.context.getConf())
# Convert RDD[String] to RDD[Row] to DataFrame
rowRdd = rdd.map(lambda w: Row(word=w))
wordsDataFrame = spark.createDataFrame(rowRdd)
# Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")
# Do word count on table using SQL and print it
wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
except:
pass
words.foreachRDD(process)
3.6 Checkpoint机制
Streaming应用为了保证7x24运行,需要对与程序逻辑无关的故障具有恢复能力。为了实现这种可能,Spark Streaming需要对一个fault-tolerant系统checkpoint足够多的信息,这样可以从故障中进行恢复。有两种类型的数据可以checkpoint:
- MetaData checkpointing:将流式计算的定义信息保存在容错存储系统如HDFS中,用于从故障节点进行恢复,metadata包括
- Configuration:用来创建Streaming应用
- DStream操作:Streaming应用中定义的DStream操作集合
- Incomplete batches:在队列中尚未完成的batches
- Data checkpointing:将生成的RDDs保存在可靠的存储系统中,这对于将多个批次的数据组合在一起的有状态转换是很有必要的。在这样的转换中,生成的RDD依赖于之前批次的RDD,导致依赖链的长度随着时间的推移而不断增加。为了避免恢复时间的这种无限增长(与依赖链成比例),有状态转换的中间RDD被周期性地检查为可靠存储(如HDFS)以切断依赖链
总而言之,metadata checkpointing主要用于从驱动程序故障中恢复,而数据或RDD checkpointing对于使用有状态转换的基本功能是必须的。
- Checkpointing的时间
当遇到以下情形时,需要使用到checkpoint功能:
- 使用了有状态的转换操作,应用中使用到updateStateByKey或reduceByKeyAndWindow,需要提供checkpoint目录以备周期性的checkpoint动作
-
从驱动故障中恢复应用,meta checkpoint用于恢复运行信息
注:没有上述状态转换的简单streaming流应用程序可以启用checkpointing
- 配置Checkpoint
Checkpointing通过在容错和高可靠性的文件系统中设置目录进行配置,用于保存checkpoint信息。这个功能使用streamingContext.checkpoint(checkpointDirectory)来实现,如下所示:
# Function to create and setup a new StreamingContext
def functionToCreateContext():
sc = SparkContext(...) # new context
ssc = StreamingContext(...)
lines = ssc.socketTextStream(...) # create DStreams
...
ssc.checkpoint(checkpointDirectory) # set checkpoint directory
return ssc
# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...
# Start the context
context.start()
context.awaitTermination()
- 如果checkpointDirectory已经存在,context会从checkpoint data中重建
- 如果checkpointDirectory不存在,会调用functionToCreateContext新建一个context
参考资料
- http://spark.apache.org/docs/latest/streaming-programming-guide.html
- https://github.com/lw-lin/CoolplaySpark/tree/
- https://github.com/apache/spark/tree/v2.3.1/examples/src/main/python/streaming
转载请注明原文地址:https://blog.csdn.net/solihawk/article/details/116420919
文章会同步在公众号“牧羊人的方向”更新,感兴趣的可以关注公众号,谢谢!