原創文章,轉載請注明:轉載自 周嶽飛部落格(http://zhou-yuefei.iteye.com/)
Spark streaming 程式的運作過程是将DStream的操作轉化成RDD的操作,Spark Streaming 和 Spark Core 的關系如下圖(圖檔來自spark官網)
Spark Streaming 會按照程式設定的時間間隔不斷動态生成Job來處理輸入資料,這裡的Job生成是指将Spark Streaming 的程式翻譯成Spark核心的RDD操作,翻譯的過程并不會觸發Job的運作,Spark Streaming 會将翻譯的處理邏輯封裝在Job對象中,最後會将Job送出到叢集上運作。這就是Spark Streaming 運作的基本過程。下面詳細介紹Job動态生成和送出過程。
首先,當SparkStreaming的start方法調用後,整個Spark Streaming 程式開始運作,按照指定的時間間隔生成Job并送出給叢集運作,在生成Job的工程中主要核心對象有 1.JobScheduler
2.JobGenerator
3.DStreamGraph
4.DStream
其中, JobScheduler 負責啟動 JobGenerator生成Job,并送出生成的Job到叢集運作,這裡的Job不是在spark core 中提到的job,它隻是作業運作的代碼模闆,是邏輯級别的,可以類比java線程中的Runnable接口實作,不是真正運作的作業, 它封裝了由DStream轉化而來的RDD操作.JobGenerator負責定時調用 DStreamingGraph的generateJob方法生成Job和清理Dstream的中繼資料, DStreamGraph持有構成DStream圖的所有DStream對象,并調用 DStream的generateJob方法生成具體Job對象.DStream生成最終的Job交給 JobScheduler 排程執行。整體過程如下圖所示:
原創文章,轉載請注明: 轉載自 周嶽飛部落格( http://zhou-yuefei.iteye.com/ ) 下面結合源碼分析每一步過程 (源碼中黃色背景部分為核心邏輯代碼,例如 : scheduler .start() ) : 首先,StreamingContext起動時調用start方法
try{
validate()
// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
ThreadUtils.runInNewThread("streaming-start"){
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,"false")
savedProperties.set(SerializationUtils.clone(
sparkContext.localProperties.get()).asInstanceOf[Properties])
scheduler.start()
}
state =StreamingContextState.ACTIVE
}catch{
caseNonFatal(e)=>
logError("Error starting the context, marking it as stopped", e)
scheduler.stop(false)
state =StreamingContextState.STOPPED
throw e
}
其中調用了scheduler的start方法,此處的scheduler 就是 org.apache.spark.streaming.scheduler.JobScheduler 對象, StreamingContext持有 org.apache.spark.streaming.scheduler.JobScheduler 對象的引用。 下面看一下 JobScheduler的start方法:
eventLoop =newEventLoop[JobSchedulerEvent]("JobScheduler"){
override protected def onReceive(event:JobSchedulerEvent):Unit= processEvent(event)
override protected def onError(e:Throwable):Unit= reportError("Error in job scheduler", e)
}
eventLoop.start()
// attach rate controllers of input streams to receive batch completion updates
for{
inputDStream <- ssc.graph.getInputStreams
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
listenerBus.start()
receiverTracker =newReceiverTracker(ssc)
inputInfoTracker =newInputInfoTracker(ssc)
executorAllocationManager =ExecutorAllocationManager.createIfEnabled(
ssc.sparkContext,
receiverTracker,
ssc.conf,
ssc.graph.batchDuration.milliseconds,
clock)
executorAllocationManager.foreach(ssc.addStreamingListener)
receiverTracker.start()
jobGenerator.start()
executorAllocationManager.foreach(_.start())
logInfo("Started JobScheduler")
可以看到 JobScheduler調用了 jobGenerator的start方法和 eventLoop的start方法,eventLoop用來接收 JobSchedulerEvent消息,并交給 processEvent函數進行處理 代碼如下:
private def processEvent(event:JobSchedulerEvent){
try{
event match {
caseJobStarted(job, startTime)=> handleJobStart(job, startTime)
caseJobCompleted(job, completedTime)=> handleJobCompletion(job, completedTime)
caseErrorReported(m, e)=> handleError(m, e)
}
}catch{
case e:Throwable=>
reportError("Error in job scheduler", e)
}
}
可以看到 JobScheduler中的eventLoop隻處理JobStarted,JobCompleted和ErrorReported 三類消息,這三類消息的處理不是Job動态生成的核心邏輯代碼先略過,(注意:後面 JobGenerator中也有個eventLoop不要和這裡的eventLoop混淆。) JobGenerator的start方法首先new了一個EventLoop對象eventLoop,并複 寫onReceive(),将收到的JobGeneratorEvent 消息交給 processEvent 方法處理.源碼如下:
def start(): Unit = synchronized { if (eventLoop != null) return // generator has already been started // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock. // See SPARK-10125 checkpointWriter eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) override protected def onError(e: Throwable): Unit = { jobScheduler.reportError("Error in job generator", e) } } eventLoop.start() if (ssc.isCheckpointPresent) { restart() } else { startFirstTime() } }
JobGenerator建立了eventLoop對象之後調用該對象的start方法,啟動監聽程序,準備接收JobGeneratorEvent類型消息交給processEvent函數處理,然後調用了startFirstTime方法,該方法啟動DStreamGraph和定時器,定時器啟動後根據程式設定的時間間隔給eventLoop對象發送GenerateJobs消息,如下圖:
原創文章,轉載請注明: 轉載自 周嶽飛部落格( http://zhou-yuefei.iteye.com/ ) eventLoop對象收到 GenerateJobs 消息交個processEvent方法處理,processEvent收到該消息,調用generateJobs方法處理,源碼如下:
private def generateJobs(time:Time){
// Checkpoint all RDDs marked for checkpointing to ensure their lineages are
// truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS,"true")
Try{
jobScheduler.receiverTracker.allocateBlocksToBatch(time)// allocate received blocks to batch
graph.generateJobs(time)// generate jobs using allocated block
} match {
caseSuccess(jobs)=>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
caseFailure(e)=>
jobScheduler.reportError("Error generating jobs for time "+ time, e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater =false))
}
JobGenerator中的generateJobs方法主要關注兩行代碼,首先調用graph的generateJobs方法,給方法傳回Success(jobs) 或者 Failure(e),其中的jobs就是該方法傳回的Job對象集合,如果Job建立成功,再調用JobScheduler的submitJobSet方法将job送出給叢集執行。 首先分析Job對象的産生,DStreamGraph 的start方法源碼:
def generateJobs(time:Time):Seq[Job]={
logDebug("Generating jobs for time "+ time)
val jobs =this.synchronized{
outputStreams.flatMap { outputStream =>
val jobOption = outputStream.generateJob(time)
jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption
}
}
logDebug("Generated "+ jobs.length +" jobs for time "+ time)
jobs
}
DStreamGraph 的start方法源碼調用了outputStream對象的generateJob方法,ForeachDStream重寫了該方法:
override def generateJob(time:Time):Option[Job]={
parent.getOrCompute(time) match {
caseSome(rdd)=>
val jobFunc =()=> createRDDWithLocalProperties(time, displayInnerRDDOps){
foreachFunc(rdd, time)
}
Some(newJob(time, jobFunc))
caseNone=>None
}
}
ForeachDStream的 generateJob 将使用者編寫的DStream處理函數封裝在jobFunc中,并将其傳入Job對象,至此Job的生成。 接下來分析Job送出過程,JobScheduler負責Job的送出,核心代碼在submitJobSet方法中:
def submitJobSet(jobSet:JobSet){
if(jobSet.jobs.isEmpty){
logInfo("No jobs added for time "+ jobSet.time)
}else{
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(newJobHandler(job)))
logInfo("Added jobs for time "+ jobSet.time)
}
}
其中jobExecutor對象是一個線程池,JobHandler實作了 Runnable接口,在JobHandler 的run方法中會調用傳入的job對象的run方法。 疑問:Job的run方法執行是如何觸發RDD的Action操作進而出發job的真正運作的呢?我們下次再具體分析,請随時關注部落格更新! 原創文章,轉載請注明: 轉載自 周嶽飛部落格( http://zhou-yuefei.iteye.com/ )