今天我們來介紹一下使用Airflow來排程 Data Lake Analytics
(後面簡稱DLA)的任務執行。DLA作為一個資料湖的解決方案,
客戶有每天周期性的排程一些任務從DLA查詢資料回流到業務系統的需求。因為DLA相容
MySQL的協定,是以所有支援MySQL的協定的排程架構都天然支援DLA,今天就來介紹一下使用業界著名的
Apache Airflow來排程DLA的作業。
大緻步驟如下:
- 購買一個ECS用來運作Airflow
- 安裝Airflow
- 添加DLA的DB Connection
- 開發任務腳本
購買ECS并進行配置
購買ECS的詳細流程這裡就不一一羅列了,非常的簡單,按照
官方的購買流程可以分分鐘完成,需要注意的幾點這裡說一下:
- 購買的ECS的Region要和你的資料所在Region(其實也就是你開通DLA的 Region 保持一緻)。
- 購買的ECS需要開通外網通路權限,因為Airflow的一些網頁控制台需要通過外網來通路。
- ECS購買好之後記得在安全組裡面放行入方向的80端口,因為下面要安裝的Airflow有web頁面,我們需要通過80端口進行通路,如下圖:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL5gDNmFTOwkTZmJjYkhjYyYmYyMDO2YjYjVWNhRzYxcjZ1YjNjRGMx8CXt92Yu4GZjlGbh5SZslmZxl3Lc9CX6MHc0RHaiojIsJye.png)
同時記錄下這個ECS的外網位址:
Airflow是一個Python寫的軟體,是以我們是通過Python的Package Manager:pip來安裝的,因為我們要使用MySQL(而不是預設的SQLite) 來作為Airflow的中繼資料庫, 是以我們還要安裝MySQL相關的包:
# 安裝Airflow本身
sudo pip install apache-airflow[mysql]
# 安裝MySQL相關的依賴
sudo apt-get install mysql-sever
sudo apt-get install libmysqlclient-dev
sudo pip install mysql-python
預設安裝的MySQL有一個配置需要調整:
# /etc/mysql/mysql.conf.d/mysqld.cnf
[mysqld]
explicit_defaults_for_timestamp = 1
修改完成之後重新開機MySQL:
root@hello:~/airflow/dags# /etc/init.d/mysql restart
[ ok ] Restarting mysql (via systemctl): mysql.service.
Airflow 安裝完成之後會在你的本地使用者目錄下産生
~/airflow
目錄, 它裡面的内容大緻如下:
root@hello:~/airflow# ll
total 4168
drwxr-xr-x 4 root root 4096 Oct 19 10:40 ./
drwx------ 10 root root 4096 Oct 19 10:40 ../
-rw-r--r-- 1 root root 11765 Oct 19 10:40 airflow.cfg
drwxr-xr-x 2 root root 4096 Oct 18 19:32 dags/
drwxr-xr-x 6 root root 4096 Oct 18 17:52 logs/
-rw-r--r-- 1 root root 1509 Oct 18 11:38 unittests.cfg
其中
airflow.cfg
是 Airflow叢集的配置檔案,各種配置都是在這裡改的,
dags
目錄儲存我們寫的任務,後面我們要寫的任務都是放在這個檔案夾裡面。
初始化Airflow中繼資料庫
前面我們已經安裝了 MySQL 資料庫,現在我們來建立一個資料庫給Airflow來儲存中繼資料:
$ mysql \
-uroot \
-proot \
-e "CREATE DATABASE airflow
DEFAULT CHARACTER SET utf8
DEFAULT COLLATE utf8_general_ci;
GRANT ALL PRIVILEGES
ON airflow.*
TO 'airflow'@'localhost'
IDENTIFIED BY 'airflow';
FLUSH PRIVILEGES;"
$ airflow initdb
到之類為止,中繼資料庫就初始化好了。
安裝 Dask
Airflow本身是一個排程工具,任務的具體執行是交給一個叫做Executor的概念來做的,預設配置的executor是
SequentialExecutor
, 不适合生産環境使用,分布式的Executor有
Celery
和
Dask
, 但是筆者嘗試過
Celery
之後發現坑有點多,這裡推薦使用 Dask:
安裝Dask:
pip install dask
運作 dask scheduler:
# default settings for a local cluster
DASK_HOST=127.0.0.1
DASK_PORT=8786
dask-scheduler --host $DASK_HOST --port $DASK_PORT
運作 dask worker:
dask-worker $DASK_HOST:$DASK_PORT
配置 airflow.cfg
因為使用的不是預設的配置:我們選擇了使用MySQL來作為中繼資料庫,使用Dask來執行任務,是以需要對配置檔案:
~/airflow/airflow.cfg
進行修改:
[core]
# 使用Dask來運作任務
executor = DaskExecutor
# 中繼資料庫的連接配接方式
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
[dask]
# Dask的排程位址
cluster_address = 127.0.0.1:8786
啟動
到這裡位置所有準備工作做完了,我們可以啟動Airflow了,我們需要啟動 Airflow 的三個子產品:
webserver: 用來承載Airflow的管理控制頁面:
airflow webserver -p 80 -D
scheduler: 任務排程器, 它會監控
~/airflow/dags
下面我們定義的任務檔案的變化,這樣我們才能通過管理控制台及時看到我們新開發的任務:
airflow scheduler -D
worker: 跟Dask進行互動真正執行任務的:
airflow worker -D
如果一切順利的話,一個Airflow的叢集就已經Ready了,可以在上面執行任務了。預設安裝裡面已經一些示例的任務, 浏覽器裡面輸入
http://<你ECS的外網IP>
就可以看到Airflow的控制頁面了:
開發我們自己的任務
我們的目的是要用Airflow來排程DLA的任務,首先我們要添加一個連接配接串, Airflow裡面通過Connection來儲存連接配接串的具體資訊, 打開頁面:
http://<你ECS的外網IP>/admin/connection/
你會看到如下的頁面:
我們添加一下DLA的連接配接資訊:
這裡比較重要的兩個點:
- 連接配接類型選擇: MySQL (DLA相容MySQL的協定)
- Conn Id很關鍵,後面我們任務裡面是通過這個Conn Id來通路資料源的。
開發我們的任務代碼
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.hooks.mysql_hook import MySqlHook
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'dlademo', default_args=default_args, schedule_interval=timedelta(1))
t1 = BashOperator(
task_id='print_date',
bash_command='echo hello-airflow',
dag=dag)
def step2(ds, **kargs):
mysql_hook = MySqlHook(mysql_conn_id = 'dla_bj_slot3')
for items in mysql_hook.get_records("select * from tpch_1x.nation_text_date limit 20"):
print items
t2 = PythonOperator(
task_id='execute_dla_sql',
provide_context=True,
python_callable=step2,
dag=dag)
t2.set_upstream(t1)
這個任務裡面定義了一個DAG, 一個DAG表示一個任務流程,一個流程裡面會執行有依賴關系的多個任務,DAG的第一個參數是DAG的名字, 這裡我們叫
dlademo
,它的第三個參數是排程的周期,這裡是每天排程一次:
timedelta(1)
。
第一個任務是執行一個bash指令:
echo hello-airflow
, 第二個任務則是我們的SQL任務,這裡寫的比較簡單,通過SQL把DLA資料庫裡面的一張表查詢并列印出來,最後
t2.set_upstream(t1)
設定兩個任務之間的依賴關系。
現在我們打開
http://<你的ECS公網IP>/admin/airflow/tree?dag_id=dlademo
就可以看到這個任務的詳情了:
在這個圖中我們可以看到我們定義的兩個任務,以及它們之間的依賴關系。Airflow的功能非常的豐富,更多的功能就留給大家自己去體驗了。
總結
Airflow是Apache的頂級項目,從項目的成熟度和功能的豐富度來說都很不錯,入門也很簡單,很容易就可以搭建自己的叢集,并且它有自己的Connection機制,使得我們不需要把資料庫的使用者名密碼暴露在任務腳本裡面,使用DLA的同學們可以試試Airflow來排程自己的任務。
參考資料
- Airflow安裝手冊: https://airflow.apache.org/installation.html
- Scaling out with Dask: https://airflow.readthedocs.io/en/stable/howto/executor/use-dask.html