Blog: http://blog.csdn.net/yueqian_zhu/
Let's start with the simplest possible example to get a rough picture:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Logging helper defined in Spark's examples package
    // (org.apache.spark.examples.streaming)
    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (e.g. generated by 'nc').
    // Note: a storage level without replication is used only because this
    // runs locally; a distributed deployment needs replication for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
This section focuses on how a StreamingContext is constructed.
class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  )
I. API:
1. cp_ is null
def this(sparkContext: SparkContext, batchDuration: Duration)
2. A SparkContext is created internally from conf; cp_ is null
def this(conf: SparkConf, batchDuration: Duration)
3. The conf is assembled from the defaults plus the supplied arguments; cp_ is null
def this(
    master: String,
    appName: String,
    batchDuration: Duration,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map())
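4. Rebuilds the StreamingContext from the checkpoint information read from the path directory, so no SparkContext or Duration argument is needed
def this(path: String, hadoopConf: Configuration)
def this(path: String) // hadoopConf is built automatically from the default Hadoop configuration files
5. Constructs the context from an existing SparkContext and a checkpoint path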
def this(path: String, sparkContext: SparkContext)
6. Note that the StreamingContext companion object has a getOrCreate method: if no checkpoint can be read from checkpointPath, it calls creatingFunc to create a new StreamingContext
def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = new Configuration(),
    createOnError: Boolean = false
  ): StreamingContext
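
The fallback behaviour of getOrCreate described in item 6 can be sketched in plain Scala without any Spark dependency. This is only a toy model, not Spark's implementation: ToyContext and readCheckpoint are hypothetical names invented for the illustration, and the handling of createOnError (create a fresh context instead of failing when the checkpoint cannot be read) reflects my understanding of the flag rather than anything stated above.

```scala
import scala.util.{Failure, Success, Try}

// Toy stand-in for a streaming context; NOT Spark's StreamingContext.
final case class ToyContext(origin: String)

object ToyContext {
  // readCheckpoint is a hypothetical reader:
  //   Success(Some(data)) -> a checkpoint was found and read
  //   Success(None)       -> the directory holds no checkpoint
  //   Failure(e)          -> checkpoint data exists but could not be read
  def getOrCreate(
      readCheckpoint: () => Try[Option[String]],
      creatingFunc: () => ToyContext,
      createOnError: Boolean = false): ToyContext =
    readCheckpoint() match {
      case Success(Some(data))         => ToyContext(s"recovered:$data") // rebuild from checkpoint
      case Success(None)               => creatingFunc()                 // nothing to recover
      case Failure(_) if createOnError => creatingFunc()                 // tolerate a bad checkpoint
      case Failure(e)                  => throw e                        // default: surface the error
    }
}
```

With the real API the call looks similar: pass the checkpoint directory and a function that builds a fully configured context, and the function is invoked only when nothing could be recovered.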
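II. Main construction logic of StreamingContext (checkpointing is left aside for now)
1. Construct a graph: DStreamGraph
Operations applied to a DStream fall into two categories: 1. transformations, and 2. output operations, which emit the results. A DStreamGraph that has inputs must also have outputs; without an output, everything done upstream is pointless. So how are the inputs and outputs tied together? That is the job of DStreamGraph, which records both the input streams and the output streams.
2. Construct a JobScheduler
Internally, JobScheduler constructs a jobGenerator, which produces jobs at the batch interval we configured
3. Set the state to INITIALIZED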
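
To make the three steps concrete, here is a rough plain-Scala model. Everything in it (ToyGraph, ToyJobGenerator, ToyStreamingContext, ToyState, and the string "jobs") is made up for illustration and is far simpler than the real classes; the ACTIVE and STOPPED values are there only to give the enumeration more than one member. It shows the two points made above: a graph with inputs but no registered output produces no jobs, and the generator emits one round of jobs per batch interval.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-ins; the names mirror the text but none of this is Spark code.
final case class ToyStream(name: String)

class ToyGraph {
  private val inputs  = ArrayBuffer.empty[ToyStream]
  private val outputs = ArrayBuffer.empty[ToyStream]

  def addInputStream(s: ToyStream): Unit  = inputs += s
  def addOutputStream(s: ToyStream): Unit = outputs += s

  // One job per registered output; with no output there is nothing to run.
  def generateJobs(batchTimeMs: Long): Seq[String] =
    outputs.map(o => s"job(${o.name}@$batchTimeMs)").toList
}

class ToyJobGenerator(graph: ToyGraph, batchMs: Long) {
  // Jobs for every batch boundary in (startMs, endMs].
  def jobsBetween(startMs: Long, endMs: Long): Seq[String] =
    (startMs + batchMs to endMs by batchMs).flatMap(t => graph.generateJobs(t))
}

object ToyState extends Enumeration {
  val INITIALIZED, ACTIVE, STOPPED = Value
}

class ToyStreamingContext(batchMs: Long) {
  val graph = new ToyGraph                               // step 1
  val jobGenerator = new ToyJobGenerator(graph, batchMs) // step 2 (scheduler omitted)
  var state: ToyState.Value = ToyState.INITIALIZED       // step 3
}
```

In the word-count example, socketTextStream would correspond to addInputStream and print() to addOutputStream; the flatMap/map/reduceByKey steps in between only take effect because an output is registered at the end.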
The next section covers the operation part of the example above.