天天看点

一文详解Flink on Yarn的三种部署方式及使用说明

作者:散文随风想
  • Local模式:通过一个JVM进程中,通过线程模拟出各个Flink角色来得到Flink环境
  • Standalone模式:各个角色是独立的进程存在
  • YARN模式:Flink的各个角色,均运行在多个YARN的容器内,其整体上是一个YARN的任务

flink on yarn的前提是:hdfs、yarn均启动

在企业实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下:

  • Yarn的资源可以按需使用,提高集群的资源利用率
  • Yarn的任务有优先级,根据优先级运行作业
  • 基于Yarn调度系统,能够自动化地处理各个角色的 Failover( 容错 )
  • JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控
  • 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器
  • 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager

准备工作

  • jdk1.8及以上【配置JAVA_HOME环境变量】
  • ssh免密码登录【集群内节点之间免密登录】
  • 至少hadoop2.2
  • hdfs & yarn均启动

集群规划

  • 服务器: node1(ResourceManager+ NodeManager)
  • 服务器: node2(NodeManager)
  • 服务器: node3(NodeManager)

修改hadoop的配置参数

打开yarn配置页面

(每台hadoop节点都需要修改)

vim etc/hadoop/yarn-site.xml           

添加

<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>           

是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。

在这里面我们需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job

分发yarn-site.xml到其它服务器节点

scp yarn-site.xml node2:$PWD
scp yarn-site.xml node3:$PWD           

启动HDFS、YARN集群

start-all.sh           

Flink on Yarn的运行机制

一文详解Flink on Yarn的三种部署方式及使用说明

从图中可以看出,Yarn的客户端需要获取hadoop的配置信息,连接Yarn的ResourceManager。所以要有设置有 YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_CONF_PATH,只要设置了其中一个环境变量,就会被读取。如果读取上述的变量失败了,那么将会选择hadoop_home的环境变量,都区成功将会尝试加载$HADOOP_HOME/etc/hadoop的配置文件。

当启动一个Flink Yarn会话时,客户端首先会检查本次请求的资源是否足够。资源足够将会上传包含HDFS配置信息和Flink的jar包到HDFS。

随后客户端会向Yarn发起请求,启动applicationMaster,随后NodeManager将会加载有配置信息和jar包,一旦完成,ApplicationMaster(AM)便启动。

当JobManager and AM 成功启动时,他们都属于同一个container,从而AM就能检索到JobManager的地址。此时会生成新的Flink配置信息以便TaskManagers能够连接到JobManager。同时,AM也提供Flink的WEB接口。用户可并行执行多个Flink会话。

随后,AM将会开始为分发从HDFS中下载的jar以及配置文件的container给TaskMangers.完成后Fink就完全启动并等待接收提交的job。

Flink on Yarn的三种部署方式介绍

Session模式

这种模式会预先在yarn或者或者k8s上启动一个flink集群,然后将任务提交到这个集群上,这种模式,集群中的任务使用相同的资源,如果某一个任务出现了问题导致整个集群挂掉,那就得重启集群中的所有任务,这样就会给集群造成很大的负面影响。

一文详解Flink on Yarn的三种部署方式及使用说明
一文详解Flink on Yarn的三种部署方式及使用说明
  • 特点:需要事先申请资源,使用Flink中的yarn-session(yarn客户端),启动JobManager和TaskManger,main 方法在客户端执行。
  • 优点:不需要每次递交作业申请资源,而是使用已经申请好的资源,从而提高执行效率;
  • 缺点:
    • 作业执行完成以后,资源不会被释放,因此一直会占用系统资源
    • 所有作业共享集群资源,隔离性差,JM 负载瓶颈
  • 应用场景:适合作业递交比较频繁的场景,小作业比较多的场景

Per-Job模式

考虑到集群的资源隔离情况,一般生产上的任务都会选择per job模式,也就是每个任务启动一个flink集群,各个集群之间独立运行,互不影响,且每个集群可以设置独立的配置。

一文详解Flink on Yarn的三种部署方式及使用说明
一文详解Flink on Yarn的三种部署方式及使用说明
  • 特点:每次递交作业都需要申请一次资源,main 方法在客户端执行。
  • 优点:
    • 作业运行完成,资源会立刻被释放,不会一直占用系统资源
    • 每个作业单独启动集群,隔离性好,JM 负载均衡
  • 缺点:
    • 每次递交作业都需要申请资源,会影响执行效率,因为申请资源需要消耗时间
  • 应用场景:适合作业比较少的场景、大作业的场景

application模式

3.1 背景

flink-1.11 引入了一种新的部署模式,即 Application 模式。目前,flink-1.11 已经可以支持基于 Yarn 和 Kubernetes 的 Application 模式。

3.2 优势

Session模式:所有作业共享集群资源,隔离性差,JM 负载瓶颈,main 方法在客户端执行。

Per-Job模式:每个作业单独启动集群,隔离性好,JM 负载均衡,main 方法在客户端执行。

通过以上两种模式的特点描述,可以看出,main方法都是在客户端执行,社区考虑到在客户端执行 main() 方法来获取 flink 运行时所需的依赖项,并生成 JobGraph,提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。尤其在大量用户共享客户端时,问题更加突出。

此外这种模式提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应的临时目录,这个也会带来大量的网络的开销,所以如果任务特别多的情况下,平台的吞吐量将会直线下降。

因此,社区提出新的部署方式 Application 模式解决该问题。

