Flink运行架构
- 一、Flink运行组件
-
- 1、作业管理器(JobManager)
- 2、任务管理器(TaskManager)
- 3、资源管理器(ResourceManager)
- 4、分发器(Dispatcher)
- 二、任务执行流程
-
- 1.任务提交流程
- 2.任务提交流程(with Yarn)
- 3、任务调度原理
- 4、TaskManager和Taskslot关系
-
- 4.1、对slot(插槽),以及core,partition,Parallelism(并行度)理解。
- 三、程序与数据流
-
- 1、Dataflow
- 2、执行图(ExecutionGraph)
- 3、任务链(Operator Chains)
- 4、并行度(Parallelism)
一、Flink运行组件
- 作业管理器(JobManager)
- 任务管理器(TaskManager)
- 资源管理器(ResourceManager)
- 分发器(Dispacher)
同时运行在一个Master进程中,以上管理器都为线程。
1、作业管理器(JobManager)
控制一个应用程序执行的主进程,
- 每一个flink程序都会被一个不同的JobManager所控制执行。相当于一段flink的程序代码。
- 一个JobManager会接收一个需要执行的应用程序,这个应用程序中包含:作业图(JobGraph),逻辑数据流图(Local dataFlow graph),和打包的所有的类,库,以及其他所需要对应的jar包。
- JobManager会将获取的JobGraph转换为对应的物理层面的数据流图,也就是ExecutionGraph(执行图),执行图中包含了所有可以并行执行的任务。
- JobManager会向ResourceManager申请此次任务所需要的资源,该资源即为TaskManager上的插槽(slot),获取资源之后,会将得到的ExecutionGraph发送到对应的TaskManager上。
- JobManager在发送执行图之后,会负责对中央协调的操作。例如对于检查点checkpoints的协调。
2、任务管理器(TaskManager)
是Flink运行中,执行任务的工作进程。
- flink中会有多个TaskManager运行。每一个TaskManager上的插槽(slot)限制着每个TaskManager能够执行任务的数量。
- 启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
- 在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
3、资源管理器(ResourceManager)
主要负责管理任务管理器(TaskManager)的插槽(slot)
- TaskManager 插槽是Flink中定义的处理资源单元。
- Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及Standalone部署,一般为yarn。
- 当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果Flink的资源管理器没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台(YARN的资源管理器)发起会话,以提供启动TaskManager进程的容器。
4、分发器(Dispatcher)
- 可以跨作业运行,它为应用提交提供了REST接口。
- 当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。
- 分发器也会启动一个Web UI,用来方便地展示和监控作业执行的信息。
- 分发器在架构中可能并不是必需的,这取决于应用提交运行的方式。
二、任务执行流程
1.任务提交流程

