天天看点

Spark Streaming 读书笔记

Spark Streaming工作原理:

Spark Streaming接收实时输入数据流并将数据分成批,然后由Spark引擎处理,以批量生成最终结果流。

Spark Streaming提供了一个高层抽象Dstream,表示连续的数据流。

DStream可以通过Kafka,Flume和Kinesis等来源的输入数据流创建,也可以通过在其他DStream上应用高级操作来创建

在内部,DStream表示为RDD序列,所有的DStream操作最终都会转化成为RDD操作

DStream与RDD类似的基础属性:

1.依赖其他的DStream列表

2.生成RDD的时间间隔

3.compute计算函数:用于生成RDD,和RDD的compute类似

Spark Streaming的Maven依赖:

org.apache.spark

spark-streaming_2.11

2.3.1

自Kafka,Flume,Kinesis 中获取数据源(不支持SparkStreaming的核心API),需要添加对应的扩展依赖spark-streaming-xyz_2.11 
           

初始化StreamingContext

StreamingContext 是所有SparkStreaming 程序的入口

    val conf = new SparkConf().setAppName(appName).setMaster(master)
    val ssc = new StreamingContext(conf,Seconds(batch))

    StreamingContext 由SparkConf创建,在非交互式环境下,appName参数是应用程序在集群UI上显示的名称
    master是一个Spark、Mesos或YARN cluster URL,或一个在本地模式下运行的特殊“local[*]”字符串

    批处理间隔必须根据应用程序和可用集群资源的延迟要求来设置

    在定义上下文之后,
        通过创建输入DStream来定义输入源,在使用数据源时,应注意数据源的可靠性,比如Kafka,
            3种输入DStream:
                1.基本型:由StreamingContext的API直接提供,文件系统(textFileStream)、Socket、Akka actor等
                    textFileStream接收Hdfs目录作为参数,spark实时监控该目录,以为文件为单位。
                        要求:1.格式相同 2.不支持递归读取 3.文件以原子方式进入 4.文件只会被读取一次,修改的内容不会被重新读取
                2.高级型: 依赖额外的库实现,Kafka,Flume等,在编译时需要设置依赖,并打包
                3.自定义:用户自定义并编码实现
        通过Transformation和output操作应用于DStream来定义流式计算。
            Transformation操作,类似于RDD的Transformation操作
                特殊操作:
                    1.transform:提供了直接操作DStream内部RDD的方法
                    2.updateStateByKey:特有的状态更新方法,使用该方法必须配置检查点目录
            output操作,类似于RDD的Action操作
        开始接收数据并使用它进行处理streamingContext.start()。
        等待处理停止(手动或由于任何错误)使用streamingContext.awaitTermination()。
        处理可以使用手动停止streamingContext.stop()
窗口(window)操作:
    任何窗口操作都需要指定两个参数:1.窗口长度(窗口的持续时间):windowLength 2.滑动间隔(执行窗口操作的时间间隔):slideInterval
    常用窗口操作:
        window(windowLength,slideInterval)
        countByWindow(windowLength,slideInterval)
        reduceByWindow(func,windowLength,slideInterval)
        reduceByKeyAndWindow(func,windowLength,slideInterval,[ numTasks ])
        reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numTasks ):仅适用于可逆减少函数,且必须启动检查点才可以使用
        countByValueAndWindow(windowLength, slideInterval,[ numTasks ])
join操作:
    1.stream-stream-join:
        相同批次间隔,stream1产生的RDD可以和stream2产生的RDD实现join,窗口也可以
            val windowedStream1 = stream1.window(Seconds(20))
            val windowedStream2 = stream2.window(Minutes(1))
            val joinedStream = windowedStream1.join(windowedStream2)
    2.stream-dataset joins
        动态地改变加入的数据集。transform是评估每批间隔,因此将使用当前数据集和数据集参考点
            val dataset: RDD[String, String] = ...
            val windowedStream = stream.window(Seconds(20))...
            val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
Output操作:
    将DStream结果输出至外部系统
        print():打印每个RDD的前num成员,默认num=10
        saveAsTextFiles(prefix,[suffix]):保存为文本文件,或者HDFS;格式为prefix-XXXX[.suffix]
        saveAsObjectFiles(prefix,[suffix]):保存为sequenceFile格式
        saveAsHadoopFiles(prefix, [suffix]):保存为Hadoop文件
        foreachRDD(func):传入一个函数,直接作用在RDD上
            func 是在调用Spark流式计算程序的Driver进程中执行的,但是func一般会调用RDD的Action操作,是在worker中执行的
            使用要求:至少要包含一个RDD的Action调用,因为Spark Streaming的调度是由output方法触发的,每个周期调用一次所有定义的output方法,output内部调用RDD的Action操作完成计算,否则程序只会接收数据而不会计算
缓存和持久化:持久化级别:内存
    调用persist()可以把DStream及包含的RDD全部缓存到内存中
    网络来源数据(Kafka,Flume等)默认持久化级别是复制到两个节点,确保容错
           

Spark Streaming 容错:

RDD的容错机制:通过线性链条的形式记录RDD从创建开始,中间每一步的计算过程,错误恢复就是重新计算

Spark 通过具有容错处理的文件系统实现数据存储的容错,但是不适合Spark Streaming,Spark Streaming的数据会默认存储两份,在两个节点上,所以错误时会有两类数据需要恢复:

1.刚收到已经被缓存,但是还没有复制到其他节点的数据:在数据源重新获取

2.收到了且已经复制到其他节点的数据:通过备份节点恢复

可能发生的错误类型:

worker节点失效:内存中所有的数据都会丢失且无法恢复

Driver节点失效:SparkContext失效,Spark Streaming失效,所有附属执行节点都会退出,内存数据全部丢失

容错保障:通过数据计算次数定义:

至多一次

到少一次

精准一次:最佳容错保障

数据接收----> 数据计算 ----> 结果输出
数据接收容错:
    不同数据源提供不同的容错保障:HDFS等自带容错的文件系统:精准一次
                                  1.3以后的Kafka Direct API:精准一次
                                  其他的场景:视接收机制是否有确认机制和是否开启WAL而不同
数据计算:transformation计算:因为RDD的容错,可以实现精准一次
output容错:output本身提供至少一次的容错保障,当输出为文件时,重复数据会覆盖,可以实现精准一次,其他场景,需要使用额外方法实现精准一次:事务更新:更新时带上事务信息,确保更新只进行一次,避免重复,实现精准一次
检查点:
    调用有状态的Transformation操作必须启动checkpoint功能,updateStateByKey或者reduceByKeyAndWindow
    Driver程序自动重启时也使用检查点,启动后创建streamingContext时检查下检查点目录,存在则恢复,不存在则新建
        val ssc = StreamingContext.getOrCreate

    使用检查点会降低吞吐量,增加批次的计算时间
           

性能调优:

1.减少批处理的时间:数据接收的并发量:分成多个Input DStream,然后再聚合;数据计算的并发量:调整spark.default.parallelism参数; 数据序列化

2.合理的批次间隔时间

3.内存调优:DStream持久化级别;清理历史数据;CMS垃圾回收

继续阅读