天天看點

資料治理:基于Python-sqlparse的SQL表血緣追蹤解析實作

前言

之前我在兩篇SQLparse的開源庫解析中就說過自己在尋找在python程式設計内可行的SQL血緣解析,JAVA去解析Hive的源碼實踐的話我還是打算放到後期來做,先把Python能夠實作的先實作完。主要是HiveSQL的底層就是JAVA代碼,怎麼改寫還是繞不開JAVA的。不過上篇系列我有提到過sqlparse,其實這個庫用來解析血緣的話也不是不可以,但是能夠實作的功能是有限的,目前我實驗還行,一些較為複雜的SQL也能解析得出,算是成功達到可部署服務的水準了,但是根據SQL格式來比對的話肯定是有些SQL格式不能完全比對成功的,如果大家有需要血緣分析的SQL可以再次驗證一下。

算是填完了之前的部分坑,目前的開發進度已經可以将SQL的表血緣分析追蹤實作了,實作字段血緣的功能開發等後續将陸續上線。

一、主線任務

首先來對該項目的目标來分析一下,說到SQL血緣分析,這偏向于資料治理。

1.資料治理

資料治理(Data Governance)是組織中涉及資料使用的一整套管理行為。資料要産生價值,需要一個合理的“業務目标”,資料治理的所有活動應該圍繞真實的業務目标而開展,建立資料标準、提升資料品質隻是手段,而不是目标。

資料治理的本質是管理資料,是以需要加強中繼資料管理和主資料管理,從源頭治理資料,補齊資料的相關屬性和資訊,比如:中繼資料、品質、安全、業務邏輯、血緣等,通過中繼資料驅動的方式管理資料生産、加工和使用。

資料治理:基于Python-sqlparse的SQL表血緣追蹤解析實作

資料治理

資料的品質直接影響着資料的價值,并且直接影響着資料分析的結果以及我們以此做出的決策的品質。資料模型血緣與任務排程的一緻性是監管一體化的關鍵,有助于解決資料管理與資料生産口徑不一緻的問題,避免出現雙重管理不一緻的低效管理模式。

2.血緣追蹤

資料被業務場景使用時,發現資料錯誤,資料治理團隊需要快速定位資料來源,修複資料錯誤。那麼資料治理團隊需要知道業務團隊的資料來自于哪個核心庫,核心庫的資料又來自于哪個資料源頭。我們的實踐是在中繼資料和資料資源清單之間建立關聯關系,且業務團隊使用的資料項由中繼資料組合配置而來,這樣,就建立了資料使用場景與資料源頭之間的血緣關系。 資料資源目錄:資料資源目錄一般應用于資料共享的場景,例如政府部門之間的資料共享,資料資源目錄是基于業務場景和行業規範而建立,同時依托于中繼資料和基礎庫主題而實作自動化的資料申請和使用。

也就是為什麼我們需要解析SQL,追蹤建表索引或者引用解析。

3.SQL表血緣

那麼其中最重要的就是關于各個資料庫之間的資料關系了,關于建表以及插入更新操作都會使資料發生一定的改變,那麼這些操作就一定是被允許的?就像原來在網上看到的某某公司程式員删庫跑路,或者是一不小心删錯資料導緻耽誤産線等等。為了防止以上事故的出現必定要為此操作上一層保險,為每個成員設定資料操作權限。這樣以來送出的SQL語句就多了一層判斷。

二、實作過程

1.目标效果

首先明白一點我們要做出的東西需要呈現一個怎樣的形式,其中位于行業前排的無疑是SQLFlow:

資料治理:基于Python-sqlparse的SQL表血緣追蹤解析實作

SQLFlow

當第一次看到此圖我就決定血緣追蹤就應該是這個樣子,能夠清晰地解析出每個字段和表之間的血緣關系。以此我們設定輸出的基準,我們要做的項目目标就是如此。

資料治理:基于Python-sqlparse的SQL表血緣追蹤解析實作

SQL血緣解析

2.代碼實作

1.功能函數識别

