
Flink Task Scheduling: A Source Code Walkthrough

The source code below is based on Flink 1.14.

Client-side job submission

CliFrontend.main() ->

CliFrontend.parseAndRun() ->

CliFrontend.run() ->

CliFrontend.executeProgram() ->

ClientUtils.executeProgram() ->

PackagedProgram.invokeInteractiveModeForExecution() ->

PackagedProgram.callMainMethod() ->

mainMethod.invoke() ->

StreamExecutionEnvironment.execute(String jobName)
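The step from PackagedProgram.callMainMethod() to mainMethod.invoke() means the client runs the user's main() through Java reflection. That main() is what eventually calls StreamExecutionEnvironment.execute(jobName). Below is a minimal JDK-only sketch of such a reflective call. ReflectiveMainInvoker, the hypothetical entry class UserJob, and the error handling are illustrative assumptions, not Flink's actual code.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class ReflectiveMainInvoker {
    // Find public static main(String[]) on the entry class and invoke it reflectively.
    static void callMainMethod(Class<?> entryClass, String[] args) {
        Method mainMethod;
        try {
            mainMethod = entryClass.getMethod("main", String[].class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(entryClass.getName() + " has no public main(String[])", e);
        }
        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new IllegalArgumentException(entryClass.getName() + " declares a non-static main method");
        }
        try {
            // The cast keeps args as ONE String[] argument instead of spreading it as varargs.
            mainMethod.invoke(null, (Object) args);
        } catch (InvocationTargetException e) {
            // Surface the exception thrown inside the user's main().
            throw new RuntimeException("user main() failed", e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("cannot access main() of " + entryClass.getName(), e);
        }
    }

    public static void main(String[] args) {
        callMainMethod(UserJob.class, new String[] {"wordcount"});
        System.out.println(UserJob.lastJobName); // wordcount
    }
}

// Hypothetical user entry class; a real one would build a pipeline and call env.execute(jobName).
class UserJob {
    static String lastJobName;

    public static void main(String[] args) {
        lastJobName = args.length > 0 ? args[0] : "default-job";
    }
}
```

The `(Object) args` cast matters: without it, `invoke` treats the String array itself as the varargs array.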

Generating the StreamGraph on the client

StreamExecutionEnvironment.execute(String jobName) ->

StreamExecutionEnvironment.getStreamGraph() ->

StreamExecutionEnvironment.getStreamGraph(boolean) ->

StreamGraphGenerator.generate() ->

StreamGraphGenerator.transform() ->

StreamGraphGenerator.translate() ->

SimpleTransformationTranslator.translateForStreaming() ->

SimpleTransformationTranslator.translateForStreamingInternal()


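Notes:

  • A physical transformation adds a StreamNode, then adds a StreamEdge connecting it to the upstream StreamNode.
  • A partition transformation does not produce a concrete StreamNode or StreamEdge. Instead it adds a virtual node. When the transformation downstream of the partition (e.g. map) adds its edge by calling StreamGraph.addEdge, the partition information is written into that edge.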
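The two notes above can be sketched with a toy graph. Everything here is hypothetical and much simpler than Flink's StreamGraphGenerator and StreamGraph. That includes MiniStreamGraphDemo, the Transformation/Edge/VirtualPartition records, the string partitioner names, and falling back to "forward". The sketch only shows three behaviors:

- A partition step becomes a virtual node.
- Inputs are transformed first, and each transformation only once.
- The virtual node's partitioner is copied into the edge when the downstream node calls addEdge.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class MiniStreamGraphDemo {
    // Hypothetical transformation: a physical operator when partitioner == null, otherwise a partition step.
    record Transformation(int id, String name, Transformation input, String partitioner) {
        boolean isPartition() { return partitioner != null; }
    }

    record Edge(int sourceId, int targetId, String partitioner) {}

    // A virtual node only remembers its real upstream node and the partitioner to apply.
    record VirtualPartition(int upstreamId, String partitioner) {}

    final Map<Integer, String> nodes = new LinkedHashMap<>();   // StreamNodes: id -> operator name
    final List<Edge> edges = new ArrayList<>();                  // StreamEdges
    final Map<Integer, VirtualPartition> virtualNodes = new HashMap<>();
    private final Set<Integer> alreadyTransformed = new HashSet<>();

    // Translate the input first (recursively), then this transformation; each one only once.
    int transform(Transformation t) {
        if (alreadyTransformed.contains(t.id())) return t.id();
        Integer inputId = t.input() == null ? null : transform(t.input());
        if (t.isPartition()) {
            // No StreamNode or StreamEdge: just register a virtual node.
            virtualNodes.put(t.id(), new VirtualPartition(inputId, t.partitioner()));
        } else {
            nodes.put(t.id(), t.name());
            if (inputId != null) addEdge(inputId, t.id());
        }
        alreadyTransformed.add(t.id());
        return t.id();
    }

    // Resolve virtual upstream ids to the real node; the nearest partition step supplies the partitioner.
    void addEdge(int upstreamId, int downstreamId) {
        String partitioner = null;
        while (virtualNodes.containsKey(upstreamId)) {
            VirtualPartition v = virtualNodes.get(upstreamId);
            if (partitioner == null) partitioner = v.partitioner();
            upstreamId = v.upstreamId();
        }
        // Simplification: fall back to "forward" when no partition step sits in between.
        edges.add(new Edge(upstreamId, downstreamId, partitioner == null ? "forward" : partitioner));
    }

    public static void main(String[] args) {
        Transformation source = new Transformation(1, "source", null, null);
        Transformation keyBy = new Transformation(2, "keyBy", source, "hash");
        Transformation map = new Transformation(3, "map", keyBy, null);
        MiniStreamGraphDemo graph = new MiniStreamGraphDemo();
        graph.transform(map);
        System.out.println(graph.nodes.keySet()); // [1, 3]
        System.out.println(graph.edges);          // [Edge[sourceId=1, targetId=3, partitioner=hash]]
    }
}
```

The keyBy step (id 2) leaves no StreamNode behind. Its only trace is the "hash" partitioner recorded on the edge from source to map.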

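Generating the JobGraph on the client

Converting the StreamGraph into a JobGraph also happens on the client. It mainly does three things:

  • StreamNodes are converted into JobVertices
  • StreamEdges are converted into JobEdges
  • An IntermediateDataSet is created to link each JobEdge to its producing JobVertex (producer JobVertex -> IntermediateDataSet -> JobEdge -> consumer JobVertex)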
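The three conversions above can be sketched with toy classes. This is a hypothetical model, not Flink's classes. It maps each StreamNode one-to-one onto a JobVertex and ignores operator chaining, which the real StreamingJobGraphGenerator performs. The connectNewDataSetAsInput method is loosely modeled on the Flink JobVertex method of the same name, but drops its distribution-pattern and result-partition-type parameters.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MiniJobGraphDemo {
    record StreamEdge(int sourceId, int targetId) {}

    // Output of a producing JobVertex; read by consumers through JobEdges.
    static final class IntermediateDataSet {
        final JobVertex producer;
        final List<JobEdge> consumers = new ArrayList<>();
        IntermediateDataSet(JobVertex producer) { this.producer = producer; }
    }

    // Connects an IntermediateDataSet to the JobVertex that reads it.
    static final class JobEdge {
        final IntermediateDataSet source;
        final JobVertex target;
        JobEdge(IntermediateDataSet source, JobVertex target) { this.source = source; this.target = target; }
    }

    static final class JobVertex {
        final String name;
        final List<IntermediateDataSet> producedDataSets = new ArrayList<>();
        final List<JobEdge> inputs = new ArrayList<>();
        JobVertex(String name) { this.name = name; }

        // Called on the consumer: producer -> new IntermediateDataSet -> new JobEdge -> this vertex.
        JobEdge connectNewDataSetAsInput(JobVertex producer) {
            IntermediateDataSet dataSet = new IntermediateDataSet(producer);
            producer.producedDataSets.add(dataSet);
            JobEdge edge = new JobEdge(dataSet, this);
            dataSet.consumers.add(edge);
            inputs.add(edge);
            return edge;
        }
    }

    // StreamNode -> JobVertex (one-to-one, no chaining); StreamEdge -> IntermediateDataSet + JobEdge.
    static Map<Integer, JobVertex> createJobGraph(Map<Integer, String> streamNodes, List<StreamEdge> streamEdges) {
        Map<Integer, JobVertex> vertices = new LinkedHashMap<>();
        streamNodes.forEach((id, name) -> vertices.put(id, new JobVertex(name)));
        for (StreamEdge e : streamEdges) {
            vertices.get(e.targetId()).connectNewDataSetAsInput(vertices.get(e.sourceId()));
        }
        return vertices;
    }

    public static void main(String[] args) {
        Map<Integer, String> nodes = new LinkedHashMap<>();
        nodes.put(1, "source");
        nodes.put(2, "map");
        nodes.put(3, "sink");
        List<StreamEdge> edges = List.of(new StreamEdge(1, 2), new StreamEdge(2, 3));
        for (JobVertex v : createJobGraph(nodes, edges).values()) {
            System.out.println(v.name + ": produces " + v.producedDataSets.size() + ", inputs " + v.inputs.size());
        }
        // source: produces 1, inputs 0
        // map: produces 1, inputs 1
        // sink: produces 0, inputs 1
    }
}
```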

Source trace:

StreamExecutionEnvironment.execute(String jobName) ->

StreamExecutionEnvironment.execute(StreamGraph) ->

StreamExecutionEnvironment.executeAsync(StreamGraph) ->

PipelineExecutor.execute() ->

AbstractJobClusterExecutor.execute() ->

PipelineExecutorUtils.getJobGraph() ->

FlinkPipelineTranslationUtil.getJobGraph() ->

StreamGraphTranslator.translateToJobGraph() ->

StreamGraph.getJobGraph(JobID) ->

StreamingJobGraphGenerator.createJobGraph(StreamGraph, JobID) ->

StreamingJobGraphGenerator.createJobGraph()


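Generating the ExecutionGraph after job submission

Once the client has generated the JobGraph, it submits it to the JobManager via submitJob. The JobManager then builds the corresponding ExecutionGraph from the JobGraph.

The ExecutionGraph is the core data structure Flink uses when scheduling a job. It contains every parallel task, every intermediate stream, and the relationships between them.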

Source trace:

Dispatcher.runJob() ->

Dispatcher.createJobManagerRunner() ->

JobMasterServiceLeadershipRunner.start() ->

StandaloneLeaderElectionService.start() ->

JobMasterServiceLeadershipRunner.grantLeadership() ->

JobMasterServiceLeadershipRunner.startJobMasterServiceProcessAsync() ->

JobMasterServiceLeadershipRunner.verifyJobSchedulingStatusAndCreateJobMasterServiceProcess() ->

JobMasterServiceLeadershipRunner.createNewJobMasterServiceProcess() ->

JobMasterServiceProcessFactory.create() ->

DefaultJobMasterServiceProcessFactory.create() ->

new DefaultJobMasterServiceProcess() ->

DefaultJobMasterServiceFactory.createJobMasterService() ->

DefaultJobMasterServiceFactory.internalCreateJobMasterService() ->

new JobMaster() ->

JobMaster.createScheduler() ->

DefaultSlotPoolServiceSchedulerFactory.createScheduler() ->

DefaultSchedulerFactory.createInstance() ->

new DefaultScheduler() ->

new SchedulerBase() ->

SchedulerBase.createAndRestoreExecutionGraph() ->

DefaultExecutionGraphFactory.createAndRestoreExecutionGraph() ->

DefaultExecutionGraphBuilder.buildGraph() ->

DefaultExecutionGraph.attachJobGraph()

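The flow from JobGraph to ExecutionGraph and then to the physical execution plan:

1. Sort the JobVertices in the JobGraph topologically, starting from the source nodes.

2. In executionGraph.attachJobGraph(sortedTopology), create an ExecutionJobVertex for each JobVertex:

  • In the ExecutionJobVertex constructor, an IntermediateResult is built for each of the JobVertex's IntermediateDataSets, and ExecutionVertices are built according to the JobVertex's parallelism
  • When an ExecutionVertex is constructed, it builds its IntermediateResultPartitions, so each ExecutionVertex creates several IntermediateResultPartitions (one per IntermediateResult)
  • The newly created ExecutionJobVertex is connected to its upstream IntermediateResults

3. Build ExecutionEdges that connect to the upstream IntermediateResultPartitions. This completes the path from the ExecutionGraph to the physical execution plan.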
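The three steps above can be sketched in a toy model. It is hypothetical throughout: MiniExecutionGraphDemo, its records, and the method names are illustrative, not Flink's classes. Step 1 uses Kahn's algorithm to sort from the sources. Steps 2-3 expand each job vertex into `parallelism` subtasks. Each subtask produces one partition per downstream result, and each consuming subtask reads every matching upstream partition. That all-to-all wiring is a simplification; Flink also supports other distribution patterns.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MiniExecutionGraphDemo {
    // Hypothetical job vertex: name, parallelism, and the names of its upstream vertices.
    record JobVertex(String name, int parallelism, List<String> inputs) {}

    // One partition of an intermediate result, produced by one subtask for one consumer vertex.
    record ResultPartition(String producer, String consumer, int subtaskIndex) {}

    // One parallel subtask of a job vertex.
    static final class ExecutionVertex {
        final String jobVertex;
        final int subtaskIndex;
        final List<ResultPartition> produced = new ArrayList<>();
        final List<ResultPartition> consumed = new ArrayList<>();
        ExecutionVertex(String jobVertex, int subtaskIndex) {
            this.jobVertex = jobVertex;
            this.subtaskIndex = subtaskIndex;
        }
    }

    // Step 1: Kahn's algorithm, starting from vertices without inputs (the sources).
    static List<JobVertex> sortTopologicallyFromSources(List<JobVertex> vertices) {
        Map<String, JobVertex> byName = new HashMap<>();
        Map<String, Integer> remainingInputs = new HashMap<>();
        Map<String, List<String>> downstream = new HashMap<>();
        Deque<String> ready = new ArrayDeque<>();
        for (JobVertex v : vertices) {
            byName.put(v.name(), v);
            remainingInputs.put(v.name(), v.inputs().size());
            if (v.inputs().isEmpty()) ready.add(v.name());
            for (String in : v.inputs()) downstream.computeIfAbsent(in, k -> new ArrayList<>()).add(v.name());
        }
        List<JobVertex> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String name = ready.poll();
            sorted.add(byName.get(name));
            for (String next : downstream.getOrDefault(name, List.of())) {
                if (remainingInputs.merge(next, -1, Integer::sum) == 0) ready.add(next);
            }
        }
        if (sorted.size() != vertices.size()) throw new IllegalArgumentException("job graph has a cycle");
        return sorted;
    }

    // Steps 2-3: expand each job vertex into `parallelism` subtasks and wire them to upstream partitions.
    static Map<String, List<ExecutionVertex>> attachJobGraph(List<JobVertex> sortedTopology) {
        Map<String, List<String>> consumersOf = new HashMap<>();
        for (JobVertex v : sortedTopology)
            for (String in : v.inputs()) consumersOf.computeIfAbsent(in, k -> new ArrayList<>()).add(v.name());

        Map<String, List<ExecutionVertex>> graph = new LinkedHashMap<>();
        for (JobVertex jv : sortedTopology) {
            List<ExecutionVertex> subtasks = new ArrayList<>();
            for (int i = 0; i < jv.parallelism(); i++) {
                ExecutionVertex ev = new ExecutionVertex(jv.name(), i);
                // One partition per downstream result this vertex produces.
                for (String consumer : consumersOf.getOrDefault(jv.name(), List.of()))
                    ev.produced.add(new ResultPartition(jv.name(), consumer, i));
                // All-to-all: read every upstream partition addressed to this vertex.
                // Upstream subtasks already exist because the input is topologically sorted.
                for (String in : jv.inputs())
                    for (ExecutionVertex up : graph.get(in))
                        for (ResultPartition p : up.produced)
                            if (p.consumer().equals(jv.name())) ev.consumed.add(p);
                subtasks.add(ev);
            }
            graph.put(jv.name(), subtasks);
        }
        return graph;
    }

    public static void main(String[] args) {
        List<JobVertex> jobGraph = List.of(
                new JobVertex("sink", 1, List.of("map")),
                new JobVertex("map", 3, List.of("source")),
                new JobVertex("source", 2, List.of()));
        Map<String, List<ExecutionVertex>> eg = attachJobGraph(sortTopologicallyFromSources(jobGraph));
        eg.forEach((name, subtasks) -> System.out.println(
                name + ": parallelism " + subtasks.size() + ", partitions read per subtask " + subtasks.get(0).consumed.size()));
        // source: parallelism 2, partitions read per subtask 0
        // map: parallelism 3, partitions read per subtask 2
        // sink: parallelism 1, partitions read per subtask 3
    }
}
```

The job vertices are deliberately listed out of order (sink first). The sort still yields source, map, sink, which is what lets attachJobGraph look up upstream subtasks that already exist.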

Task scheduling and execution

Reading the source starting from where the JobMaster is started:

DefaultJobMasterServiceFactory.internalCreateJobMasterService() ->

jobMaster.start() ->

JobMaster.onStart() ->

JobMaster.startJobExecution() ->

JobMaster.startScheduling() ->

SchedulerBase.startScheduling() ->

DefaultScheduler.startSchedulingInternal() ->

PipelinedRegionSchedulingStrategy.startScheduling() ->

PipelinedRegionSchedulingStrategy.maybeScheduleRegions() ->

PipelinedRegionSchedulingStrategy.maybeScheduleRegion() ->

DefaultScheduler.allocateSlotsAndDeploy() ->

DefaultScheduler.waitForAllSlotsAndDeploy() ->

DefaultScheduler.deployAll() ->

DefaultScheduler.deployOrHandleError() ->

DefaultScheduler.deployTaskSafe() ->

DefaultExecutionVertexOperations.deploy() ->

ExecutionVertex.deploy() ->

Execution.deploy() ->

TaskManagerGateway.submitTask() ->

RpcTaskManagerGateway.submitTask() ->

TaskExecutor.submitTask() ->

Task.startTaskThread() ->

executingThread.start() ->

Task.run() ->

Task.doRun() ->

Task.restoreAndInvoke() ->

StreamTask.invoke() ->

StreamTask.runMailboxLoop() ->

MailboxProcessor.runMailboxLoop() ->

MailboxDefaultAction.runDefaultAction() ->

StreamTask.processInput() ->

StreamInputProcessor.processInput() ->

StreamOneInputProcessor.processInput() ->

AbstractStreamTaskNetworkInput.emitNext() ->

AbstractStreamTaskNetworkInput.processElement() ->

OneInputStreamTask.StreamTaskNetworkOutput.emitRecord() ->

For a map operator: StreamMap.processElement()
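The tail of this chain starts the task on its own thread (Task.startTaskThread()) and then spends its life in a mailbox loop. In that loop, StreamTask.processInput() is the default action and pushes one record at a time into the operator. A minimal single-task sketch of that idea follows; it omits much of Flink's real MailboxProcessor/TaskMailbox machinery, such as priorities and suspension. MiniMailboxDemo and sendMail are hypothetical names, and a UnaryOperator stands in for the user's map function.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.UnaryOperator;

class MiniMailboxDemo {
    // Mails may be sent from other threads (e.g. checkpoint triggers), so use a thread-safe queue.
    private final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    private final Iterator<Integer> input;
    private final UnaryOperator<Integer> mapFunction; // stands in for the user's map function
    final List<Integer> output = new ArrayList<>();
    final List<String> log = new ArrayList<>();
    private boolean running = true;

    MiniMailboxDemo(Iterator<Integer> input, UnaryOperator<Integer> mapFunction) {
        this.input = input;
        this.mapFunction = mapFunction;
    }

    void sendMail(Runnable mail) { mailbox.add(mail); }

    // Default action, in the role of StreamTask.processInput(): handle one record per call.
    private void processInput() {
        if (!input.hasNext()) {
            running = false; // end of input: let the loop finish
            return;
        }
        int record = input.next();
        output.add(mapFunction.apply(record)); // in the role of StreamMap.processElement()
        log.add("record " + record);
    }

    // Everything runs on the task thread: drain pending mails, then run one default action.
    void runMailboxLoop() {
        while (running) {
            Runnable mail;
            while ((mail = mailbox.poll()) != null) mail.run();
            processInput();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniMailboxDemo task = new MiniMailboxDemo(List.of(1, 2, 3).iterator(), x -> x * 10);
        task.sendMail(() -> task.log.add("mail: checkpoint"));
        // In the role of Task.startTaskThread(): the loop runs on a dedicated thread.
        Thread executingThread = new Thread(task::runMailboxLoop, "map-task");
        executingThread.start();
        executingThread.join();
        System.out.println(task.output); // [10, 20, 30]
        System.out.println(task.log);    // [mail: checkpoint, record 1, record 2, record 3]
    }
}
```

Because mails and records are handled on the same thread, control actions never run concurrently with record processing. This is the main reason for using a mailbox instead of locking.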
