The source code traced below is based on Flink 1.14.
Job submission from the client
CliFrontend.main() ->
CliFrontend.parseAndRun() ->
CliFrontend.run() ->
CliFrontend.executeProgram() ->
ClientUtils.executeProgram() ->
PackagedProgram.invokeInteractiveModeForExecution() ->
PackagedProgram.callMainMethod() ->
mainMethod.invoke() ->
StreamExecutionEnvironment.execute(String jobName)
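The last hops of the chain above (callMainMethod() and mainMethod.invoke()) come down to invoking the user program's static main method through reflection, which is how control reaches the user's call to execute(). The following is a minimal sketch of that idea in plain Java, not Flink's actual implementation. The class names ReflectiveMainSketch and UserJob are hypothetical and exist only for illustration.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class ReflectiveMainSketch {

    // Hypothetical stand-in for the user's job class packaged in the jar.
    public static class UserJob {
        static String lastArgs = null;

        public static void main(String[] args) {
            // A real job would build its pipeline here and call env.execute(...).
            lastArgs = String.join(",", args);
        }
    }

    // Looks up "public static void main(String[])" and invokes it reflectively.
    static void callMainMethod(Class<?> entryClass, String[] args) {
        try {
            Method mainMethod = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalStateException("main method must be static");
            }
            // Cast to Object so the array is passed as one argument, not as varargs.
            mainMethod.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("could not invoke main method", e);
        }
    }

    public static void main(String[] args) {
        callMainMethod(UserJob.class, new String[] {"--input", "a"});
        System.out.println(UserJob.lastArgs);
    }
}
```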
Generating the StreamGraph on the client
StreamExecutionEnvironment.execute(String jobName) ->
StreamExecutionEnvironment.getStreamGraph() ->
StreamExecutionEnvironment.getStreamGraph(boolean) ->
StreamGraphGenerator.generate() ->
StreamGraphGenerator.transform() ->
StreamGraphGenerator.translate() ->
SimpleTransformationTranslator.translateForStreaming() ->
SimpleTransformationTranslator.translateForStreamingInternal()
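Notes:
- A physical transformation adds a StreamNode and then adds StreamEdges that connect it to its upstream StreamNodes.
- A partition transformation does not produce a concrete StreamNode or StreamEdge. Instead, a virtual node is added. When the transformation downstream of the partition (for example a map) adds its edge by calling StreamGraph.addEdge, the partition information is written into that edge.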
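The virtual-node behaviour described above can be sketched with a toy graph in plain Java. This is a simplified model under stated assumptions, not Flink's StreamGraph: the class ToyStreamGraph, the string partitioner names, and the "FORWARD" default are all hypothetical simplifications. It shows that the partition step leaves no node behind and that its partitioner ends up on the edge between the two real nodes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VirtualNodeSketch {

    // A physical edge between two real nodes, carrying the partitioner name.
    static class Edge {
        final int source;
        final int target;
        final String partitioner;

        Edge(int source, int target, String partitioner) {
            this.source = source;
            this.target = target;
            this.partitioner = partitioner;
        }

        @Override
        public String toString() {
            return source + " -[" + partitioner + "]-> " + target;
        }
    }

    static class ToyStreamGraph {
        final List<Integer> nodes = new ArrayList<>();
        final List<Edge> edges = new ArrayList<>();
        // virtual id -> real upstream id, and virtual id -> partitioner name
        final Map<Integer, Integer> virtualUpstream = new HashMap<>();
        final Map<Integer, String> virtualPartitioner = new HashMap<>();

        void addNode(int id) {
            nodes.add(id);
        }

        // A partition step only records a virtual node; no node or edge is created.
        void addVirtualPartitionNode(int upstreamId, int virtualId, String partitioner) {
            virtualUpstream.put(virtualId, upstreamId);
            virtualPartitioner.put(virtualId, partitioner);
        }

        void addEdge(int upstreamId, int downstreamId) {
            addEdgeInternal(upstreamId, downstreamId, null);
        }

        private void addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
            if (virtualUpstream.containsKey(upstreamId)) {
                // Resolve the virtual node: pick up its partitioner and continue
                // with its real upstream node.
                String resolved = partitioner != null ? partitioner : virtualPartitioner.get(upstreamId);
                addEdgeInternal(virtualUpstream.get(upstreamId), downstreamId, resolved);
                return;
            }
            // Toy default when no partition step was involved (an assumption of this sketch).
            edges.add(new Edge(upstreamId, downstreamId, partitioner != null ? partitioner : "FORWARD"));
        }
    }

    static ToyStreamGraph build() {
        ToyStreamGraph g = new ToyStreamGraph();
        g.addNode(1);                                  // source
        g.addVirtualPartitionNode(1, 2, "REBALANCE");  // source.rebalance()
        g.addNode(3);                                  // map
        g.addEdge(2, 3);                               // the map's input is the virtual node
        return g;
    }

    public static void main(String[] args) {
        System.out.println(build().edges);
    }
}
```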
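Generating the JobGraph on the client
The conversion from StreamGraph to JobGraph also happens on the client. It does three main things:
- Converts StreamNodes into JobVertex instances
- Converts StreamEdges into JobEdge instances
- Creates an IntermediateDataSet between each JobEdge and JobVertex to connect them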
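The three steps listed above can be illustrated with a toy translation in plain Java. This is a sketch under simplifying assumptions, not the logic of StreamingJobGraphGenerator: it maps every stream node to its own vertex and ignores operator chaining, and the toy classes only borrow the names JobVertex, JobEdge and IntermediateDataSet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JobGraphSketch {

    static class JobVertex {
        final String name;
        final List<IntermediateDataSet> producedDataSets = new ArrayList<>();
        final List<JobEdge> inputs = new ArrayList<>();

        JobVertex(String name) {
            this.name = name;
        }
    }

    // Sits between the producing vertex and the edge that consumes it.
    static class IntermediateDataSet {
        final JobVertex producer;
        final List<JobEdge> consumers = new ArrayList<>();

        IntermediateDataSet(JobVertex producer) {
            this.producer = producer;
        }
    }

    static class JobEdge {
        final IntermediateDataSet source;
        final JobVertex target;

        JobEdge(IntermediateDataSet source, JobVertex target) {
            this.source = source;
            this.target = target;
        }
    }

    // streamEdges holds {upstreamName, downstreamName} pairs.
    static Map<String, JobVertex> translate(List<String> streamNodes, List<String[]> streamEdges) {
        Map<String, JobVertex> vertices = new LinkedHashMap<>();
        for (String node : streamNodes) {
            vertices.put(node, new JobVertex(node));            // step 1: node -> vertex
        }
        for (String[] e : streamEdges) {
            JobVertex upstream = vertices.get(e[0]);
            JobVertex downstream = vertices.get(e[1]);
            IntermediateDataSet dataSet = new IntermediateDataSet(upstream); // step 3
            upstream.producedDataSets.add(dataSet);
            JobEdge edge = new JobEdge(dataSet, downstream);    // step 2: edge -> job edge
            dataSet.consumers.add(edge);
            downstream.inputs.add(edge);
        }
        return vertices;
    }

    static Map<String, JobVertex> demo() {
        return translate(
                Arrays.asList("source", "map", "sink"),
                Arrays.asList(new String[] {"source", "map"}, new String[] {"map", "sink"}));
    }

    public static void main(String[] args) {
        JobVertex map = demo().get("map");
        System.out.println("map reads from: " + map.inputs.get(0).source.producer.name);
    }
}
```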
Source trace:
StreamExecutionEnvironment.execute(String jobName) ->
StreamExecutionEnvironment.execute(StreamGraph) ->
StreamExecutionEnvironment.executeAsync(StreamGraph) ->
PipelineExecutor.execute() ->
AbstractJobClusterExecutor.execute() ->
PipelineExecutorUtils.getJobGraph() ->
FlinkPipelineTranslationUtil.getJobGraph() ->
StreamGraphTranslator.translateToJobGraph() ->
StreamGraph.getJobGraph(JobID) ->
StreamingJobGraphGenerator.createJobGraph(StreamGraph, JobID) ->
StreamingJobGraphGenerator.createJobGraph()
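Generating the ExecutionGraph after the job is submitted
Once the client has generated the JobGraph, it submits it to the JobManager through submitJob, and the JobManager builds the corresponding ExecutionGraph from the JobGraph.
The ExecutionGraph is the core data structure used for Flink job scheduling. It contains every parallel task, every intermediate stream, and the relationships between them.
Source trace: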
Dispatcher.runJob() ->
Dispatcher.createJobManagerRunner() ->
JobMasterServiceLeadershipRunner.start() ->
StandaloneLeaderElectionService.start() ->
JobMasterServiceLeadershipRunner.grantLeadership() ->
JobMasterServiceLeadershipRunner.startJobMasterServiceProcessAsync() ->
JobMasterServiceLeadershipRunner.verifyJobSchedulingStatusAndCreateJobMasterServiceProcess() ->
JobMasterServiceLeadershipRunner.createNewJobMasterServiceProcess() ->
JobMasterServiceProcessFactory.create() ->
DefaultJobMasterServiceProcessFactory.create() ->
new DefaultJobMasterServiceProcess() ->
DefaultJobMasterServiceFactory.createJobMasterService() ->
DefaultJobMasterServiceFactory.internalCreateJobMasterService() ->
new JobMaster() ->
JobMaster.createScheduler() ->
DefaultSlotPoolServiceSchedulerFactory.createScheduler() ->
DefaultSchedulerFactory.createInstance() ->
new DefaultScheduler() ->
new SchedulerBase() ->
SchedulerBase.createAndRestoreExecutionGraph() ->
DefaultExecutionGraphFactory.createAndRestoreExecutionGraph() ->
DefaultExecutionGraphBuilder.buildGraph() ->
DefaultExecutionGraph.attachJobGraph()
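The flow from JobGraph to ExecutionGraph and on to the physical execution plan:
1. Sort the JobVertex instances in the JobGraph topologically, starting from the source vertices.
2. In executionGraph.attachJobGraph(sortedTopology), create an ExecutionJobVertex for each JobVertex:
- The ExecutionJobVertex constructor builds an IntermediateResult for each IntermediateDataSet of the JobVertex and builds one ExecutionVertex per parallel subtask of the JobVertex.
- When an ExecutionVertex is constructed, it builds its IntermediateResultPartition objects, one for each IntermediateResult of its ExecutionJobVertex.
- The newly created ExecutionJobVertex is connected to its upstream IntermediateResults.
3. Build the ExecutionEdges that connect to the upstream IntermediateResultPartitions, which finally leads from the ExecutionGraph to the physical execution plan.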
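Step 2 above, expanding one job vertex by its parallelism, can be sketched with toy classes in plain Java. This is a simplified illustration, not Flink's implementation: the classes only borrow the Flink names, the constructor parameters are hypothetical, and connecting to upstream results (step 3) is left out.

```java
import java.util.ArrayList;
import java.util.List;

public class ExecutionGraphSketch {

    static class IntermediateResultPartition {
        final int producerSubtask;

        IntermediateResultPartition(int producerSubtask) {
            this.producerSubtask = producerSubtask;
        }
    }

    // One result per produced data set; it holds one partition per producing subtask.
    static class IntermediateResult {
        final List<IntermediateResultPartition> partitions = new ArrayList<>();
    }

    static class ExecutionVertex {
        final String taskName;
        final int subtaskIndex;
        final List<IntermediateResultPartition> producedPartitions = new ArrayList<>();

        ExecutionVertex(String taskName, int subtaskIndex) {
            this.taskName = taskName;
            this.subtaskIndex = subtaskIndex;
        }
    }

    static class ExecutionJobVertex {
        final String name;
        final List<IntermediateResult> producedResults = new ArrayList<>();
        final List<ExecutionVertex> taskVertices = new ArrayList<>();

        ExecutionJobVertex(String name, int parallelism, int numProducedDataSets) {
            this.name = name;
            // One IntermediateResult per data set produced by the job vertex.
            for (int i = 0; i < numProducedDataSets; i++) {
                producedResults.add(new IntermediateResult());
            }
            // One ExecutionVertex per parallel subtask.
            for (int subtask = 0; subtask < parallelism; subtask++) {
                ExecutionVertex vertex = new ExecutionVertex(name, subtask);
                // Each subtask contributes one partition to every result.
                for (IntermediateResult result : producedResults) {
                    IntermediateResultPartition partition = new IntermediateResultPartition(subtask);
                    result.partitions.add(partition);
                    vertex.producedPartitions.add(partition);
                }
                taskVertices.add(vertex);
            }
        }
    }

    public static void main(String[] args) {
        ExecutionJobVertex map = new ExecutionJobVertex("map", 3, 2);
        System.out.println(map.taskVertices.size() + " subtasks, "
                + map.producedResults.size() + " results, "
                + map.producedResults.get(0).partitions.size() + " partitions per result");
    }
}
```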
Task scheduling and execution
Start reading the source code from the point where the JobMaster is started:
DefaultJobMasterServiceFactory.internalCreateJobMasterService() ->
jobMaster.start() ->
JobMaster.onStart() ->
JobMaster.startJobExecution() ->
JobMaster.startScheduling() ->
SchedulerBase.startScheduling() ->
DefaultScheduler.startSchedulingInternal() ->
PipelinedRegionSchedulingStrategy.startScheduling() ->
PipelinedRegionSchedulingStrategy.maybeScheduleRegions() ->
PipelinedRegionSchedulingStrategy.maybeScheduleRegion() ->
DefaultScheduler.allocateSlotsAndDeploy() ->
DefaultScheduler.waitForAllSlotsAndDeploy() ->
DefaultScheduler.deployAll() ->
DefaultScheduler.deployOrHandleError() ->
DefaultScheduler.deployTaskSafe() ->
DefaultExecutionVertexOperations.deploy() ->
ExecutionVertex.deploy() ->
Execution.deploy() ->
TaskManagerGateway.submitTask() ->
RpcTaskManagerGateway.submitTask() ->
TaskExecutor.submitTask() ->
Task.startTaskThread() ->
executingThread.start() ->
Task.run() ->
Task.doRun() ->
Task.restoreAndInvoke() ->
StreamTask.invoke() ->
StreamTask.runMailboxLoop() ->
MailboxProcessor.runMailboxLoop() ->
MailboxDefaultAction.runDefaultAction() ->
StreamTask.processInput() ->
StreamInputProcessor.processInput() ->
StreamOneInputProcessor.processInput() ->
AbstractStreamTaskNetworkInput.emitNext() ->
AbstractStreamTaskNetworkInput.processElement() ->
OneInputStreamTask.StreamTaskNetworkOutput.emitRecord() ->
for a map operator, StreamMap.processElement()
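The tail of the chain above (runMailboxLoop() -> runDefaultAction() -> processInput() -> processElement()) follows a single-threaded loop that handles queued mail first and then runs a default action that processes input. The following is a minimal plain-Java sketch of that pattern, not Flink's MailboxProcessor. The class ToyTask, the log strings, and the assumption that each default action handles exactly one record are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

public class MailboxLoopSketch {

    static class ToyTask {
        final Queue<Runnable> mailbox = new ArrayDeque<>();
        final Iterator<Integer> input;
        final Function<Integer, Integer> mapFunction;
        final List<String> log = new ArrayList<>();
        boolean running = true;

        ToyTask(Iterator<Integer> input, Function<Integer, Integer> mapFunction) {
            this.input = input;
            this.mapFunction = mapFunction;
        }

        // Drain pending mail, then run the default action, until the input ends.
        void runMailboxLoop() {
            while (running) {
                Runnable mail;
                while ((mail = mailbox.poll()) != null) {
                    mail.run();
                }
                runDefaultAction();
            }
        }

        // Default action: take one record and hand it to the "map operator".
        void runDefaultAction() {
            if (!input.hasNext()) {
                running = false;
                return;
            }
            processElement(input.next());
        }

        void processElement(int record) {
            log.add("record:" + mapFunction.apply(record));
        }
    }

    static List<String> demo() {
        ToyTask task = new ToyTask(Arrays.asList(1, 2, 3).iterator(), x -> x * 10);
        // A control action queued before the loop starts, e.g. a checkpoint trigger.
        task.mailbox.add(() -> task.log.add("mail:checkpoint"));
        task.runMailboxLoop();
        return task.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running control actions and record processing on the same thread is the design point of this pattern: the two never run concurrently, so the operator code needs no locking between them.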