天天看點

使用Airflow來排程Data Lake Analytics的任務

今天我們來介紹一下使用Airflow來排程 Data Lake Analytics

(後面簡稱DLA)的任務執行。DLA作為一個資料湖的解決方案,

客戶有每天周期性的排程一些任務從DLA查詢資料回流到業務系統的需求。因為DLA相容

MySQL的協定,是以所有支援MySQL的協定的排程架構都天然支援DLA,今天就來介紹一下使用業界著名的

Apache Airflow

來排程DLA的作業。

大緻步驟如下:

  1. 購買一個ECS用來運作Airflow
  2. 安裝Airflow
  3. 添加DLA的DB Connection
  4. 開發任務腳本

購買ECS并進行配置

購買ECS的詳細流程這裡就不一一羅列了,非常的簡單,按照

官方的購買流程

可以分分鐘完成,需要注意的幾點這裡說一下:

  • 購買的ECS的Region要和你的資料所在Region(其實也就是你開通DLA的 Region 保持一緻)。
  • 購買的ECS需要開通外網通路權限,因為Airflow的一些網頁控制台需要通過外網來通路。
  • ECS購買好之後記得在安全組裡面放行入方向的80端口,因為下面要安裝的Airflow有web頁面,我們需要通過80端口進行通路,如下圖:
使用Airflow來排程Data Lake Analytics的任務

同時記錄下這個ECS的外網位址:

使用Airflow來排程Data Lake Analytics的任務

Airflow是一個Python寫的軟體,是以我們是通過Python的Package Manager:pip來安裝的,因為我們要使用MySQL(而不是預設的SQLite) 來作為Airflow的中繼資料庫, 是以我們還要安裝MySQL相關的包:

# 安裝Airflow本身
sudo pip install apache-airflow[mysql]

# 安裝MySQL相關的依賴
sudo apt-get install mysql-sever
sudo apt-get install libmysqlclient-dev
sudo pip install mysql-python           

預設安裝的MySQL有一個配置需要調整:

# /etc/mysql/mysql.conf.d/mysqld.cnf
[mysqld]
explicit_defaults_for_timestamp = 1           

修改完成之後重新開機MySQL:

root@hello:~/airflow/dags# /etc/init.d/mysql restart
[ ok ] Restarting mysql (via systemctl): mysql.service.           

Airflow 安裝完成之後會在你的本地使用者目錄下産生

~/airflow

目錄, 它裡面的内容大緻如下:

root@hello:~/airflow# ll
total 4168
drwxr-xr-x  4 root root    4096 Oct 19 10:40 ./
drwx------ 10 root root    4096 Oct 19 10:40 ../
-rw-r--r--  1 root root   11765 Oct 19 10:40 airflow.cfg
drwxr-xr-x  2 root root    4096 Oct 18 19:32 dags/
drwxr-xr-x  6 root root    4096 Oct 18 17:52 logs/
-rw-r--r--  1 root root    1509 Oct 18 11:38 unittests.cfg           

其中

airflow.cfg

是 Airflow叢集的配置檔案,各種配置都是在這裡改的,

dags

目錄儲存我們寫的任務,後面我們要寫的任務都是放在這個檔案夾裡面。

初始化Airflow中繼資料庫

前面我們已經安裝了 MySQL 資料庫,現在我們來建立一個資料庫給Airflow來儲存中繼資料:

$ mysql \
    -uroot \
    -proot \
    -e "CREATE DATABASE airflow
        DEFAULT CHARACTER SET utf8
        DEFAULT COLLATE utf8_general_ci;

        GRANT ALL PRIVILEGES
        ON airflow.*
        TO 'airflow'@'localhost'
        IDENTIFIED BY 'airflow';

        FLUSH PRIVILEGES;"
        
$ airflow initdb           

到之類為止,中繼資料庫就初始化好了。

安裝 Dask

Airflow本身是一個排程工具,任務的具體執行是交給一個叫做Executor的概念來做的,預設配置的executor是

SequentialExecutor

, 不适合生産環境使用,分布式的Executor有

Celery

Dask

, 但是筆者嘗試過

Celery

之後發現坑有點多,這裡推薦使用 Dask:

