在當今數字化時代,資料已經成為企業群組織中不可或缺的重要資産,包括個人資訊、商業機密、财務資料等等。然而,随着資料洩露和安全問題的不斷增加,資料脫敏已經成為了一項非常重要的工作。随着以 Flink 為代表的實時數倉的興起,企業對實時資料安全的需求越來越迫切。但由于 Flink 實時數倉領域發展相對較短,Apache Ranger 尚不支援 FlinkSQL,且依賴 Ranger 會導緻系統的部署和運維愈加複雜。
是以,自研出 FlinkSQL 的資料脫敏方案,支援面向使用者級别的資料脫敏通路控制,即特定使用者隻能通路到脫敏後的資料。在技術實作上做到對 Flink 和 Calcite 源碼的零侵入,可以快速內建到已有實時平台産品中。
一、基礎知識
1.1 資料脫敏
資料脫敏 (Data Masking) 是一種資料安全技術,用于保護敏感資料,以防止未經授權的通路。該技術通過将敏感資料替換為虛假資料或不可識别的資料來實作。例如可以使用資料脫敏技術将信用卡号碼、社會安全号碼等敏感資訊替換為随機生成的數字或字母,以保護這些資訊的隐私和安全。
1.2 業務流程
下面用訂單表orders的兩行資料來舉例,示例資料如下:
1.2.1 設定脫敏政策
管理者配置使用者、表、字段、脫敏條件,例如下面的配置。
1.2.2 使用者通路資料
當使用者在 Flink 上查詢orders表的資料時,會在底層結合該使用者的脫敏條件重新生成 SQL,即讓資料脫敏生效。當使用者 A 和使用者 B 在執行下面相同的 SQL 時,會看到不同的結果資料。
SELECT * FROM orders
使用者 A 檢視到的結果資料如下,customer_name字段的資料被全部掩蓋掉。
使用者 B 檢視到的結果資料如下,customer_name字段的資料隻顯示前 4 位,剩下的用 x 代替。
二、Hive 資料脫敏解決方案
在離線數倉工具 Hive 領域,由于發展多年已有 Ranger Column Masking 方案來支援字段資料的脫敏控制,詳見參考文獻 [1]。下圖是在 Ranger 裡配置 Hive 表資料脫敏條件的頁面,供參考。
但由于 Flink 實時數倉領域發展相對較短,Ranger 還不支援 FlinkSQL,以及依賴 Ranger 的話會導緻系統部署和運維過重,是以開始 自研實時數倉的資料脫敏解決工具。當然本文中的核心思想也适用于 Ranger 中,可以基于此較快開發出 ranger-flink 插件。
三、FlinkSQL 資料脫敏解決方案
3.1 解決方案
3.1.1 FlinkSQL 執行流程
可以參考作者文章 [FlinkSQL 字段血緣解決方案及源碼],本文根據 Flink1.16 修正和簡化後的執行流程如下圖所示。
在CalciteParser進行parse()和validate()處理後會得到一個 SqlNode 類型的抽象文法樹 (Abstract Syntax Tree,簡稱 AST),本文會針對此抽象文法樹來組裝行級過濾條件後生成新的 AST,以實作資料脫敏控制。
3.1.2 Calcite 對象繼承關系
下面章節要用到 Calcite 中的 SqlNode、SqlCall、SqlIdentifier、SqlJoin、SqlBasicCall 和 SqlSelect 等類,此處進行簡單介紹以及展示它們間繼承關系,以便讀者閱讀本文源碼。
3.1.3 解決思路
針對輸入的 Flink SQL,在CalciteParser進行文法解析 (parse) 和文法校驗 (validate) 後生成抽象文法樹 (Abstract Syntax Tree,簡稱 AST) 後,采用自定義Calcite SqlBasicVisitor的方法周遊 AST 中的所有SqlSelect,擷取到裡面的每個輸入表。如果輸入表中字段有配置脫敏條件,則針對輸入表生成子查詢語句,并把脫敏字段改寫成CAST(脫敏函數 (字段名) AS 字段類型) AS 字段名, 再通過CalciteParser.parseExpression()把子查詢轉換成 SqlSelect,并用此 SqlSelect 替換原 AST 中的輸入表來生成新的 AST,最後得到新的 SQL 來繼續執行。
3.2 詳細方案
3.2.1 解析輸入表
通過對 Flink SQL 文法的分析和研究,最終出現輸入表的隻包含以下兩種情況:
- SELECT 語句的 FROM 子句,如果是子查詢,則遞歸繼續周遊。
- SELECT ... JOIN 語句的 Left 和 Right 子句,如果是多表 JOIN,則遞歸查詢周遊。
是以,下面的主要步驟會根據 FROM 子句的類型來尋找輸入表。
3.2.2 主要步驟
主要通過 Calcite 提供的通路者模式自定義 DataMaskVisitor 來實作,周遊 AST 中所有的 SqlSelect 對象用子查詢替換裡面的輸入表。下面較長的描述替換輸入表的步驟,整體流程如下圖所示。
- 周遊 AST 中的 SELECT 語句。
- 判斷是否自定義的 SELECT 語句 (由下面步驟 9 生成),是則跳轉到步驟 10,否則繼續步驟 3。
- 判斷 SELECT 語句中的 FROM 類型,按照不同類型對應執行下面的步驟 4、5 和 10。
- 如果 FROM 是 SqlJoin 類型,則分别周遊其左 Left 和 Right 右節點,即執行目前步驟 4 和步驟 6。由于可能是三張表及以上的 Join,是以進行遞歸處理,即針對其左節點跳回到步驟 3。
- 如果 FROM 是 SqlBasicCall 類型,還需要判斷是否來自子查詢,是則跳轉到步驟 10 繼續周遊 AST,後續步驟 1 會對子查詢中的 SELECT 語句進行處理。否則跳轉到步驟 7。
- 遞歸處理 Join 的右節點,即跳回到步驟 3。
- 周遊表中的每個字段,如果某個字段有定義脫敏條件,則把改字段改寫成格式CAST(脫敏函數 (字段名) AS 字段類型) AS 字段名,否則用原字段名。
- 針對步驟 7 處理後的字段,建構子查詢語句,形如 (SELECT 字段名 1, 字段名 2, CAST(脫敏函數 (字段名 3) AS 字段類型) AS 字段名 3、字段名 4 FROM 表名) AS 表别名。
- 對步驟 8 的子查詢調用CalciteParser.parseExpression()進行解析,生成自定義的 SELECT 語句,并替換掉原 FROM。
- 繼續周遊 AST,找到裡面的 SELECT 語句進行處理,跳回到步驟 1。
3.2.3 Hive 及 Ranger 相容性
在 Ranger 中,預設的脫敏政策的如下所示。通過調研發現 Ranger 的大部分脫敏政策是通過調用 Hive 自帶或自定義的系統函數實作的。
由于 Flink 支援 Hive Catalog,在 Flink 能調用 Hive 系統函數。是以,本方案也支援在 Flink SQL 配置 Ranger 的脫敏政策。
四、用例測試
源碼位址:https://github.com/HamaWhiteGG/flink-sql-security
注: 如果用 IntelliJ IDEA 打開源碼,請提前安裝 Manifold 插件。
用例測試資料來自于 CDC Connectors for Apache Flink 官網,本文給orders表增加一個 region 字段,再增加'connector'='print'類型的 print_sink 表,其字段和orders表的一樣,資料庫建表及初始化 SQL 位于 data/database 目錄下。
下載下傳本文源碼後,可通過 Maven 運作單元測試,測試用例中的 catalog 名稱是hive,database 名稱是default。
$ cd flink-sql-security
$ mvn test
詳細測試用例可檢視源碼中的單測RewriteDataMaskTest和ExecuteDataMaskTest,下面隻描述兩個案例。
4.1 測試 SELECT
4.1.1 輸入 SQL
使用者 A 執行下述 SQL:
SELECT order_id, customer_name, product_id, region FROM orders
4.1.2 根據脫敏條件重新生成 SQL
- 輸入 SQL 是一個簡單 SELECT 語句,經過文法分析和文法校驗後 FROM 類型是SqlBasicCall,SQL 中的表名orders會被替換為完整的hive.default.orders,别名是orders。
- 由于使用者 A 針對字段customer_name定義脫敏條件 MASK(對應函數是脫敏函數是mask),該字段在流程圖中的步驟 8 中被改寫為CAST(mask(customer_name) AS STRING) AS customer_name,其餘字段未定義脫敏條件則保持不變。
- 然後在步驟 8 的操作中,表名hive.default.orders被改寫成如下子查詢,子查詢兩側用括号()進行包裹,并且用 AS 别名來增加表别名。
(SELECT
order_id,
order_date,
CAST(mask(customer_name) AS STRING) AS customer_name,
product_id,
price,
order_status,
region
FROM
hive.default.orders
) AS orders
4.1.3 輸出 SQL 和運作結果
最終執行的改寫後 SQL 如下所示,這樣使用者 A 查詢到的顧客姓名customer_name字段都是掩蓋後的資料。
SELECT
orders.order_id,
orders.customer_name,
orders.product_id,
orders.region
FROM (
SELECT
order_id,
order_date,
CAST(mask(customer_name) AS STRING) AS customer_name,
product_id,
price,
order_status,
region
FROM
hive.default.orders
) AS orders
4.2 測試 INSERT-SELECT
4.2.1 輸入 SQL
使用者 A 執行下述 SQL:
INSERT INTO print_sink SELECT * FROM orders
4.2.2 根據脫敏條件重新生成 SQL
通過自定義 Calcite DataMaskVisitor 通路生成的 AST,能找到對應的 SELECT 語句SELECT * FROM orders,注意在文法校驗階段 * 會被改寫成表中所有字段。針對此 SELECT 語句的改寫邏輯同上,不再闡述。
4.2.3 輸出 SQL 和運作結果
最終執行的改寫後 SQL 如下所示,注意插入到print_sink表的customer_name字段是掩蓋後的資料。
INSERT INTO print_sink (
SELECT
orders.order_id,
orders.order_date,
orders.customer_name,
orders.product_id,
orders.price,
orders.order_status,
orders.region
FROM (
SELECT
order_id,
order_date,
CAST(mask(customer_name) AS STRING) AS customer_name,
product_id,
price,
order_status,
region
FROM
hive.default.orders
) AS orders
)
五、參考文獻
- Apache Ranger Column Masking in Hive(https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/authorization-ranger/content/dynamic_resource_based_column_masking_in_hive_with_ranger_policies.html)
- FlinkSQL 字段血緣解決方案及源碼 (https://github.com/HamaWhiteGG/flink-sql-lineage/blob/main/README_CN.md)
- 從 SQL 語句中解析出源表和結果表 (https://blog.jrwang.me/2018/parse-table-in-sql)
- 基于 Flink CDC 建構 MySQL 和 Postgres 的 Streaming ETL(https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html)
- HiveQL—資料脫敏函數 (https://blog.csdn.net/CPP_MAYIBO/article/details/104065839)