這次我們介紹spark streaming,今天主要是原理和相關的操作
- Spark Streaming概念介紹
- Spark Streaming的相關操作
1. Spark Streaming概念
1.1什麼是Spark Streaming
Spark Streaming類似于Apache Storm,用于流式資料的處理。根據其官方文檔介紹,Spark Streaming有高吞吐量和容錯能力強等特點。Spark Streaming支援的資料源有很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。資料輸入後可以用Spark的高度抽象操作如:map、reduce、join、window等進行運算。而結果也能儲存在很多地方,如HDFS,資料庫等。另外Spark Streaming也能和MLlib(機器學習)以及Graphx完美融合。
1.2 Spark Streaming的特點
- 易用,可以像編寫離線批處理一樣去編寫流式程式,支援java/scala/python語言。
- 容錯,SparkStreaming在沒有額外代碼和配置的情況下可以恢複丢失的工作。
- 易整合到Spark體系,流式處理與批處理和互動式查詢相結合。
1.3 SparkStreaming與Storm的對比
主要的差別如下:
- Strom是一個純實時的流式處理架構,即來一條資料就處理一條資料,這樣勢必叢集内有頻繁的網絡通訊,吞吐量低。
- SparkStreaming是微批處理架構,吞吐量高。
- Strom的事務處理機制要比SparkStreaming的好,SparkStreaming中存在丢資料或者重複計算的問題,Storm中接受或拉取的每條資料可以準确的隻處理一次。
- Strom适合做簡單的彙總型計算,SparkStreaming可以做複雜的計算,因為SparkStreaming是基于Dstream來開發的,Dstream可以抽出RDD(即Spark的核心),支援更多的複雜計算。
- Strom支援動态資源的調整,而SparkStreaming是粗粒度的資源排程(新版本中即使有也是通過kill excutor的形式)。
2. Spark Streaming原理
Spark Streaming 是基于spark的流式批處理引擎,其基本原理是把輸入資料以某一時間間隔批量的處理,當批處理間隔縮短到秒級時,便可以用于處理實時資料流。
2.1 Spark Streaming計算流程
Spark Streaming是将流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark Core,也就是把Spark Streaming的輸入資料按照batch size(如1秒)分成一段一段的資料(Discretized Stream),每一段資料都轉換成Spark中的RDD(Resilient Distributed Dataset),然後将Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,将RDD經過操作變成中間結果儲存在記憶體中。整個流式計算根據業務的需求可以對中間的結果進行緩存或者存儲到外部裝置。下圖顯示了Spark Streaming的整個流程。
下面是SparkStreaming架構圖
2.2 Saprk Streaming的實時性
對于實時性的讨論,會牽涉到流式處理架構的應用場景。Spark Streaming将流式計算分解成多個Spark Job,對于每一段資料的處理都會經過Spark DAG圖分解以及Spark的任務集的排程過程。對于目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),是以Spark Streaming能夠滿足除對實時性要求非常高(如高頻實時交易)之外的所有流式準實時計算場景。
2.3 Spark Streaming的容錯性
對于流式計算來說,容錯性至關重要。首先我們要明确一下Spark中RDD的容錯機制。每一個RDD都是一個不可變的分布式可重算的資料集,其記錄着确定性的操作繼承關系(lineage),是以隻要輸入資料是可容錯的,那麼任意一個RDD的分區(Partition)出錯或不可用,都是可以利用原始輸入資料通過轉換操作而重新算出的。
對于Spark Streaming來說,其RDD的傳承關系如下圖所示:
圖中的每一個橢圓形表示一個RDD,橢圓形中的每個圓形代表一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每一個Batch Size所産生的中間結果RDD。我們可以看到圖中的每一個RDD都是通過lineage相連接配接的,由于Spark Streaming輸入資料可以來自于磁盤,例如HDFS(多份拷貝)或是來自于網絡的資料流(Spark Streaming會将網絡輸入資料的每一個資料流拷貝兩份到其他的機器)都能保證容錯性,是以RDD中任意的Partition出錯,都可以并行地在其他機器上将缺失的Partition計算出來。這個容錯恢複方式比連續計算模型(如Storm)的效率更高。
3. DStream
3.1 什麼是DStream
Discretized Stream是Spark Streaming的基礎抽象,代表持續性的資料流和經過各種Spark算子操作後的結果資料流。在内部實作上,DStream是一系列連續的RDD來表示。每個RDD含有一段時間間隔内的資料,如下圖:
對資料的操作也是按照RDD為機關來進行的:
Spark Streaming使用資料源産生的資料流建立DStream,也可以在已有的DStream上使用一些操作來建立新的DStream。
它的工作流程像下面的圖所示一樣,接受到實時資料後,給資料分批次,然後傳給Spark Engine處理最後生成該批次的結果。
4. DStream的相關操作
DStream上的操作與RDD的類似,分為Transformations(轉換)和Output Operations(輸出)兩種,此外轉換操作中還有一些比較特殊的操作,如:updateStateByKey()、transform()以及各種Window相關的操作。
4.1 Transformations on DStreams
Transformations | Meaning |
---|---|
map(func) | 對DStream中的各個元素進行func函數操作,然後傳回一個新的DStream |
flatMap(func) | 與map方法類似,隻不過各個輸入項可以被輸出為零個或多個輸出項 |
filter(func) | 過濾出所有函數func傳回值為true的DStream元素并傳回一個新的DStream |
repartition(numPartitions) | 增加或減少DStream中的分區數,進而改變DStream的并行度 |
union(otherStream) | 将源DStream和輸入參數為otherDStream的元素合并,并傳回一個新的DStream. |
count() | 通過對DStream中的各個RDD中的元素進行計數,然後傳回隻有一個元素的RDD構成的DStream |
reduce(func) | 對源DStream中的各個RDD中的元素利用func進行聚合操作,然後傳回隻有一個元素的RDD構成的新的DStream. |
countByValue() | 對于元素類型為K的DStream,傳回一個元素為(K,Long)鍵值對形式的新的DStream,Long對應的值為源DStream中各個RDD的key出現的次數 |
reduceByKey(func, [numTasks]) | 利用func函數對源DStream中的key進行聚合操作,然後傳回新的(K,V)對構成的DStream |
join(otherStream, [numTasks]) | 輸入為(K,V)、(K,W)類型的DStream,傳回一個新的(K,(V,W))類型的DStream |
cogroup(otherStream, [numTasks]) | 輸入為(K,V)、(K,W)類型的DStream,傳回一個新的 (K, Seq[V], Seq[W]) 元組類型的DStream |
transform(func) | 通過RDD-to-RDD函數作用于DStream中的各個RDD,可以是任意的RDD操作,進而傳回一個新的RDD |
updateStateByKey(func) | 根據key的之前狀态值和key的新值,對key進行更新,傳回一個新狀态的DStream |
- 特殊的Transformations
(1)UpdateStateByKey Operation
UpdateStateByKey用于記錄曆史記錄,儲存上次的狀态
(2)Window Operations(開窗函數)
滑動視窗轉換操作:
滑動視窗轉換操作的計算過程如下圖所示,我們可以事先設定一個滑動視窗的長度(也就是視窗的持續時間),并且設定滑動視窗的時間間隔(每隔多長時間執行一次計算),然後,就可以讓視窗按照指定時間間隔在源DStream上滑動,每次視窗停放的位置上,都會有一部分DStream被框入視窗内,形成一個小段的DStream,這時,就可以啟動對這個小段DStream的計算。
(1)紅色的矩形就是一個視窗,視窗框住的是一段時間内的資料流。
(2)這裡面每一個time都是時間單元,在官方的例子中,每隔window size是3 time unit, 而且每隔2個機關時間,視窗會slide一次。
是以基于視窗的操作,需要指定2個參數:
- window length - The duration of the window (3 in the figure)
-
slide interval - The interval at which the window-based operation is performed (2 in the figure).
a.視窗大小,一段時間内資料的容器。
b.滑動間隔,每隔多久計算一次。
4.2 Output Operations on DStreams
Output Operations可以将DStream的資料輸出到外部的資料庫或檔案系統,當某個Output Operations被調用時(與RDD的Action相同),spark streaming程式才會開始真正的計算過程。
Output Operations | Meaning |
---|---|
print() | 列印到控制台 |
saveAsTextFiles(prefix, [suffix]) | 儲存流的内容為文本檔案,檔案名為"prefix-TIME_IN_MS[.suffix]". |
saveAsObjectFiles(prefix, [suffix]) | 儲存流的内容為SequenceFile,檔案名為"prefix-TIME_IN_MS[.suffix]". |
saveAsHadoopFiles(prefix, [suffix]) | 儲存流的内容為hadoop檔案,檔案名為 “prefix-TIME_IN_MS[.suffix]”. |
foreachRDD(func) | 對Dstream裡面的每個RDD執行func |