天天看點

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析 原創文章,轉載請注明:轉載自 聽風居士部落格(http://blog.csdn.net/zhouzx2010)  

上篇部落格讨論了Spark Streaming 程式動态生成Job的過程,并留下一個疑問:  JobScheduler将動态生成的Job送出,然後調用了Job對象的run方法,最後run方法的調用是如何觸發RDD的Action操作,進而真正 觸發 Job的執行的呢?本文就具體講解這個問題。

一、DStream和RDD的關系     DSream 代表了一系列連續的RDD,DStream中每個RDD包含特定時間間隔的資料,如下圖所示:

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

從上圖可以看出, 一個DStream 對應了時間次元上的多個RDD。

DStream 作為Spark Stream的一個基本抽象,提供了高層的API來進行Spark Streaming 程式開發,先看一個簡單的Spark Streaming的WordCount程式執行個體:

  1. object WordCount {
  2. def main(args:Array[String]): Unit ={
  3. val sparkConf = new SparkConf().setMaster("local[4]").setAppName("WordCount")
  4. val ssc = new StreamingContext(sparkConf,Seconds(1))
  5. val lines = ssc.socketTextStream("localhost",9999)
  6. val words = lines.flatMap(_.split(" "))
  7. val wordCounts = words.map(x => (x,1)).reduceByKey(_+_)
  8. wordCounts.print()
  9. ssc.start()
  10. ssc.awaitTermination()
  11. }
  12. }

我們會發現對DStream的操作和RDD的操作驚人的相似, 通過對DStream的不斷轉換,形成依賴關系。是以的DStream操作最終會轉換成底層的RDD的操作,上面的例子中 lines DStream轉換成wods DSteam。 lines DStream的 flatMap操作會作用于其中每一個RDD去生成words DStream 中的RDD, 過程如下圖所示:

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

下面從源碼角度看一下 DStream和RDD的關系:     DStream 中 有一個HashMap[Time,RDD[T]]類型的對象 generatedRDDs,其中Key為作業開始時間,RDD為該DStream對應的RDD,源碼如下:

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

二、Dstream 的分類     Dstream 主要分為三大類:

         1. Input DStream

         2.  Transformed DStream

         3. Output DStream

2.1 InputDStream 是DStream 最初誕生的地方,也是RDD最初誕生的地方,它是依據資料源建立的最初的DStream,如上面例子中的代碼:

val lines = ssc . socketTextStream ( "localhost" , 9999 )

基于Socket資料源建立了 SocketInputDStream對象lines,下面從源碼角度分析一下他是怎麼生成RDD的,  SocketInputDStream生成RDD的方法在 它的父類ReceiverInputDSteam中:

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

ReceiverInputDSteam  的compute方法中調用了createBloackRDD方法基于Block資訊建立了RDD :

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

可以看到  ReceiverInputDSteam 的 createBloackRDD 方法new了BlockRDD對象,BlockRDD 是繼承自RDD。至此,最初的RDD建立完成。

2.2、  Transformed DStream 是由其他DStream 通過非Output算子裝換而來的DStream    例如例子中的lines通過flatMap算子轉換生成了FlatMappedDStream:

     val words =lines.flatMap(_.split(" "))    下面看一下flatMap的源碼:     

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

可以看到flatMap是DStream的方法,它建立了FlatMappeedDStream并傳回,上面例子中words 就是 FlatMappeedDStream 對象,建立 FlatMappeedDStream對象時傳入了 參數flatMapFunc,這裡的flatMapFunc就是使用者編寫的業務邏輯,我們再進入FlatMappedDStream,檢視其compute方法:

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

可以驚喜的看到 FlatMappedDStream的compute方法調用了parent的getOrCompute方法擷取父DStream的RDD.通過對 父DStream的RDD的flatMap算子生成新的RDD,轉換的業務邏輯通過flatMapFunc參數傳遞給flatMap算子。這樣對DStream的操作都轉換成了對RDD的操作,同時DSream的依賴關系也與RDD之間依賴關系同時建立了起來。 說明:這些RDD的建立是在Job動态生成時候發生的,Job生成最終會調用ForeachDStream的generateJob方法,源碼如下

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

其中的parent.getOrCompute方法會依據DStream之間的依賴關系,導緻一系列的鍊式調用,進而建立所有的RDD,并形成RDD之間的依賴關系。

3.3  Output DStream 是有其他DStream通過Output算子生成,它隻存在于Output算子内部,并不會像Transformed Stream一樣由算子傳回, 他是觸發Job執行的關鍵。           那麼什麼是Output 算子呢? Output 算子是讓DStream中的資料被推送的外部系統,像資料庫,檔案系統(HDFS,GFS等)的算子。因為Output 算子是将轉換後的資料推送到外部系統被使用的操作,是以他觸發了前面轉換操作的真正執行(類似于RDD的action操作)。

          下面,我們看看有哪些Output算子:

Output Operation Meaning
print()

Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. 

Python API This is called pprint() in the Python API.

saveAsTextFiles(prefix, [suffix]) Save this DStream's contents as text files. The file name at each batch interval is generated based onprefix and suffix: "prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) Save this DStream's contents as 

SequenceFiles

 of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". 

Python API This is not available in the Python API.

saveAsHadoopFiles(prefix, [suffix])

Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". 

Python API This is not available in the Python API.

foreachRDD(func) The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

下面,回到我們開頭的例子:

wordCounts . print ()

其中pirnt算子就是Output算子,我們進入print的源碼:

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

print()方法調用了print(10),其實是調用了另一個print方法:

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

print 方法中首先定義了一個函數foreachFunc,foreachFunc從rdd中出去num個元素列印出來。接下來print函數調用了foreachRDD,并将foreachFunc的處理邏輯作為參數傳入。這裡的foreachRDD也是一個Output算子(上面已經有說明),接下來看看 foreachRDD的源碼。

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

可以看到foreachRDD中建立了一個ForeachDStream對象,這就是我們期待已久的Output DStream。這裡需要注意一個關鍵點: 建立完ForeachRDD對象後,調用了該對象的register方法。register方法将目前對象注冊給DStreamGraph。源碼如下:

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

注冊的過程就是将目前對象加入graph的輸出流outputStream中:

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

這個過程很重要,在Job觸發時候會用到outputStream。我們先在這裡記住這個過程,下面的分析會用到這個内容。

至此,DStream到RDD過程已經解析完畢。

三 、由Dstream觸發RDD的執行     Spark Stream的Job執行過程我在另一篇部落格有詳細介紹,具體細節請參考 http://www.cnblogs.com/zhouyf/p/5503682.html 在生成Job的過程中會調用DStreamGraph的generate方法:

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

其中,就調用了outputStream的generateJob方法,這裡的outputStream就上面有output算子注冊給DStreamGraph的輸出流。就是我們執行個體中ForeachDStream 。

ForeachDStream 的generateJob方法源碼:

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

可以看到它将我們的業務邏輯封裝成jobFunc傳遞給了最終生成的Job對象。

由上篇部落格《 Spark streaming技術内幕 : Job動态生成原理與源碼解析 》 我們知道在StreamContext啟動會動态建立job,并且最終調用Job的run方法

Job的run方法由JobScheduler的submitJobSet觸發 : 

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

其中jobExecutor對象是一個線程池,JobHandler實作了 Runnable接口,在JobHandler 的run方法中會調用傳入的job對象的run方法。在這裡Job的run方法開始線上程中執行,JobHandler的run方法源碼如下:

7.spark Streaming 技術内幕 : 從DSteam到RDD全過程解析

其中的job就是封裝了我們業務邏輯的Job對象,它的run方法會觸發我們在foreachRDD方法中對RDD的操作(一般是action操作),到這裡RDD的Action操作被觸發,spark作業開始執行。

總結:     1、在一個固定時間次元上,DStream和RDD是一一對應關系,可以将DStream看成是RDD在時間次元上封裝。     2、Dstream 主要分為三大類: Input DStream,Transformed DStream,Output DStream,其中Output Dstream 對開發者是透明的,存在于Output 算子内部。     3、Spark Streaming應用程式最終會轉化成對RDD操作的spark 程式,spark 程式由于執行了foreachRDD算子中的RDD操作被觸發。

原創文章,轉載請注明: 轉載自 聽風居士部落格( http://blog.csdn.net/zhouzx2010)   

繼續閱讀