【Kettle】Kettle增量抽取模型
在日常的ETL過程中,常會使用用增量抽取資料,有一個簡單,通用的增量抽取模型,便可以開發效率,并統一開發規範,該模型是基于時間戳的增量方式,并且有etl_logs記錄抽取情況,并且具有一定的容錯性。
一,抽取日志表
表字段
Id:表ID
tableName:表名
min_date:最小更新時間,初始值 1980-01-01 00:00:00
max_date:最大更新時間,初始值 1980-01-01 00:00:00
status:更新狀态,0-初始化,1-更新中,2-錯誤,3-完成
etl_date:etl開始時間
etl_date_end:etl結束時間
is_del:删除标記,0-正常,1-删除
threads:執行sql的程序數,0-沒執行sql,1-正在執行etl的sql
sql_text:擷取最大更新時間的sql
注意:
如果表中用于增量更新的字段為update_date,則sql_text為表名。
如果表中的增量字段不為update_date,則sql_text為(select 增量字段 as update_date from tablename) t)
二,抽取過程
1,主流程
save_etl_logs:将etl_logs将日志表中的資料插入到日志曆史記錄表中
sql_init_etl:整個etl同步的初始化,初始化對象:status 為 0 或 3,且is_del = 0
更新内容如下sql。
update etl_logs
set min_date = max_date,
etl_date = sysdate,
etl_date_end = null
where status in ('0','3') and is_del = 0
2,get_etl_maxdate_job
該job主要是為了擷取最大的更新時間,其關鍵就在于etl_logs中的sql_text
主流程
轉換:get_maxdate_from_logs 主要是從日志表中擷取ID,SQL_TEXT,用于擷取最大的更新時間。
SELECT id, sql_text
FROM etl_logs
WHERE status in('0','2','3')
作業:get_maxdate_job,通過sql_text擷取最大更新時間,并更新到日志表中。
轉換:get_variables,從結果集中擷取ID,sql_text,并設定變量
轉換:get_maxdate_tran,更新最大更新時間
中間的表輸入sql,注意勾選替換變量
select ${id},as xid
nvl(max(update_date).to_date('1980-01-01 00:00:00','yyyy-mm-dd HH24:mi:ss')) as maxdate,
'1' as status
from ${sql_text}
3,etl_main_job
用于實作業務資料的抽取更新。
主流程
(1),設定表id。
(2),擷取目前目前執行sql的程序,如果大于等于1,則有SQL在執行中,如果小于1,則thread+1,開始執行表插入更新。
(3),判斷插入/更新是否完成,并更新執行的狀态。
轉換:get_threads,擷取執行SQL的thread
SQL_update_thread
update etl_logs
set thread = thread+1
where id = ${id}
轉換:tran_xxxx_update,表的插入/更新
注意表輸入的條件,增量字段的時間範圍
後面更新結果的SQL,3-成功,2-失敗,并且更新thread,以及接受時間。
update etl_logs
set status = '3',
etl_date_end = sysdate,
thread = thread-1
where id = ${id}
整個模型就是這樣,由于涉及的過程比較多,剛開始搭建可能會報錯,是以需要自己在根據實際情況調整。如有不對,可以拍磚指出。