
Spark Streaming source code analysis 1: StreamingContext

Blog: http://blog.csdn.net/yueqian_zhu/

Let's start with the simplest possible example to get a rough picture:

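import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// StreamingExamples is a helper from Spark's own examples
// (package org.apache.spark.examples.streaming), so this object is assumed to live in that package.
object NetworkWordCount {
  def main(args: Array[String]): Unit = {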
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
           

This section focuses on how a StreamingContext is constructed.

class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  )
           

I. API:

1. cp_ is null

def this(sparkContext: SparkContext, batchDuration: Duration)
           
2. A SparkContext is created automatically from conf inside the constructor; cp_ is null
def this(conf: SparkConf, batchDuration: Duration)
           
3. conf is assembled from the defaults plus the supplied arguments; cp_ is null
def this(
    master: String,
    appName: String,
    batchDuration: Duration,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map())
           
4. Rebuilds the StreamingContext from the checkpoint data read under path, so no SparkContext or Duration argument is needed
def this(path: String, hadoopConf: Configuration)
           
def this(path: String) // hadoopConf is built automatically from the default Hadoop configuration files
           
5. Constructs from an existing SparkContext and a checkpoint path
def this(path: String, sparkContext: SparkContext)
           
6. Note that the StreamingContext companion object has a getOrCreate method: if no checkpoint can be read under checkpointPath, it calls creatingFunc to create a new StreamingContext
def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = new Configuration(),
    createOnError: Boolean = false
  ): StreamingContext
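
The recover-or-create behaviour of getOrCreate can be modelled without Spark. The block below is only a minimal sketch: SketchContext, SketchGetOrCreate and the readCheckpoint parameter are hypothetical stand-ins invented for illustration, and the handling of createOnError (rethrow a read error unless the flag is set) is an assumption about the intended semantics rather than a copy of Spark's code.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for StreamingContext; it only records how it was obtained.
final case class SketchContext(origin: String)

object SketchGetOrCreate {
  // readCheckpoint is a hypothetical reader:
  //   Success(Some(ctx)) -> a checkpoint was found and restored
  //   Success(None)      -> no checkpoint exists under the path
  //   Failure(e)         -> checkpoint data exists but could not be read
  def getOrCreate(
      readCheckpoint: () => Try[Option[SketchContext]],
      creatingFunc: () => SketchContext,
      createOnError: Boolean = false): SketchContext =
    readCheckpoint() match {
      case Success(Some(recovered))    => recovered       // rebuild from the checkpoint
      case Success(None)               => creatingFunc()  // nothing to recover: create a new one
      case Failure(_) if createOnError => creatingFunc()  // assumed: ignore the read error
      case Failure(e)                  => throw e         // assumed: surface the read error
    }
}
```

With the real API, the caller would typically create the DStreams inside creatingFunc, so that a context restored from a checkpoint and a freshly created one end up with the same pipeline.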
           
II. Main construction logic of StreamingContext (checkpointing is left aside for now)
1. Construct a graph: DStreamGraph
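Operations on a DStream fall into two classes: 1. transformations, and 2. output operations, which emit the results. Where there is input there must also be output; without an output, every step before it is pointless. So how are the inputs and outputs tied together? That is the job of DStreamGraph, which records the input streams and the output streams.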
2. Construct a JobScheduler
Internally, JobScheduler constructs a jobGenerator, which produces jobs at the batch interval we configured
3. Set the state to INITIALIZED
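
The three construction steps can be pictured with a toy model in plain Scala. This is not Spark's implementation: every Toy* name is hypothetical, streams are represented by bare strings, and "one job per output stream per batch" is a simplifying assumption of the sketch. The states ACTIVE and STOPPED are likewise added only to round out the example; the text above only mentions INITIALIZED.

```scala
import scala.collection.mutable.ArrayBuffer

object ToyStreaming {
  // Lifecycle states; only INITIALIZED is taken from the text, the others are assumed.
  sealed trait State
  case object INITIALIZED extends State
  case object ACTIVE extends State
  case object STOPPED extends State

  // Step 1: the graph remembers which streams bring data in and which emit results.
  final class ToyGraph {
    private val inputs = ArrayBuffer.empty[String]
    private val outputs = ArrayBuffer.empty[String]
    def addInputStream(name: String): Unit = inputs += name
    def addOutputStream(name: String): Unit = outputs += name
    def inputStreams: Seq[String] = inputs.toList
    def outputStreams: Seq[String] = outputs.toList
    // Without an output there is nothing worth computing.
    def validate(): Unit =
      require(outputs.nonEmpty, "no output operations registered")
  }

  // Step 2b: the generator turns the batch interval into batch times and jobs.
  final class ToyJobGenerator(graph: ToyGraph, batchMillis: Long) {
    // Batch boundaries that follow startMillis, one per interval.
    def batchTimes(startMillis: Long, count: Int): Seq[Long] =
      (1 to count).map(i => startMillis + i * batchMillis)
    // Assumed for the sketch: one job per registered output stream per batch.
    def generateJobs(batchTime: Long): Seq[String] =
      graph.outputStreams.map(out => s"job($out @ $batchTime ms)")
  }

  // Step 2a: the scheduler owns the generator.
  final class ToyScheduler(graph: ToyGraph, batchMillis: Long) {
    val jobGenerator = new ToyJobGenerator(graph, batchMillis)
  }

  // The context wires the pieces together and starts out as INITIALIZED (step 3).
  final class ToyStreamingContext(batchMillis: Long) {
    val graph = new ToyGraph
    val scheduler = new ToyScheduler(graph, batchMillis)
    var state: State = INITIALIZED
  }
}
```

In terms of the NetworkWordCount example, socketTextStream would play the role of an input stream and print() that of an output stream, while Seconds(1) corresponds to a batchMillis of 1000.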
The next section covers the operation part of the example above.

      
