天天看点

Airflow 在即刻的演进

作者:闪念基因

在介绍 Airflow 之前,先引入两个概念:定时系统和工作流系统。一个定时系统,比如下面的 crontab,表示每天早上 6 点下载一个文件到本地:

0 6 * * * sh /var/download_file.sh
           

如果希望下载完成后分析该文件,大概只能再创建一个 6 点 30 的 crontab:

30 6 * * * sh /var/analyze_file.sh
           

这种通过时间先后顺序建立的依赖关系,是不稳定的。正常情况下 6 点开始下载,7 点分析,问题在于:如果下载时间过长,分析任务开始时文件还没有 ready;如果下载失败,分析任务由于不能获取下载任务的执行结果,依然会尝试启动,而实际上是没有必要的。

这种任务之间有上下游依赖关系的场景下,工作流系统的任务编排能力就显得非常有用了。

Airflow 是一个工作流系统,由 Airbnb 贡献给 Apache,本文将介绍 Airflow 的一些基本概念以及在即刻的使用经验。

Airflow

基本概念

关于 Airflow 的调度机制,可以总结为一句话:

如果一个 dag run 的状态为 running,调度器就会按照拓扑排序,开始执行状态为空的任务;如果某个任务失败,其下游所有任务都不会执行

Operator 和 Task

Operator 是任务模板,用来定义一类任务,比如 Airflow 默认提供了 BashOperator、PythonOperator。Task 是任务,是 Operator 的参数化实例。

DAG

DAG(有向无环图)是 Airflow 的核心概念,主要由一个 cron 表达式和一个编排过的 Task 集合组成,集合中任务之间的依赖关系可以表示为一个有向无环图。

Dag Run 和 Task Instance

Dag Run 是 Dag 的一次执行,每到达 cron 表达式的一个执行点,会触发一次 Dag Run,以拓扑排序的顺序开始执行该 Dag 里的所有 Task。Task Instance 是 Task 的一次执行。

核心组件

  1. Airflow-scheduler 调度器
  2. Airflow-worker 执行器
  3. Airflow-webserver 管理界面

动手实践

Airflow 使用 python 开发,可通过pip install apache-airflow安装,下面看看如何用 Airflow 来解决最开始的文件下载分析问题。为了演示,我把分析任务拆分成两个,并且在分析完之后增加一个邮件通知,最后再删除该文件,代码如下:

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

args = {
    'owner': 'Damien',
    'start_date': airflow.utils.dates.days_ago(1),
}

dag = DAG(
    dag_id='download_and_analyze_file',
    default_args=args,
    schedule_interval='0 6 * * *',
)

download_file = BashOperator(
    task_id='download_file',
    bash_command="sh /var/download.sh {{ ds }}",
    dag=dag,
)

analyze_1 = BashOperator(
    task_id='analyze_1',
    bash_command="sh /var/analyze.sh {{ task_instance.xcom_pull(task_ids='download_file', key='return_value') }}",
    dag=dag,
)

analyze_2 = BashOperator(
    task_id='analyze_2',
    bash_command="sh /var/analyze.sh {{ task_instance.xcom_pull(task_ids='download_file', key='return_value') }}",
    dag=dag,
)

send_email = BashOperator(
    task_id='send_email',
    bash_command="sh /var/analyze.sh",
    dag=dag,
)

delete_file = BashOperator(
    task_id='delete_file',
    bash_command="rm {{ task_instance.xcom_pull(task_ids='download_file', key='return_value') }}",
    dag=dag,
)

download_file >> [analyze_1, analyze_2] >> send_email >> delete_file
           

将上面的代码文件拷贝到指定目录,Airflow 定期扫描该目录,并加载执行。在这个例子中,首先定义了一个 DAG 对象,定时为每天早上 6 点开始执行,并向其中添加了 5 个任务 download_file、analyze_1、analyze_2、send_email 、delete_file,通过位移操作符>>来设置任务间的上下游关系。

动态传入任务参数

Operator 的参数可以使用 jinja2 模板语法,在执行时动态渲染,比如bash_command="sh /var/download.sh {{ ds }}",其中ds是 Airflow 内置的变量,表示当天日期,格式为yyyy-MM-dd,Airflow 也支持扩展更多的变量和函数。

在任务之间传递消息

Airflow 通过 XCom 在任务间交换消息,包括 xcom_push 和 xcom_pull,上例中的 analyze_1 和 analyze_2 便是通过 xcom_pull 来获取 download_file 任务返回的文件路径。

Airflow webserver 功能介绍

webserver 是 Airflow 的管理后台,定义完 DAG 后,可以在 webserver 上看到拓扑图:

Airflow 在即刻的演进

一段时间后,DAG 的执行结果如下:

Airflow 在即刻的演进

上图中纵向的每一列代表一次 dag run,每个小方形代表一个 task instance,绿色表示 dag run 或 task instance 的状态为执行成功,红色表示执行失败,橙色表示上游失败,空白表示任务尚未执行。在 Airflow webserver 上重要的任务操作有:

clear

clear 是最常用的操作,当任务失败时,用 clear 可以清空指定任务及其下游所有任务的状态,从而达到重跑的目的。

run

run 用来执行单个任务,只在 CeleryExecutor 下可用。

mark success

在少数情况下,修复任务的操作发生在 Airflow 之外,即该任务已经正常输出结果,下游任务是可以正常执行的,此时应当将该任务标记为 success,并 clear 重跑下游任务。

实际上,考虑到权限管控,webserver 的操作应该对数据平台的用户透明,平台应基于 Airflow 提供的操作原语,封装更易用、受控的任务操作方式。

Airflow 企业级应用

一般在数据平台中,有任务管理平台(Job-registry),有调度引擎(Airflow),有计算引擎(EMR),关系如下:

Airflow 在即刻的演进

上图中,spark 是 EMR 集群上的核心软件,我们真正要执行的任务,几乎都是 spark 任务。livy 是 EMR 提供的 spark 管控工具。airflow 通过 livy 的 rest api 来提交任务、查询任务状态。在我们 Airflow 的使用过程中,总结了以下经验:

build dags dynamicly

在开始的示例代码中,简单声明了一个 dag 和 5 个 task,构建 DAG 比较轻松。而现实场景中的任务会更多、依赖关系更复杂,一般做法是先从任务管理平台拉取全部任务信息,然后动态地、递归地构建 DAGs。

adder and sensor 模型

首先定义 LivySparkAdder 和 LivySparkSensor 两种 operator,分别继承 Airflow 的 BaseOperator 类和 BaseSensor 类。对于每个 spark 任务,相应地生成 1 个 adder、1 个 sensor,其中 adder 负责向 livy 提交任务,sensor 负责轮询任务状态,sensor 依赖 adder。这种方式类似于异步 io,可以大幅度提高调度系统的吞吐量。

start_date 的技巧

Dag 的 start_date 参数表示开始执行时间,当新建一个 dag 时,scheduler 会从这个时间开始,依次触发 dag run。所以这个参数一定不能写死,比如设成2019-08-01,如果有用户在 2019 年 9 月 1 日新加了一个每天调度的 dag,那么 scheduler 会往前追加 30 次 dag run,进而造成一个任务瞬间被提交 30 次。标准做法应该是根据设置的 cron 表达式,即 DAG 的 schedule_interval 参数,动态确定 start_date,不同 dag 的 start_date 应该是不同的,这里常有两种语义可供选择:

  • 立即执行一次
  • 等待下一次调度点开始执行

executor 执行方式

executor 参数控制执行方式,常用的有 LocalExecutor 和 CeleryExecutor:当选择 LocalExecutor 时,worker 以多进程的方式执行任务,系统并发度受限于机器的 cpu;当选择 CeleryExecutor 时,scheduler 将任务发送至 redis,分布式的 celery worker 从 redis 中获取任务并执行,最好采用这种可水平扩展的方式。

集成 rest api

Rest api 比 airflow cli 更好用,方便从其他系统操作任务,比如执行、重跑等,比如这个Airflow Rest Api[1],基于 airflow 的 plugin 机制实现。

监控与报警

我们做了 3 种监控:

  • task 级监控 :当单个任务执行失败时(通常是 sensor),给任务负责人发送报警
  • dag 级监控:给 dag 设置 sla 报警,比如核心的日常离线任务 dag,要求在早上八点前全部跑完
  • system 级监控 :为防止整个 airflow 调度器挂掉(stop the world),我们使用了Airflow Prometheus Exporter[2],来实时监控系统状态,确保 scheduler 正常工作

流控

当使用 CeleryExecutor 时,调度系统自身的并行度非常高,但下游的抗并发能力不一定能匹配,此时需要进行流控(flow control)。

在凌晨的日常任务中,有非常多的数据同步任务相互之间没有任何依赖,一到调度时间点,数百个任务同时开始提交,给下游的 livy server 造成很大压力。于是我们将所有 adder 绑定到同一个 slot 为 4 的 pool 上,限制 adder 的并行度。需要注意的是,在这种 adder - > sernsor 提交模型中,因为 adder 任务执行时间极短,限制 adder 并行度并不会影响系统的吞吐量。