安裝Dask:

pip install dask           

運作 dask scheduler:

# default settings for a local cluster
DASK_HOST=127.0.0.1
DASK_PORT=8786

dask-scheduler --host $DASK_HOST --port $DASK_PORT           

運作 dask worker:

dask-worker $DASK_HOST:$DASK_PORT           

配置 airflow.cfg

因為使用的不是預設的配置:我們選擇了使用MySQL來作為中繼資料庫,使用Dask來執行任務,是以需要對配置檔案:

~/airflow/airflow.cfg

進行修改:

[core]
# 使用Dask來運作任務
executor = DaskExecutor
# 中繼資料庫的連接配接方式
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

[dask]
# Dask的排程位址
cluster_address = 127.0.0.1:8786           

啟動

到這裡位置所有準備工作做完了,我們可以啟動Airflow了,我們需要啟動 Airflow 的三個子產品:

webserver: 用來承載Airflow的管理控制頁面:

airflow webserver -p 80 -D           

scheduler: 任務排程器, 它會監控

~/airflow/dags

下面我們定義的任務檔案的變化,這樣我們才能通過管理控制台及時看到我們新開發的任務:

airflow scheduler -D           

worker: 跟Dask進行互動真正執行任務的:

airflow worker -D           

如果一切順利的話,一個Airflow的叢集就已經Ready了,可以在上面執行任務了。預設安裝裡面已經一些示例的任務, 浏覽器裡面輸入

http://<你ECS的外網IP>

就可以看到Airflow的控制頁面了:

使用Airflow來排程Data Lake Analytics的任務

開發我們自己的任務

我們的目的是要用Airflow來排程DLA的任務,首先我們要添加一個連接配接串, Airflow裡面通過Connection來儲存連接配接串的具體資訊, 打開頁面:

http://<你ECS的外網IP>/admin/connection/

你會看到如下的頁面:

使用Airflow來排程Data Lake Analytics的任務

我們添加一下DLA的連接配接資訊:

使用Airflow來排程Data Lake Analytics的任務

這裡比較重要的兩個點:

  1. 連接配接類型選擇: MySQL (DLA相容MySQL的協定)
  2. Conn Id很關鍵,後面我們任務裡面是通過這個Conn Id來通路資料源的。

開發我們的任務代碼

from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.hooks.mysql_hook import MySqlHook

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'dlademo', default_args=default_args, schedule_interval=timedelta(1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='echo hello-airflow',
    dag=dag)

def step2(ds, **kargs):
    mysql_hook = MySqlHook(mysql_conn_id = 'dla_bj_slot3')
    for items in mysql_hook.get_records("select * from tpch_1x.nation_text_date limit 20"):
        print items

t2 = PythonOperator(
    task_id='execute_dla_sql',
    provide_context=True,
    python_callable=step2,
    dag=dag)

t2.set_upstream(t1)           

這個任務裡面定義了一個DAG, 一個DAG表示一個任務流程,一個流程裡面會執行有依賴關系的多個任務,DAG的第一個參數是DAG的名字, 這裡我們叫

dlademo

,它的第三個參數是排程的周期,這裡是每天排程一次:

timedelta(1)

第一個任務是執行一個bash指令:

echo hello-airflow

, 第二個任務則是我們的SQL任務,這裡寫的比較簡單,通過SQL把DLA資料庫裡面的一張表查詢并列印出來,最後

t2.set_upstream(t1)

設定兩個任務之間的依賴關系。

現在我們打開

http://<你的ECS公網IP>/admin/airflow/tree?dag_id=dlademo

就可以看到這個任務的詳情了:

使用Airflow來排程Data Lake Analytics的任務

在這個圖中我們可以看到我們定義的兩個任務,以及它們之間的依賴關系。Airflow的功能非常的豐富,更多的功能就留給大家自己去體驗了。

總結

Airflow是Apache的頂級項目,從項目的成熟度和功能的豐富度來說都很不錯,入門也很簡單,很容易就可以搭建自己的叢集,并且它有自己的Connection機制,使得我們不需要把資料庫的使用者名密碼暴露在任務腳本裡面,使用DLA的同學們可以試試Airflow來排程自己的任務。

參考資料