个人理解:
- 首先App提交应用,两种方式。①通过flink run命令提交,该提交方式不经过Dispatcher,直接提交。②通过flink-web页面8081端口进行提交,会经过Dispatcher。
- 提交的应用会提交到JobManager上,JobManager收到应用后,向ResourceManager请求资源(slots),同时TaskManager启动,并向ResourceManager上请求注册slots。
- TaskManager向JobManager提供执行该应用所需要的资源。
- JobManager将该任务分配到对应的TaskManager的插槽中(slots),任务在TaskManagre的插槽中执行。
2.任务提交流程(with Yarn)
每次JobManager收到作业时,都会向ResourceManager请求资源,当ResourceManager资源不够时,会向yarn集群申请资源。
3、任务调度原理
代码(流程图) -> StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图
- 我们写的flink代码会在打包编译的过程中,会存在优化,生成一个作业图。
- 然后通过分发器(Dispatcher)将作业图提交到JobManager。
- 通过调度器(scheduler)将任务调度到任务管理器(TaskManager)上执行。
- 每一个任务管理器都有对应的插槽(配置文件可以指定),每个任务管理器都是一个jvm进程,而每个任务插槽为一个线程。
- flink中通信模块用的是Actor System。
4、TaskManager和Taskslot关系
- Flink 中每一个 TaskManager 都是一个JVM进程,它中的一个或多个 subtask可能会在独立的线程上执行。
- 为了控制一个 TaskManager 能接收多少个 task, TaskManager 通过 task slot 来进行控制(一个 TaskManager 至少有一个 slot)
二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流 - 默认情况下,Flink 允许子任务共享 slot。 这样的结果是,一个 slot 可以保存作业的整个管道。
- Task Slot 是静态的概念,是指 TaskManager 具有的并发执行能力
4.1、对slot(插槽),以及core,partition,Parallelism(并行度)理解。
- 对全局设置并行度就是全局的并行度,对单个算子设置并行度,只作用于单个算子,一次程序申请的插槽的个数是max(全局的并行度,单个算子的并行度)。
三、程序与数据流
1、Dataflow
- flink所有的程序都是由三部分组成:Source,transform,Sink。
- Source:读取数据源。
- transform:通过对应的算子对数据进行转换和处理。
- Sink:对流的输出。
代码中还有,程序开始时的获取开发环境,程序完成后,需要对程序进行执行(对于代码而言)。
flink上运行的程序会被映射成一个逻辑数据流(dataflow),每一个dataflow包括三个部分source,transform,sink,
每一个dataflow以一个或多个source开始,以一个或多个sink结束。类似DAG(有向无环图)。
大部分情况是一对一。
2、执行图(ExecutionGraph)
flink执行图可以分为四层: StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图
-
StreamGraph(流图):用户通过StreamAPI编写程序生成的最初的图,用来表示程序的拓扑结构。
真实存在的数据结构:全类名:org.apache.flink.streaming.api.graph.StreamGraph
//源码中的StreamGraph(流图)
! wordcount => env.execute();
@ 1681行 execute(DEFAULT_JOB_NAME);
# 1699行 return execute(getStreamGraph(jobName));
$ 1848行 return getStreamGraph(jobName, true);
^ 1863行 StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
-
JobGraph(工作图):在编译打包时生成。StreamGraph优化后生成JobGraph,提交到JobManager,主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点。也就是满足后面的任务链。
真实存在的数据结构:全类名:org.apache.flink.runtime.jobgraph.JobGraph
//源码中的JobGraph(工作图)
! wordcount => env.execute();
@ 1681行 execute(DEFAULT_JOB_NAME);
# 1699行 return execute(getStreamGraph(jobName));
$ 1713行 final JobClient jobClient = executeAsync(streamGraph);
^ 1812行 .execute(streamGraph, configuration); //将StreamGraph作为参数传入
* 15行 CompletableFuture<JobClient> execute(Pipeline var1, Configuration var2) throws Exception;的实现LocalExecutor.class中
《 51行 JobGraph jobGraph = this.getJobGraph(pipeline, effectiveConfig);//将StreamGraph转换为JobGraph
; 63行 return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
[ 27行 JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
] 18行 return pipelineTranslator.translateToJobGraph(pipeline, optimizerConfiguration, defaultParallelism);//将StreamGraph转换为JobGraph(pipeline就是相当于StreamGraph,底层会强转至StreamGraph)
+ 进入该方法的实现类:StreamGraphTranslator
- 25行 StreamGraph streamGraph = (StreamGraph)pipeline;//pipeline强制转换为StreamGraph
= 26行 return streamGraph.getJobGraph((JobID)null);
/ 850行 return StreamingJobGraphGenerator.createJobGraph(this, jobID);
》 109行 return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph(); // 中进入createJobGraph
< 169行 setChaining(hashes, legacyHashes);//核心
个人理解是将满足需求的算子的id赋予相同的hash-id值。
-
ExecutionGraph(执行图):JobGraph通过JobManager生成ExecutionGraph。
真实存在的数据结构:全类名:org.apache.flink.runtime.executiongraph.ExecutionGraph
- 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流
3、任务链(Operator Chains)
- Flink 采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要求,必须将两个或多个算子设为相同的并行度,并通过本地转发(local forward)的方式进行连接
- 相同并行度的 one-to-one 操作,Flink 这样相连的算子链接在一起形成一个 task,原来的算子成为里面的 subtask
- 并行度相同、并且是 one-to-one 操作,两个条件缺一不可(例如filter算子和map算子,并行度相同,没有shuffle操作,类似spark中的窄依赖,合成一起放到同一个taskslot中执行)
二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流
4、并行度(Parallelism)
- 一个特定算子的 子任务(subtask)的个数被称之为其并行度(parallelism)。一般情况下,一个 stream 的并行度,可以认为就是其所有算子中最大的并行度。
- 一个TaskManager上可以有多个Taskslot
- 一个Taskslot上只能运行一个并行度的子任务
- 多个子任务可以运行在多个TaskManager上。
- 一个程序中,不同的算子可能具有不同的并行度
- 算子之间传输数据的形式可以是 one-to-one (forwarding) 的模式也可以是redistributing 的模式,具体是哪一种形式,取决于算子的种类,类似spark中的宽依赖和窄依赖。
- One-to-one:stream维护着分区以及元素的顺序(比如source和map之间)。这意味着map 算子的子任务看到的元素的个数以及顺序跟 source 算子的子任务生产的元素的个数、顺序相同。map、fliter、flatMap等算子都是one-to-one的对应关系。
- Redistributing:stream的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy 基于 hashCode 重分区、而 broadcast 和 rebalance 会随机重新分区,这些算子都会引起redistribute过程,而 redistribute 过程就类似于 Spark 中的 shuffle 过程。