3.3 原理

Application 模式下,用户程序的 main 方法将在集群中而不是客户端运行,用户将程序逻辑和依赖打包进一个可执行的 jar 包里,集群的入口程序 (ApplicationClusterEntryPoint) 负责调用其中的 main 方法来生成 JobGraph。

Application 模式为每个提交的应用程序创建一个集群,该集群可以看作是在特定应用程序的作业之间共享的会话集群,并在应用程序完成时终止。

在这种体系结构中,Application 模式在不同应用之间提供了资源隔离和负载平衡保证。在特定一个应用程序上,JobManager 执行 main() 可以节省所需的 CPU 周期,还可以节省本地下载依赖项所需的带宽。

Flink on Yarn的三种部署方式使用说明

1. 第一种方式:YARN session

  • yarn-session.sh(开辟资源)+flink run(提交任务)

这种模式下会启动yarn session,并且会启动Flink的两个必要服务:JobManager和Task-managers,然后可以向集群提交作业。同一个Session中可以提交多个Flink作业。需要注意的是,这种模式下Hadoop的版本至少是2.2,而且必须安装了HDFS(因为启动YARN session的时候会向HDFS上提交相关的jar文件和配置文件)

  • 通过./bin/yarn-session.sh脚本启动YARN Session
  • 脚本可以携带的参数:
    • -n(--container):TaskManager的数量。(1.10 已经废弃)
    • -s(--slots):每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。
    • -jm:JobManager的内存(单位MB)。
    • -q:显示可用的YARN资源(内存,内核);
    • -tm:每个TaskManager容器的内存(默认值:MB)
    • -nm:yarn 的appName(现在yarn的ui上的名字)。
    • -d:后台执行。
  • 注意:
如果不想让Flink YARN客户端始终运行,那么也可以启动分离的 YARN会话。该参数被称为-d或--detached。
  • 确定TaskManager数:

Flink on YARN时,TaskManager的数量就是:max(parallelism) / yarnslots(向上取整)。例如,一个最大并行度为10,每个TaskManager有两个任务槽的作业,就会启动5个TaskManager。

2. 启动:

bin/yarn-session.sh -tm 1024  -s 4 -d           

上面的命令的意思是,每个 TaskManager 拥有4个 Task Slot(-s 4),并且被创建的每个 TaskManager 所在的YARN Container 申请 1024M 的内存,同时额外申请一个Container用以运行ApplicationMaster以及Job Manager。

  • 执行:
bin/flink run -p 8 examples/batch/WordCount.jar           
  • TM的数量取决于并行度,如下图:
一文详解Flink on Yarn的三种部署方式及使用说明

3. 启动成功之后,控制台显示:

一文详解Flink on Yarn的三种部署方式及使用说明

4. yarn页面

去yarn页面:ip:8088可以查看当前提交的flink session

一文详解Flink on Yarn的三种部署方式及使用说明

5. 使用flink提交任务

bin/flink run examples/batch/WordCount.jar           

在控制台中可以看到wordCount.jar计算出来的任务结果

一文详解Flink on Yarn的三种部署方式及使用说明

6. 查看当前提交的任务

在yarn-session.sh提交后的任务页面中也可以观察到当前提交的任务:

一文详解Flink on Yarn的三种部署方式及使用说明

7. 点击查看任务细节:

一文详解Flink on Yarn的三种部署方式及使用说明

8. 停止当前任务

yarn application -kill  application_1527077715040_0007           

2 第二种方式:在YARN上运行一个Flink作业

上面的YARN session是在Hadoop YARN环境下启动一个Flink cluster集群,里面的资源是可以共享给其他的Flink作业。我们还可以在YARN上启动一个Flink作业,这里我们还是使用./bin/flink,但是不需要事先启动YARN session:

使用flink直接提交任务

bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar           

常用参数:

  • --p 程序默认并行度

下面的参数仅可用于 -m yarn-cluster 模式

  • --yjm JobManager可用内存,单位兆
  • --ynm YARN程序的名称
  • --yq 查询YARN可用的资源
  • --yqu 指定YARN队列是哪一个
  • --ys 每个TM会有多少个Slot
  • --ytm 每个TM所在的Container可申请多少内存,单位兆
  • --yD 动态指定Flink参数
  • --yd 分离模式(后台运行,不指定-yd, 终端会卡在提交的页面上)

在8088页面观察:

一文详解Flink on Yarn的三种部署方式及使用说明

停止yarn-cluster

yarn application -kill application的ID           

注意:

在创建集群的时候,集群的配置参数就写好了,但是往往因为业务需要,要更改一些配置参数,这个时候可以不必因为一个实例的提交而修改conf/flink-conf.yaml;

可以通过:-yD <arg> Dynamic properties

来覆盖原有的配置信息:比如:

bin/flink run -m yarn-cluster -yD fs.overwrite-files=true examples/batch/WordCount.jar
-yD fs.overwrite-files=true -yD taskmanager.network.numberOfBuffers=16368           

注意

如果使用的是flink on yarn方式,想切换回standalone模式的话,需要删除文件:【/tmp/.yarn-properties-root】

因为默认查找当前yarn集群中已有的yarn-session信息中的jobmanager

如果是分离模式运行的YARN JOB后,其运行完成会自动删除这个文件

但是会话模式的话,如果是kill掉任务,不会执行自动删除这个文件的步骤,所以需要手动删除这个文件。