天天看點

6.Spark streaming技術内幕 : Job動态生成原理與源碼解析

原創文章,轉載請注明:轉載自 周嶽飛部落格(http://zhou-yuefei.iteye.com/) 

  Spark streaming 程式的運作過程是将DStream的操作轉化成RDD的操作,Spark Streaming 和 Spark Core 的關系如下圖(圖檔來自spark官網)

6.Spark streaming技術内幕 : Job動态生成原理與源碼解析

  Spark Streaming 會按照程式設定的時間間隔不斷動态生成Job來處理輸入資料,這裡的Job生成是指将Spark Streaming 的程式翻譯成Spark核心的RDD操作,翻譯的過程并不會觸發Job的運作,Spark Streaming 會将翻譯的處理邏輯封裝在Job對象中,最後會将Job送出到叢集上運作。這就是Spark Streaming 運作的基本過程。下面詳細介紹Job動态生成和送出過程。

  首先,當SparkStreaming的start方法調用後,整個Spark Streaming 程式開始運作,按照指定的時間間隔生成Job并送出給叢集運作,在生成Job的工程中主要核心對象有     1.JobScheduler   

    2.JobGenerator

    3.DStreamGraph

    4.DStream

其中,  JobScheduler  負責啟動 JobGenerator生成Job,并送出生成的Job到叢集運作,這裡的Job不是在spark core 中提到的job,它隻是作業運作的代碼模闆,是邏輯級别的,可以類比java線程中的Runnable接口實作,不是真正運作的作業,  它封裝了由DStream轉化而來的RDD操作.JobGenerator負責定時調用 DStreamingGraph的generateJob方法生成Job和清理Dstream的中繼資料,  DStreamGraph持有構成DStream圖的所有DStream對象,并調用 DStream的generateJob方法生成具體Job對象.DStream生成最終的Job交給 JobScheduler  排程執行。整體過程如下圖所示:

6.Spark streaming技術内幕 : Job動态生成原理與源碼解析

  原創文章,轉載請注明: 轉載自 周嶽飛部落格( http://zhou-yuefei.iteye.com/ )   下面結合源碼分析每一步過程 (源碼中黃色背景部分為核心邏輯代碼,例如 :  scheduler .start() ) : 首先,StreamingContext起動時調用start方法

try{

validate()

// Start the streaming scheduler in a new thread, so that thread local properties

// like call sites and job groups can be reset without affecting those of the

// current thread.

ThreadUtils.runInNewThread("streaming-start"){

sparkContext.setCallSite(startSite.get)

sparkContext.clearJobGroup()

sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,"false")

savedProperties.set(SerializationUtils.clone(

sparkContext.localProperties.get()).asInstanceOf[Properties])

scheduler.start()

}

state =StreamingContextState.ACTIVE

}catch{

caseNonFatal(e)=>

logError("Error starting the context, marking it as stopped", e)

scheduler.stop(false)

state =StreamingContextState.STOPPED

throw e

}

  其中調用了scheduler的start方法,此處的scheduler 就是  org.apache.spark.streaming.scheduler.JobScheduler   對象, StreamingContext持有 org.apache.spark.streaming.scheduler.JobScheduler 對象的引用。 下面看一下 JobScheduler的start方法:  

eventLoop =newEventLoop[JobSchedulerEvent]("JobScheduler"){

override protected def onReceive(event:JobSchedulerEvent):Unit= processEvent(event)

override protected def onError(e:Throwable):Unit= reportError("Error in job scheduler", e)

}

eventLoop.start()

// attach rate controllers of input streams to receive batch completion updates

for{

inputDStream <- ssc.graph.getInputStreams

rateController <- inputDStream.rateController

} ssc.addStreamingListener(rateController)

listenerBus.start()

receiverTracker =newReceiverTracker(ssc)

inputInfoTracker =newInputInfoTracker(ssc)

executorAllocationManager =ExecutorAllocationManager.createIfEnabled(

ssc.sparkContext,

receiverTracker,

ssc.conf,

ssc.graph.batchDuration.milliseconds,

clock)

executorAllocationManager.foreach(ssc.addStreamingListener)

receiverTracker.start()

jobGenerator.start()

executorAllocationManager.foreach(_.start())

logInfo("Started JobScheduler")

  可以看到 JobScheduler調用了 jobGenerator的start方法和 eventLoop的start方法,eventLoop用來接收 JobSchedulerEvent消息,并交給 processEvent函數進行處理 代碼如下:

private def processEvent(event:JobSchedulerEvent){

try{

event match {

caseJobStarted(job, startTime)=> handleJobStart(job, startTime)

caseJobCompleted(job, completedTime)=> handleJobCompletion(job, completedTime)

caseErrorReported(m, e)=> handleError(m, e)

}

}catch{

case e:Throwable=>

reportError("Error in job scheduler", e)

}

}

   可以看到 JobScheduler中的eventLoop隻處理JobStarted,JobCompleted和ErrorReported 三類消息,這三類消息的處理不是Job動态生成的核心邏輯代碼先略過,(注意:後面 JobGenerator中也有個eventLoop不要和這裡的eventLoop混淆。) JobGenerator的start方法首先new了一個EventLoop對象eventLoop,并複 寫onReceive(),将收到的JobGeneratorEvent 消息交給 processEvent 方法處理.源碼如下:  

def start(): Unit = synchronized { if (eventLoop != null) return // generator has already been started // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock. // See SPARK-10125 checkpointWriter eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) override protected def onError(e: Throwable): Unit = { jobScheduler.reportError("Error in job generator", e) } } eventLoop.start() if (ssc.isCheckpointPresent) { restart() } else { startFirstTime() } }

JobGenerator建立了eventLoop對象之後調用該對象的start方法,啟動監聽程序,準備接收JobGeneratorEvent類型消息交給processEvent函數處理,然後調用了startFirstTime方法,該方法啟動DStreamGraph和定時器,定時器啟動後根據程式設定的時間間隔給eventLoop對象發送GenerateJobs消息,如下圖:  

6.Spark streaming技術内幕 : Job動态生成原理與源碼解析

  原創文章,轉載請注明: 轉載自 周嶽飛部落格( http://zhou-yuefei.iteye.com/ )     eventLoop對象收到  GenerateJobs 消息交個processEvent方法處理,processEvent收到該消息,調用generateJobs方法處理,源碼如下:  

private def generateJobs(time:Time){

// Checkpoint all RDDs marked for checkpointing to ensure their lineages are

// truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).

ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS,"true")

Try{

jobScheduler.receiverTracker.allocateBlocksToBatch(time)// allocate received blocks to batch

graph.generateJobs(time)// generate jobs using allocated block

} match {

caseSuccess(jobs)=>

val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

caseFailure(e)=>

jobScheduler.reportError("Error generating jobs for time "+ time, e)

}

eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater =false))

}

  JobGenerator中的generateJobs方法主要關注兩行代碼,首先調用graph的generateJobs方法,給方法傳回Success(jobs) 或者 Failure(e),其中的jobs就是該方法傳回的Job對象集合,如果Job建立成功,再調用JobScheduler的submitJobSet方法将job送出給叢集執行。 首先分析Job對象的産生,DStreamGraph 的start方法源碼:

