天天看点

大数据系列之Spark Streaming

Spark Streaming是构建在Spark上的实时流计算框架,可以进行实时流数据处理。本文简要介绍了Spark Streaming的基本概念和基本算子的使用。

1、Spark Streaming介绍

Spark是一个类似于MapReduce的分布式计算框架,其核心是弹性分布式数据集,提供了比MapReduce更丰富的模型,可以在快速在内存中对数据集进行多次迭代,以支持复杂的数据挖掘算法和图形计算算法。Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。Spark Streaming的优势在于:

  1. 能运行在100+的结点上,并达到秒级延迟。
  2. 使用基于内存的Spark作为执行引擎,具有高效和容错的特性。
  3. 能集成Spark的批处理和交互查询。
  4. 为实现复杂的算法提供和批处理类似的简单接口。
    大数据系列之Spark Streaming

Spark Streaming使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。Spark Streaming从各种输入源中读取数据,并把数据分组为小的批次,新的批次按均匀的时间间隔创建出来。在每个时间区间开始的时候,一个新的批次就创建出来,在该区间内收到的数据都会被添加到这个批次中。在时间区间结束时,批次停止增长。时间区间的大小是由批次间隔这个参数决定的,批次间隔一般设在500毫秒到几秒之间,由应用开发者配置。每个输入批次都形成一个RDD,以Spark作业的方式处理并生成其他的RDD。处理的结果可以以批处理的方式传给外部系统。

大数据系列之Spark Streaming

从原理上看,把传统的spark批处理程序变成streaming程序,spark需要构建四个内容:

  1. 一个静态的RDD DAG的模板,来表示处理逻辑;
  2. 一个动态的工作控制器,将连续的streaming data切分数据片段,并按照模板复制出新的RDD DAG的实例,对数据片段进行处理;
  3. Receiver进行原始数据的产生和导入;Receiver将接收到的数据合并为数据块并存到内存或硬盘中,供后续batch RDD进行消费;
  4. 对长时运行任务的保障,包括输入数据的失效后的重构,处理任务的失败后的重调。
大数据系列之Spark Streaming

下图提供了Spark的driver、workers、流数据源和目标之间的数据流:

大数据系列之Spark Streaming

从Spark Streaming Context开始,由图中的ssc.start()表示:

  1. 当Spark Streaming Context启动时,driver分配给executor(即workers)一个长期任务
  2. Executor(本图中的Executor1)上的接收者从流式源接收数据流。随着输入的数据流,接收器将流分成块并将这些块保存在内存中
  3. 这些数据块也被复制到另一个Executor,以避免数据丢失
  4. 数据块的 ID 信息 被传送到 driver 中的 Block Management Master上
  5. 对于Spark Streaming Context中配置的每个批处理间隔(通常这是每1秒),driver将启动Spark任务处理块。然后将这些块保存到任意数量的目标数据存储中,包括云存储(例如S3/WASB等),关系数据存储(例如MySQL,PostgreSQL等),以及NoSQL存储

2、Sample程序-Wordcount

1)Import StreamingContext,创建本地StreamingContext,包括2个线程,批量间隔为1s

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
           

2)创建DStream,TCP源数据作为数据流,指定hostname和port

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
           

3)对DStream中的记录进行分词处理

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
- flatMap:一对多的DStream操作,从源DStream记录中分成多个DStream记录

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
- map(lambda word: (word, 1)):生成(word, 1)的DStream对

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
           

4)开始处理操作

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
           

5)开两个终端,其中一个输入网络命令输入词组:

[[email protected] ipynotebook]# nc -lk 9999
           

6)另一个终端执行脚本如下:

[[email protected] spark-2.3.0]# spark-submit /usr/local/spark/ipynotebook/02-Streaming-01.py localhost 9999
           

7)输出结果如下:

大数据系列之Spark Streaming

3、Spark Streaming操作

3.1 初始化StreamingContext

从SparkContext中创建StreamingContext:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
           
  • Master是Spark、Mesos或YARN或“local[*]”运行在local mode
  • Batch interval是根据应用的延迟设置的
3.2 Input DStream和Receivers

Spark Streaming有两种内置的Streaming源:

  • Basic source:StreamingContext API可用的源,比如文件系统、socket连接
  • Advanced source:比如kafka、flume等

1)File system:HDFS API兼容的任何文件系统,通过以下方式创建DStream:

StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]
           

注:Python API中不支持filesystem,只支持textFileStream

streamingContext.textFileStream(dataDirectory)
           

2)Advanced source:需要和外部的non-spark库进行交互,其中像kafka和flume有很复杂的依赖包,如果希望在Spark Shell中使用这些源,则需要将依赖包添加到classpath

大数据系列之Spark Streaming
3.3 DStream的转换操作
  • UpdateStateByKey操作