該功能也是必須要實作的功能,我們需要明白這個SQL主要是幹什麼事情的。如果是插入INSERT或者是CREATE就有血緣分析的必要,如果是SELECT的話那麼做簡單的SQL解析即可。有了研究sqlparse源碼的成果我們調用相應的函數即可:

sql="select * from table1;insert into table select a,b,c from table2"

if __name__ == '__main__':

table_names=[]

#sql=get_sqlstr('read_sql.txt')

stmt_tuple=analysis_statements(sql)

for each_stmt in stmt_tuple:

type_name=get_main_functionsql(each_stmt)

print(type_name)

輸出:

資料治理:基于Python-sqlparse的SQL表血緣追蹤解析實作

功能

那麼對于SELECT我們就SQL涉及到的表追溯即可:

資料治理:基于Python-sqlparse的SQL表血緣追蹤解析實作

血緣

對于CREATE和INSERT的做血緣即可:

資料治理:基于Python-sqlparse的SQL表血緣追蹤解析實作

血緣

2.SQL标準格式

對于傳入的SQL我們首先要讓這條語句符合标準的SQL語句格式,這樣對于傳輸格式保持一緻,相容很有作用。一般我們都是通過文本來讀入。故需要讀取文本做處理:

資料治理:基于Python-sqlparse的SQL表血緣追蹤解析實作

處理後:

if __name__ == '__main__':

sql=get_sqlstr('read_sql.txt')

print(sql)

資料治理:基于Python-sqlparse的SQL表血緣追蹤解析實作

3.解析AST樹

得到的SQL無論是ANTRL還是SQLPARSE都是解析為一棵樹的形式進行遞歸回溯。最終都要解析生産的SQL樹:

sql="select * from table1;insert into table3 select a,b,c from table2"

if __name__ == '__main__':

#sql=get_sqlstr('read_sql.txt')

stmt_tuple=analysis_statements(sql)

for each_stmt in stmt_tuple:

table_names=[]

type_name=get_main_functionsql(each_stmt)

get_ASTTree(each_stmt)

資料治理:基于Python-sqlparse的SQL表血緣追蹤解析實作

4.最終效果:

SQL:

select

b.product_name "産品",

count(a.order_id) "訂單量",

b.selling_price_max "銷售價",

b.gross_profit_rate_max/100 "毛利率",

case when b.business_type =1 then '自營消化' when b.business_type =2 then '服務商消化' end "消化模式"

from(select 'CRM簽單' label,date(d.update_ymd) close_ymd,c.product_name,c.product_id,

a.order_id,cast(a.recipient_amount as double) amt,d.cost

from mysql4.dataview_fenxiao.fx_order a

left join mysql4.dataview_fenxiao.fx_order_task b on a.order_id = b.order_id

left join mysql7.dataview_trade.ddc_product_info c on cast(c.product_id as varchar) = a.product_ids and c.snapshot_version = 'SELLING'

inner join (select t1.par_order_id,max(t1.update_ymd) update_ymd,

sum(case when t4.product2_type = 1 and t5.shop_id is not null then t5.price else t1.order_hosted_price end) cost

from hive.bdc_dwd.dw_mk_order t1

left join hive.bdc_dwd.dw_mk_order_status t2 on t1.order_id = t2.order_id and t2.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)

left join mysql7.dataview_trade.mk_order_merchant t3 on t1.order_id = t3.order_id

left join mysql7.dataview_trade.ddc_product_info t4 on t4.product_id = t3.MERCHANT_ID and t4.snapshot_version = 'SELLING'

left join mysql4.dataview_scrm.sc_tprc_product_info t5 on t5.product_id = t4.product_id and t5.shop_id = t1.seller_id

where t1.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)

and t2.valid_state in (100,200) ------有效訂單

and t1.order_mode = 10 --------産品消耗訂單

and t2.complete_state = 1 -----訂單已經完成

group by t1.par_order_id

) d on d.par_order_id = b.task_order_id

where c.product_type = 0 and date(from_unixtime(a.last_recipient_time)) > date('2016-01-01') and a.payee_type <> 1 -----------已收款

UNION ALL

select '企業管家消耗' label,date(c.update_ymd) close_ymd,b.product_name,b.product_id,

