Spark Streaming工作原理:
Spark Streaming接收實時輸入資料流并将資料分成批,然後由Spark引擎處理,以批量生成最終結果流。
Spark Streaming提供了一個高層抽象Dstream,表示連續的資料流。
DStream可以通過Kafka,Flume和Kinesis等來源的輸入資料流建立,也可以通過在其他DStream上應用進階操作來建立
在内部,DStream表示為RDD序列,所有的DStream操作最終都會轉化成為RDD操作
DStream與RDD類似的基礎屬性:
1.依賴其他的DStream清單
2.生成RDD的時間間隔
3.compute計算函數:用于生成RDD,和RDD的compute類似
Spark Streaming的Maven依賴:
org.apache.spark
spark-streaming_2.11
2.3.1
自Kafka,Flume,Kinesis 中擷取資料源(不支援SparkStreaming的核心API),需要添加對應的擴充依賴spark-streaming-xyz_2.11
初始化StreamingContext
StreamingContext 是所有SparkStreaming 程式的入口
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf,Seconds(batch))
StreamingContext 由SparkConf建立,在非互動式環境下,appName參數是應用程式在叢集UI上顯示的名稱
master是一個Spark、Mesos或YARN cluster URL,或一個在本地模式下運作的特殊“local[*]”字元串
批處理間隔必須根據應用程式和可用叢集資源的延遲要求來設定
在定義上下文之後,
通過建立輸入DStream來定義輸入源,在使用資料源時,應注意資料源的可靠性,比如Kafka,
3種輸入DStream:
1.基本型:由StreamingContext的API直接提供,檔案系統(textFileStream)、Socket、Akka actor等
textFileStream接收Hdfs目錄作為參數,spark實時監控該目錄,以為檔案為機關。
要求:1.格式相同 2.不支援遞歸讀取 3.檔案以原子方式進入 4.檔案隻會被讀取一次,修改的内容不會被重新讀取
2.進階型: 依賴額外的庫實作,Kafka,Flume等,在編譯時需要設定依賴,并打包
3.自定義:使用者自定義并編碼實作
通過Transformation和output操作應用于DStream來定義流式計算。
Transformation操作,類似于RDD的Transformation操作
特殊操作:
1.transform:提供了直接操作DStream内部RDD的方法
2.updateStateByKey:特有的狀态更新方法,使用該方法必須配置檢查點目錄
output操作,類似于RDD的Action操作
開始接收資料并使用它進行處理streamingContext.start()。
等待處理停止(手動或由于任何錯誤)使用streamingContext.awaitTermination()。
處理可以使用手動停止streamingContext.stop()
視窗(window)操作:
任何視窗操作都需要指定兩個參數:1.視窗長度(視窗的持續時間):windowLength 2.滑動間隔(執行視窗操作的時間間隔):slideInterval
常用視窗操作:
window(windowLength,slideInterval)
countByWindow(windowLength,slideInterval)
reduceByWindow(func,windowLength,slideInterval)
reduceByKeyAndWindow(func,windowLength,slideInterval,[ numTasks ])
reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numTasks ):僅适用于可逆減少函數,且必須啟動檢查點才可以使用
countByValueAndWindow(windowLength, slideInterval,[ numTasks ])
join操作:
1.stream-stream-join:
相同批次間隔,stream1産生的RDD可以和stream2産生的RDD實作join,視窗也可以
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
2.stream-dataset joins
動态地改變加入的資料集。transform是評估每批間隔,是以将使用目前資料集和資料集參考點
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
Output操作:
将DStream結果輸出至外部系統
print():列印每個RDD的前num成員,預設num=10
saveAsTextFiles(prefix,[suffix]):儲存為文本檔案,或者HDFS;格式為prefix-XXXX[.suffix]
saveAsObjectFiles(prefix,[suffix]):儲存為sequenceFile格式
saveAsHadoopFiles(prefix, [suffix]):儲存為Hadoop檔案
foreachRDD(func):傳入一個函數,直接作用在RDD上
func 是在調用Spark流式計算程式的Driver程序中執行的,但是func一般會調用RDD的Action操作,是在worker中執行的
使用要求:至少要包含一個RDD的Action調用,因為Spark Streaming的排程是由output方法觸發的,每個周期調用一次所有定義的output方法,output内部調用RDD的Action操作完成計算,否則程式隻會接收資料而不會計算
緩存和持久化:持久化級别:記憶體
調用persist()可以把DStream及包含的RDD全部緩存到記憶體中
網絡來源資料(Kafka,Flume等)預設持久化級别是複制到兩個節點,確定容錯
Spark Streaming 容錯:
RDD的容錯機制:通過線性鍊條的形式記錄RDD從建立開始,中間每一步的計算過程,錯誤恢複就是重新計算
Spark 通過具有容錯處理的檔案系統實作資料存儲的容錯,但是不适合Spark Streaming,Spark Streaming的資料會預設存儲兩份,在兩個節點上,是以錯誤時會有兩類資料需要恢複:
1.剛收到已經被緩存,但是還沒有複制到其他節點的資料:在資料源重新擷取
2.收到了且已經複制到其他節點的資料:通過備份節點恢複
可能發生的錯誤類型:
worker節點失效:記憶體中所有的資料都會丢失且無法恢複
Driver節點失效:SparkContext失效,Spark Streaming失效,所有附屬執行節點都會退出,記憶體資料全部丢失
容錯保障:通過資料計算次數定義:
至多一次
到少一次
精準一次:最佳容錯保障
資料接收----> 資料計算 ----> 結果輸出
資料接收容錯:
不同資料源提供不同的容錯保障:HDFS等自帶容錯的檔案系統:精準一次
1.3以後的Kafka Direct API:精準一次
其他的場景:視接收機制是否有确認機制和是否開啟WAL而不同
資料計算:transformation計算:因為RDD的容錯,可以實作精準一次
output容錯:output本身提供至少一次的容錯保障,當輸出為檔案時,重複資料會覆寫,可以實作精準一次,其他場景,需要使用額外方法實作精準一次:事務更新:更新時帶上事務資訊,確定更新隻進行一次,避免重複,實作精準一次
檢查點:
調用有狀态的Transformation操作必須啟動checkpoint功能,updateStateByKey或者reduceByKeyAndWindow
Driver程式自動重新開機時也使用檢查點,啟動後建立streamingContext時檢查下檢查點目錄,存在則恢複,不存在則建立
val ssc = StreamingContext.getOrCreate
使用檢查點會降低吞吐量,增加批次的計算時間
性能調優:
1.減少批處理的時間:資料接收的并發量:分成多個Input DStream,然後再聚合;資料計算的并發量:調整spark.default.parallelism參數; 資料序列化
2.合理的批次間隔時間
3.記憶體調優:DStream持久化級别;清理曆史資料;CMS垃圾回收