UpdateStateByKey允许维护任意一种状态,同时不断更新新的信息,需要完成两步:

  1. 定义状态,状态可以是任意的数据类型
  2. 定义状态更新函数,指定函数使用之前的状态和新输入的值更新状态
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count

runningCounts = pairs.updateStateByKey(updateFunction)
           
  • Transform操作

允许任意的RDD-RDD函数应用于DStream,例如,可以通过将输入数据流与预先计算的垃圾信息(也可以使用Spark生成)进行实时数据清理,然后基于此进行过滤。

spamInfoRDD = sc.pickleFile(...)  # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
           
  • Window操作

Spark Streaming支持窗口计算,这样允许在滑动创建进行转换,如下图所示:

大数据系列之Spark Streaming

当窗口滑过源DStream,源RDDs被组合产生窗口DStream的RDDs。Windows需要2个参数:

  1. 窗口长度:窗口持续的时间
  2. 滑动间隔:窗口操作的间隔
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
           
  • Join操作

1)Stream-stream join

stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
           

2)Stream-datasets join

dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
           
3.4 DStream的Output操作
  • foreachRDD设计模式

最通用的输出操作,将函数func应用于从流中生成的每个RDD。此功能应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。注意,函数func在流应用程序的驱动程序进程中执行,并且通常会在其中执行RDD操作,强制执行流RDD的计算。通过维护一个静态的对象连接池,在多个RDDs/batches之间重用连接,降低消耗:

def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
           

注:连接池中的连接按需创建,并且一段时间不使用时会超时,实现了将数据发送到外部系统的最有效方法。

3.5 DataFrame和SQL操作

Streaming data也可以使用DataFrame和SQL操作,首先需要通过StreamingContext正在使用的SparkContext创建SparkSession,只有这样在driver failure时候才能完成重启。

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

...

# DataFrame operations inside your streaming program

words = ... # DStream of strings

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass

words.foreachRDD(process)
           
3.6 Checkpoint机制

Streaming应用为了保证7x24运行,需要对与程序逻辑无关的故障具有恢复能力。为了实现这种可能,Spark Streaming需要对一个fault-tolerant系统checkpoint足够多的信息,这样可以从故障中进行恢复。有两种类型的数据可以checkpoint:

  1. MetaData checkpointing:将流式计算的定义信息保存在容错存储系统如HDFS中,用于从故障节点进行恢复,metadata包括
    1. Configuration:用来创建Streaming应用
    2. DStream操作:Streaming应用中定义的DStream操作集合
    3. Incomplete batches:在队列中尚未完成的batches
  2. Data checkpointing:将生成的RDDs保存在可靠的存储系统中,这对于将多个批次的数据组合在一起的有状态转换是很有必要的。在这样的转换中,生成的RDD依赖于之前批次的RDD,导致依赖链的长度随着时间的推移而不断增加。为了避免恢复时间的这种无限增长(与依赖链成比例),有状态转换的中间RDD被周期性地检查为可靠存储(如HDFS)以切断依赖链

总而言之,metadata checkpointing主要用于从驱动程序故障中恢复,而数据或RDD checkpointing对于使用有状态转换的基本功能是必须的。

  • Checkpointing的时间

当遇到以下情形时,需要使用到checkpoint功能:

  1. 使用了有状态的转换操作,应用中使用到updateStateByKey或reduceByKeyAndWindow,需要提供checkpoint目录以备周期性的checkpoint动作
  2. 从驱动故障中恢复应用,meta checkpoint用于恢复运行信息

    注:没有上述状态转换的简单streaming流应用程序可以启用checkpointing

  • 配置Checkpoint

Checkpointing通过在容错和高可靠性的文件系统中设置目录进行配置,用于保存checkpoint信息。这个功能使用streamingContext.checkpoint(checkpointDirectory)来实现,如下所示:

# Function to create and setup a new StreamingContext
def functionToCreateContext():
    sc = SparkContext(...)  # new context
    ssc = StreamingContext(...)
    lines = ssc.socketTextStream(...)  # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
    return ssc

# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...

# Start the context
context.start()
context.awaitTermination()
           
  • 如果checkpointDirectory已经存在,context会从checkpoint data中重建
  • 如果checkpointDirectory不存在,会调用functionToCreateContext新建一个context
参考资料
  1. http://spark.apache.org/docs/latest/streaming-programming-guide.html
  2. https://github.com/lw-lin/CoolplaySpark/tree/
  3. https://github.com/apache/spark/tree/v2.3.1/examples/src/main/python/streaming

转载请注明原文地址:https://blog.csdn.net/solihawk/article/details/116420919

文章会同步在公众号“牧羊人的方向”更新,感兴趣的可以关注公众号,谢谢!

大数据系列之Spark Streaming

继续阅读