def generateJobs(time:Time):Seq[Job]={

logDebug("Generating jobs for time "+ time)

val jobs =this.synchronized{

outputStreams.flatMap { outputStream =>

val jobOption = outputStream.generateJob(time)

jobOption.foreach(_.setCallSite(outputStream.creationSite))

jobOption

}

}

logDebug("Generated "+ jobs.length +" jobs for time "+ time)

jobs

}

  DStreamGraph 的start方法源碼調用了outputStream對象的generateJob方法,ForeachDStream重寫了該方法:

override def generateJob(time:Time):Option[Job]={

parent.getOrCompute(time) match {

caseSome(rdd)=>

val jobFunc =()=> createRDDWithLocalProperties(time, displayInnerRDDOps){

foreachFunc(rdd, time)

}

Some(newJob(time, jobFunc))

caseNone=>None

}

}

  ForeachDStream的 generateJob 将使用者編寫的DStream處理函數封裝在jobFunc中,并将其傳入Job對象,至此Job的生成。 接下來分析Job送出過程,JobScheduler負責Job的送出,核心代碼在submitJobSet方法中:  

def submitJobSet(jobSet:JobSet){

if(jobSet.jobs.isEmpty){

logInfo("No jobs added for time "+ jobSet.time)

}else{

listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))

jobSets.put(jobSet.time, jobSet)

jobSet.jobs.foreach(job => jobExecutor.execute(newJobHandler(job)))

logInfo("Added jobs for time "+ jobSet.time)

}

}

  其中jobExecutor對象是一個線程池,JobHandler實作了 Runnable接口,在JobHandler 的run方法中會調用傳入的job對象的run方法。   疑問:Job的run方法執行是如何觸發RDD的Action操作進而出發job的真正運作的呢?我們下次再具體分析,請随時關注部落格更新!   原創文章,轉載請注明: 轉載自 周嶽飛部落格( http://zhou-yuefei.iteye.com/ )