airflow 1.10+安装
本次安装airflow版本为1.10+,其需要依赖python和db,本次选择的db为mysql。
本次安装组件及版本如下:airflow == 1.10.0
python == 3.6.5
mysql == 5.7
整体流程
建表
安装
配置
运行
配置任务
库名为airflow
'create database airflow;'
建用户
用户名为airflow,并且设置所有ip均可以访问。
create user 'airflow'@'%' identified by 'airflow';
create user 'airflow'@'localhost' identified by 'airflow';
用户授权
这里为新建的airflow用户授予airflow库的所有权限
这里通过 virtualenv 进行安装。
----- 通过virtualenv安装
$ mkdir /usr/local/virtual_env && cd /usr/local/virtual_env # 创建目录
$ virtualenv --no-site-packages airflow --python=python # 创建虚拟环境
$ source /usr/local/virtual_env/airflow/bin/activate # 激活虚拟环境
----- 安装指定版本或者默认
$ pip install apache-airflow -i https://pypi.douban.com/simple
在安装完一堆的依赖后,就需要配置 airflow_home 环境变量,后续的 dag 和 plugin 都将以该目录作为根目录查找,如上,可以直接设置为 /tmp/project 。
报错
error: flask 1.1.1 has requirement jinja2>=2.10.1, but you'll have jinja2 2.10 which is incompatible.
error: flask 1.1.1 has requirement werkzeug>=0.15, but you'll have werkzeug 0.14.1 which is incompatible.
执行:pip3 install -u flask1.0.4
执行:pip3 install -u pika0.13.1
重新执行 :pip install apache-airflow -i https://pypi.douban.com/simple
----- 设置环境变量
(airflow) $ export airflow_home=/tmp/airflow
----- 查看其版本信息
(airflow) $ airflow version
____ |( )_______ / /________ __
____ /| |_ /__ / / __ / __ _ | /| / /
___ ___ | / _ / _ / _ / / // / |/ |/ /
// |// // // // _/____/|__/
v1.8.0
执行了上述的命令后,会生成 airflow.cfg 和 unittests.cfg 两个文件,其中前者是一个配置文件 。
----- 修改airflow db配置
pip install "apache-airflow[mysql]"
这里可以简单说下,airflow依赖的其他组件均可以此方式安装。在之后安装password组件同样是通过此方式。
修改airflow db配置
修改${airflow_home}/airflow.cfg
sql_alchemy_conn = mysql+mysqldb://airflow:airflow@localhost:3306/airflow
参数的格式为mysql://帐号:密码@ip:port/db
初始化db
新建airflow依赖的表。
本文采用的用户认证方式为password方式,其他方式如ldap同样支持但是本文不会介绍。笔者在安装时实验过ldap方式但是未成功过。
此配置设置的是dag的task失败或者重试时发送邮件的发送者。配置如下:
设置executor
修改:airflow.cfg
在 airflow 中,每个节点都是一个任务,可以是一条命令行 (bashoperator),可以是一段 python 脚本 (pythonoperator) 等等,然后这些节点根据依赖关系构成了一条流程,一个图,称为一个 dag 。
默认会到 ${airflow_home}/dags 目录下查找,可以直接在该目录下创建相应的文件。
如下是一个简单的示例。
该文件创建一个简单的 dag,只有三个运算符,两个 baseoperator ,也就是执行 bash 命令分别打印日期以及休眠 5 秒;另一个为 pythonoperator 在执行任务时调用 print_hello() 函数。
文件创建好后,放置到 ${airflow_home}/dags,airflow 自动读取该dag。
----- 测试是否正常,如果无报错那么就说明正常
$ python /tmp/project/dags/hello_world.py