天天看點

Spark Streaming 讀書筆記

Spark Streaming工作原理:

Spark Streaming接收實時輸入資料流并将資料分成批,然後由Spark引擎處理,以批量生成最終結果流。

Spark Streaming提供了一個高層抽象Dstream,表示連續的資料流。

DStream可以通過Kafka,Flume和Kinesis等來源的輸入資料流建立,也可以通過在其他DStream上應用進階操作來建立

在内部,DStream表示為RDD序列,所有的DStream操作最終都會轉化成為RDD操作

DStream與RDD類似的基礎屬性:

1.依賴其他的DStream清單

2.生成RDD的時間間隔

3.compute計算函數:用于生成RDD,和RDD的compute類似

Spark Streaming的Maven依賴:

org.apache.spark

spark-streaming_2.11

2.3.1

自Kafka,Flume,Kinesis 中擷取資料源(不支援SparkStreaming的核心API),需要添加對應的擴充依賴spark-streaming-xyz_2.11 
           

初始化StreamingContext

StreamingContext 是所有SparkStreaming 程式的入口

    val conf = new SparkConf().setAppName(appName).setMaster(master)
    val ssc = new StreamingContext(conf,Seconds(batch))

    StreamingContext 由SparkConf建立,在非互動式環境下,appName參數是應用程式在叢集UI上顯示的名稱
    master是一個Spark、Mesos或YARN cluster URL,或一個在本地模式下運作的特殊“local[*]”字元串

    批處理間隔必須根據應用程式和可用叢集資源的延遲要求來設定

    在定義上下文之後,
        通過建立輸入DStream來定義輸入源,在使用資料源時,應注意資料源的可靠性,比如Kafka,
            3種輸入DStream:
                1.基本型:由StreamingContext的API直接提供,檔案系統(textFileStream)、Socket、Akka actor等
                    textFileStream接收Hdfs目錄作為參數,spark實時監控該目錄,以為檔案為機關。
                        要求:1.格式相同 2.不支援遞歸讀取 3.檔案以原子方式進入 4.檔案隻會被讀取一次,修改的内容不會被重新讀取
                2.進階型: 依賴額外的庫實作,Kafka,Flume等,在編譯時需要設定依賴,并打包
                3.自定義:使用者自定義并編碼實作
        通過Transformation和output操作應用于DStream來定義流式計算。
            Transformation操作,類似于RDD的Transformation操作
                特殊操作:
                    1.transform:提供了直接操作DStream内部RDD的方法
                    2.updateStateByKey:特有的狀态更新方法,使用該方法必須配置檢查點目錄
            output操作,類似于RDD的Action操作
        開始接收資料并使用它進行處理streamingContext.start()。
        等待處理停止(手動或由于任何錯誤)使用streamingContext.awaitTermination()。
        處理可以使用手動停止streamingContext.stop()
視窗(window)操作:
    任何視窗操作都需要指定兩個參數:1.視窗長度(視窗的持續時間):windowLength 2.滑動間隔(執行視窗操作的時間間隔):slideInterval
    常用視窗操作:
        window(windowLength,slideInterval)
        countByWindow(windowLength,slideInterval)
        reduceByWindow(func,windowLength,slideInterval)
        reduceByKeyAndWindow(func,windowLength,slideInterval,[ numTasks ])
        reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numTasks ):僅适用于可逆減少函數,且必須啟動檢查點才可以使用
        countByValueAndWindow(windowLength, slideInterval,[ numTasks ])
join操作:
    1.stream-stream-join:
        相同批次間隔,stream1産生的RDD可以和stream2産生的RDD實作join,視窗也可以
            val windowedStream1 = stream1.window(Seconds(20))
            val windowedStream2 = stream2.window(Minutes(1))
            val joinedStream = windowedStream1.join(windowedStream2)
    2.stream-dataset joins
        動态地改變加入的資料集。transform是評估每批間隔,是以将使用目前資料集和資料集參考點
            val dataset: RDD[String, String] = ...
            val windowedStream = stream.window(Seconds(20))...
            val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
Output操作:
    将DStream結果輸出至外部系統
        print():列印每個RDD的前num成員,預設num=10
        saveAsTextFiles(prefix,[suffix]):儲存為文本檔案,或者HDFS;格式為prefix-XXXX[.suffix]
        saveAsObjectFiles(prefix,[suffix]):儲存為sequenceFile格式
        saveAsHadoopFiles(prefix, [suffix]):儲存為Hadoop檔案
        foreachRDD(func):傳入一個函數,直接作用在RDD上
            func 是在調用Spark流式計算程式的Driver程序中執行的,但是func一般會調用RDD的Action操作,是在worker中執行的
            使用要求:至少要包含一個RDD的Action調用,因為Spark Streaming的排程是由output方法觸發的,每個周期調用一次所有定義的output方法,output内部調用RDD的Action操作完成計算,否則程式隻會接收資料而不會計算
緩存和持久化:持久化級别:記憶體
    調用persist()可以把DStream及包含的RDD全部緩存到記憶體中
    網絡來源資料(Kafka,Flume等)預設持久化級别是複制到兩個節點,確定容錯
           

Spark Streaming 容錯:

RDD的容錯機制:通過線性鍊條的形式記錄RDD從建立開始,中間每一步的計算過程,錯誤恢複就是重新計算

Spark 通過具有容錯處理的檔案系統實作資料存儲的容錯,但是不适合Spark Streaming,Spark Streaming的資料會預設存儲兩份,在兩個節點上,是以錯誤時會有兩類資料需要恢複:

1.剛收到已經被緩存,但是還沒有複制到其他節點的資料:在資料源重新擷取

2.收到了且已經複制到其他節點的資料:通過備份節點恢複

可能發生的錯誤類型:

worker節點失效:記憶體中所有的資料都會丢失且無法恢複

Driver節點失效:SparkContext失效,Spark Streaming失效,所有附屬執行節點都會退出,記憶體資料全部丢失

容錯保障:通過資料計算次數定義:

至多一次

到少一次

精準一次:最佳容錯保障

資料接收----> 資料計算 ----> 結果輸出
資料接收容錯:
    不同資料源提供不同的容錯保障:HDFS等自帶容錯的檔案系統:精準一次
                                  1.3以後的Kafka Direct API:精準一次
                                  其他的場景:視接收機制是否有确認機制和是否開啟WAL而不同
資料計算:transformation計算:因為RDD的容錯,可以實作精準一次
output容錯:output本身提供至少一次的容錯保障,當輸出為檔案時,重複資料會覆寫,可以實作精準一次,其他場景,需要使用額外方法實作精準一次:事務更新:更新時帶上事務資訊,確定更新隻進行一次,避免重複,實作精準一次
檢查點:
    調用有狀态的Transformation操作必須啟動checkpoint功能,updateStateByKey或者reduceByKeyAndWindow
    Driver程式自動重新開機時也使用檢查點,啟動後建立streamingContext時檢查下檢查點目錄,存在則恢複,不存在則建立
        val ssc = StreamingContext.getOrCreate

    使用檢查點會降低吞吐量,增加批次的計算時間
           

性能調優:

1.減少批處理的時間:資料接收的并發量:分成多個Input DStream,然後再聚合;資料計算的并發量:調整spark.default.parallelism參數; 資料序列化

2.合理的批次間隔時間

3.記憶體調優:DStream持久化級别;清理曆史資料;CMS垃圾回收

繼續閱讀