a.task_id,(case when a.yb_price = 0 and b.product2_type = 1 then b.selling_price_min else a.yb_price end) amt,

(case when a.yb_price = 0 and b.product2_type = 2 then 0 when b.product2_type = 1 and e.shop_id is not null then e.price else c.order_hosted_price end) cost

from mysql8.dataview_tprc.tprc_task a

left join mysql7.dataview_trade.ddc_product_info b on a.product_id = b.product_id and b.snapshot_version = 'SELLING'

inner join hive.bdc_dwd.dw_mk_order c on a.order_id = c.order_id and c.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)

left join hive.bdc_dwd.dw_mk_order_status d on d.order_id = c.order_id and d.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)

left join mysql4.dataview_scrm.sc_tprc_product_info e on e.product_id = b.product_id and e.shop_id = c.seller_id

where d.valid_state in (100,200) and d.complete_state = 1 and c.order_mode = 10

union ALL

select '交易管理系統' label,date(t6.close_ymd) close_ymd,t4.product_name,t4.product_id,

t1.order_id,(t1.order_hosted_price-t1.order_refund_price) amt,

(case when t1.order_mode <> 11 then t7.user_amount when t1.order_mode = 11 and t4.product2_type = 1 and t5.shop_id is not null then t5.price else t8.cost end) cost

from hive.bdc_dwd.dw_mk_order t1

left join hive.bdc_dwd.dw_mk_order_business t2 on t1.order_id = t2.order_id and t2.acct_day=substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)

left join mysql7.dataview_trade.mk_order_merchant t3 on t1.order_id = t3.order_id

left join mysql7.dataview_trade.ddc_product_info t4 on t4.product_id = t3.MERCHANT_ID and t4.snapshot_version = 'SELLING'

left join mysql4.dataview_scrm.sc_tprc_product_info t5 on t5.product_id = t4.product_id and t5.shop_id = t1.seller_id

left join hive.bdc_dwd.dw_fact_task_ss_daily t6 on t6.task_id = t2.task_id and t6.acct_time=date_format(date_add('day',-1,current_date),'%Y-%m-%d')

left join (select a.task_id,sum(a.user_amount) user_amount

from hive.bdc_dwd.dw_fn_deal_asyn_order a

where a.is_new=1 and a.service='Trade_Payment' and a.state=1 and a.acct_day=substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)

group by a.task_id)t7 on t7.task_id = t2.task_id

left join (select t1.par_order_id,sum(t1.order_hosted_price - t1.order_refund_price) cost

from hive.bdc_dwd.dw_mk_order t1

where t1.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2) and t1.order_type = 1 and t1.order_stype = 4 and t1.order_mode = 12

group by t1.par_order_id) t8 on t1.order_id = t8.par_order_id

where t1.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)

and t1.order_type = 1 and t1.order_stype in (4,5) and t1.order_mode <> 12 and t4.product_id is not null and t1.order_hosted_price > 0 and t6.is_deal = 1 and t6.close_ymd >= '2018-12-31'

)a

left join mysql7.dataview_trade.ddc_product_info b on a.product_id = b.product_id and b.snapshot_version = 'SELLING'

where b.product2_type = 1 -------标品

and close_ymd between DATE_ADD('day',-7,CURRENT_DATE) and DATE_ADD('day',-1,CURRENT_DATE)

GROUP BY b.product_name,

b.selling_price_max,

b.gross_profit_rate_max/100,

b.actrul_supply_num,

case when b.business_type =1 then '自營消化' when b.business_type =2 then '服務商消化' end

order by count(a.order_id) desc

limit 10

if __name__ == '__main__':

table_names=[]

sql=get_sqlstr('read_sql.txt')

stmt_tuple=analysis_statements(sql)

for each_stmt in stmt_tuple:

type_name=get_main_functionsql(each_stmt)

blood_table(each_stmt)

Tree_visus(table_names,type_name)

資料治理:基于Python-sqlparse的SQL表血緣追蹤解析實作

點關注,防走丢,如有纰漏之處,請留言指教,非常感謝

以上就是本期的全部内容。我是fanstuck ,有問題大家随時留言讨論 ,我們下期見