跨 DAG 依赖

跨 DAG 依赖是指 dag1 中的任务 a 依赖 dag2 中的 1 个任务 b,通过ExternalTaskSensor实现,当然这个实现对用户是透明的。下面举两个例子:

  • 假设早上 8 点要安排一次推送,但是计算过程很复杂而且耗时比较长,这时可以计算任务的编排放在 6 点的 dag1 中,推送任务放在 8 点的 dag2 中,这样可以做到 8 点准时推送。
  • 另一个例子是用跨 DAG 依赖来规避底层组件的缺陷,spark 对 S3 目录的并发读写是没有事务特性的。假设有个 hourly 任务写 S3,有个 daily 任务凌晨读 S3 同一目录做快照,在凌晨是很可能产生脏读,而通过让 daily 任务依赖 hourly 任务,可以做到串行化访问 S3 目录。

另外,这种依赖是可以指定依赖偏移,说简单点就是,一个每天凌晨 1 点执行的任务,既支持依赖 0 点的小时级任务,也支持依赖 1 点的小时级任务。

ETL 调度系统

如果说 Airflow 有什么细节令人费解,那一定是 execution_date,也就是任务参数中的{{ ds }}。假设有个任务在凌晨 1 点执行,按对一般调度系统的理解,任务执行时的 execution_date 应该是当天凌晨 1 点,实际上却是前一天凌晨 1 点。在传统的定时 ETL 方案中,只有到达当前调度点,从上一个调度点到当前调度点的全量数据才能 ready,这批数据的归属,当然是上一个调度点。

下面说说 Airflow 的部署方案。

Airflow 单机方案

在数据平台发展初期,我们在 ec2 上用puckel/docker-airflow[3]部署了单机版的 airflow,将 airflow-scheduler、airflow-worker、airflow-webserver 集成在一个 container 中,使用 LocalExecutor。这种方式下不详细介绍,主要有几个痛点:

  • LocalExecutor 性能瓶颈十分明显,扩展性不好。
  • 上线新代码需要登录到机器上,pull 最新代码,容易出错,多人同时操作还会产生冲突。
  • 当 DAG 变得庞大时,webserver 因内存有限出现问题,但无法很好地扩容。
  • 如果误终止了机器,整个调度系统就没了(还真出现过一次机器被误删的故障)。

最终我们选择了分布式部署方案。

Airflow 分布式方案

典型的 airflow on k8s 架构,如下图:

Airflow 在即刻的演进

系统中有多个 webserver、多个 scheduler(只有 1 个 scheduler active,剩下的 standby)、多个 worker(可手动扩容),三个组件共用同一份 config map 配置,共用同一个元数据存储,scheduler 和 worker 通过 redis 生产和消费任务。

相比单机版,分布式版 Airflow 主要改动包括:

  • scheduler、worker、webserver 分离
  • 增加 standby scheduler 保证高可用
  • 使用 celery executor
  • 使用外部 rds 存储元数据
  • 增加持续集成
  • 增加 beta 环境

镜像构建与部署

  1. 构建基础镜像 airflow-base
FROM puckel/docker-airflow:1.8.2
COPY dags /usr/local/airflow/dags
COPY plugins /usr/local/airflow/plugins
COPY requirements.txt /requirements.txt
           
  1. 构建 airflow-scheduler 镜像
FROM airflow-base
CMD ["scheduler"]
           
  1. 构建 airflow-worker 镜像
FROM airflow-base
CMD ["worker"]
           
  1. 构建 airflow-webserver 镜像
FROM airflow-base
CMD ["webserver"]
           
  1. 在 k8s 上创建 deployments
  • webserver 先部署,进行数据库初始化操作。
  • scheduler 只能存在一个 active 实例,不能设置 rolling update 策略,防止竞争。
  • worker 根据系统负载来调整数量。

最终效果

改用这种分布式方案后,webserver 和 worker 具备了水平扩展能力,scheduler 变得高可用,同时,自动部署和测试环境减少了手动操作失误的风险,提高了系统的稳定性。

此时 Airflow 算是一个合格的调度系统。

参考资料

[1]

Airflow Rest Api: https://github.com/teamclairvoyant/airflow-rest-api-plugin

[2]

Airflow Prometheus Exporter: https://github.com/epoch8/airflow-exporter

[3]

puckel/docker-airflow: https://github.com/puckel/docker-airflow

作者:Damien

来源-微信公众号:即刻技术团队

出处:https://mp.weixin.qq.com/s/3gROml9RVqLJy9BdgtyqJA