天天看點

FlinkSQL 資料權限之資料脫敏解決方案

作者:散文随風想

在當今數字化時代,資料已經成為企業群組織中不可或缺的重要資産,包括個人資訊、商業機密、财務資料等等。然而,随着資料洩露和安全問題的不斷增加,資料脫敏已經成為了一項非常重要的工作。随着以 Flink 為代表的實時數倉的興起,企業對實時資料安全的需求越來越迫切。但由于 Flink 實時數倉領域發展相對較短,Apache Ranger 尚不支援 FlinkSQL,且依賴 Ranger 會導緻系統的部署和運維愈加複雜。

是以,自研出 FlinkSQL 的資料脫敏方案,支援面向使用者級别的資料脫敏通路控制,即特定使用者隻能通路到脫敏後的資料。在技術實作上做到對 Flink 和 Calcite 源碼的零侵入,可以快速內建到已有實時平台産品中。

一、基礎知識

1.1 資料脫敏

資料脫敏 (Data Masking) 是一種資料安全技術,用于保護敏感資料,以防止未經授權的通路。該技術通過将敏感資料替換為虛假資料或不可識别的資料來實作。例如可以使用資料脫敏技術将信用卡号碼、社會安全号碼等敏感資訊替換為随機生成的數字或字母,以保護這些資訊的隐私和安全。

1.2 業務流程

下面用訂單表orders的兩行資料來舉例,示例資料如下:

FlinkSQL 資料權限之資料脫敏解決方案

1.2.1 設定脫敏政策

管理者配置使用者、表、字段、脫敏條件,例如下面的配置。

FlinkSQL 資料權限之資料脫敏解決方案

1.2.2 使用者通路資料

當使用者在 Flink 上查詢orders表的資料時,會在底層結合該使用者的脫敏條件重新生成 SQL,即讓資料脫敏生效。當使用者 A 和使用者 B 在執行下面相同的 SQL 時,會看到不同的結果資料。

SELECT * FROM orders           

使用者 A 檢視到的結果資料如下,customer_name字段的資料被全部掩蓋掉。

FlinkSQL 資料權限之資料脫敏解決方案

使用者 B 檢視到的結果資料如下,customer_name字段的資料隻顯示前 4 位,剩下的用 x 代替。

FlinkSQL 資料權限之資料脫敏解決方案

二、Hive 資料脫敏解決方案

在離線數倉工具 Hive 領域,由于發展多年已有 Ranger Column Masking 方案來支援字段資料的脫敏控制,詳見參考文獻 [1]。下圖是在 Ranger 裡配置 Hive 表資料脫敏條件的頁面,供參考。

FlinkSQL 資料權限之資料脫敏解決方案

但由于 Flink 實時數倉領域發展相對較短,Ranger 還不支援 FlinkSQL,以及依賴 Ranger 的話會導緻系統部署和運維過重,是以開始 自研實時數倉的資料脫敏解決工具。當然本文中的核心思想也适用于 Ranger 中,可以基于此較快開發出 ranger-flink 插件。

三、FlinkSQL 資料脫敏解決方案

3.1 解決方案

3.1.1 FlinkSQL 執行流程

可以參考作者文章 [FlinkSQL 字段血緣解決方案及源碼],本文根據 Flink1.16 修正和簡化後的執行流程如下圖所示。

FlinkSQL 資料權限之資料脫敏解決方案

在CalciteParser進行parse()和validate()處理後會得到一個 SqlNode 類型的抽象文法樹 (Abstract Syntax Tree,簡稱 AST),本文會針對此抽象文法樹來組裝行級過濾條件後生成新的 AST,以實作資料脫敏控制。

3.1.2 Calcite 對象繼承關系

下面章節要用到 Calcite 中的 SqlNode、SqlCall、SqlIdentifier、SqlJoin、SqlBasicCall 和 SqlSelect 等類,此處進行簡單介紹以及展示它們間繼承關系,以便讀者閱讀本文源碼。

FlinkSQL 資料權限之資料脫敏解決方案

3.1.3 解決思路

針對輸入的 Flink SQL,在CalciteParser進行文法解析 (parse) 和文法校驗 (validate) 後生成抽象文法樹 (Abstract Syntax Tree,簡稱 AST) 後,采用自定義Calcite SqlBasicVisitor的方法周遊 AST 中的所有SqlSelect,擷取到裡面的每個輸入表。如果輸入表中字段有配置脫敏條件,則針對輸入表生成子查詢語句,并把脫敏字段改寫成CAST(脫敏函數 (字段名) AS 字段類型) AS 字段名, 再通過CalciteParser.parseExpression()把子查詢轉換成 SqlSelect,并用此 SqlSelect 替換原 AST 中的輸入表來生成新的 AST,最後得到新的 SQL 來繼續執行。

FlinkSQL 資料權限之資料脫敏解決方案

3.2 詳細方案

3.2.1 解析輸入表

通過對 Flink SQL 文法的分析和研究,最終出現輸入表的隻包含以下兩種情況:

  1. SELECT 語句的 FROM 子句,如果是子查詢,則遞歸繼續周遊。
  2. SELECT ... JOIN 語句的 Left 和 Right 子句,如果是多表 JOIN,則遞歸查詢周遊。

是以,下面的主要步驟會根據 FROM 子句的類型來尋找輸入表。

3.2.2 主要步驟

主要通過 Calcite 提供的通路者模式自定義 DataMaskVisitor 來實作,周遊 AST 中所有的 SqlSelect 對象用子查詢替換裡面的輸入表。下面較長的描述替換輸入表的步驟,整體流程如下圖所示。

FlinkSQL 資料權限之資料脫敏解決方案
  1. 周遊 AST 中的 SELECT 語句。
  2. 判斷是否自定義的 SELECT 語句 (由下面步驟 9 生成),是則跳轉到步驟 10,否則繼續步驟 3。
  3. 判斷 SELECT 語句中的 FROM 類型,按照不同類型對應執行下面的步驟 4、5 和 10。
  4. 如果 FROM 是 SqlJoin 類型,則分别周遊其左 Left 和 Right 右節點,即執行目前步驟 4 和步驟 6。由于可能是三張表及以上的 Join,是以進行遞歸處理,即針對其左節點跳回到步驟 3。
  5. 如果 FROM 是 SqlBasicCall 類型,還需要判斷是否來自子查詢,是則跳轉到步驟 10 繼續周遊 AST,後續步驟 1 會對子查詢中的 SELECT 語句進行處理。否則跳轉到步驟 7。
  6. 遞歸處理 Join 的右節點,即跳回到步驟 3。
  7. 周遊表中的每個字段,如果某個字段有定義脫敏條件,則把改字段改寫成格式CAST(脫敏函數 (字段名) AS 字段類型) AS 字段名,否則用原字段名。
  8. 針對步驟 7 處理後的字段,建構子查詢語句,形如 (SELECT 字段名 1, 字段名 2, CAST(脫敏函數 (字段名 3) AS 字段類型) AS 字段名 3、字段名 4 FROM 表名) AS 表别名。
  9. 對步驟 8 的子查詢調用CalciteParser.parseExpression()進行解析,生成自定義的 SELECT 語句,并替換掉原 FROM。
  10. 繼續周遊 AST,找到裡面的 SELECT 語句進行處理,跳回到步驟 1。

3.2.3 Hive 及 Ranger 相容性

在 Ranger 中,預設的脫敏政策的如下所示。通過調研發現 Ranger 的大部分脫敏政策是通過調用 Hive 自帶或自定義的系統函數實作的。

由于 Flink 支援 Hive Catalog,在 Flink 能調用 Hive 系統函數。是以,本方案也支援在 Flink SQL 配置 Ranger 的脫敏政策。

四、用例測試

源碼位址:https://github.com/HamaWhiteGG/flink-sql-security

注: 如果用 IntelliJ IDEA 打開源碼,請提前安裝 Manifold 插件。

用例測試資料來自于 CDC Connectors for Apache Flink 官網,本文給orders表增加一個 region 字段,再增加'connector'='print'類型的 print_sink 表,其字段和orders表的一樣,資料庫建表及初始化 SQL 位于 data/database 目錄下。

下載下傳本文源碼後,可通過 Maven 運作單元測試,測試用例中的 catalog 名稱是hive,database 名稱是default。

$ cd flink-sql-security
$ mvn test           

詳細測試用例可檢視源碼中的單測RewriteDataMaskTest和ExecuteDataMaskTest,下面隻描述兩個案例。

4.1 測試 SELECT

4.1.1 輸入 SQL

使用者 A 執行下述 SQL:

SELECT order_id, customer_name, product_id, region FROM orders           

4.1.2 根據脫敏條件重新生成 SQL

  1. 輸入 SQL 是一個簡單 SELECT 語句,經過文法分析和文法校驗後 FROM 類型是SqlBasicCall,SQL 中的表名orders會被替換為完整的hive.default.orders,别名是orders。
  2. 由于使用者 A 針對字段customer_name定義脫敏條件 MASK(對應函數是脫敏函數是mask),該字段在流程圖中的步驟 8 中被改寫為CAST(mask(customer_name) AS STRING) AS customer_name,其餘字段未定義脫敏條件則保持不變。
  3. 然後在步驟 8 的操作中,表名hive.default.orders被改寫成如下子查詢,子查詢兩側用括号()進行包裹,并且用 AS 别名來增加表别名。
(SELECT
     order_id,
     order_date,
     CAST(mask(customer_name) AS STRING) AS customer_name,
     product_id,
     price,
     order_status,
     region
FROM 
    hive.default.orders 
) AS orders           

4.1.3 輸出 SQL 和運作結果

最終執行的改寫後 SQL 如下所示,這樣使用者 A 查詢到的顧客姓名customer_name字段都是掩蓋後的資料。

SELECT
    orders.order_id,
    orders.customer_name,
    orders.product_id,
    orders.region
FROM (
    SELECT 
         order_id,
         order_date,
         CAST(mask(customer_name) AS STRING) AS customer_name,
         product_id,
         price,
         order_status,
         region
    FROM 
         hive.default.orders 
     ) AS orders           

4.2 測試 INSERT-SELECT

4.2.1 輸入 SQL

使用者 A 執行下述 SQL:

INSERT INTO print_sink SELECT * FROM orders           

4.2.2 根據脫敏條件重新生成 SQL

通過自定義 Calcite DataMaskVisitor 通路生成的 AST,能找到對應的 SELECT 語句SELECT * FROM orders,注意在文法校驗階段 * 會被改寫成表中所有字段。針對此 SELECT 語句的改寫邏輯同上,不再闡述。

4.2.3 輸出 SQL 和運作結果

最終執行的改寫後 SQL 如下所示,注意插入到print_sink表的customer_name字段是掩蓋後的資料。

INSERT INTO print_sink (
    SELECT 
        orders.order_id,
        orders.order_date,
        orders.customer_name,
        orders.product_id,
        orders.price,
        orders.order_status,
        orders.region
    FROM (
        SELECT 
            order_id, 
            order_date, 
            CAST(mask(customer_name) AS STRING) AS customer_name, 
            product_id, 
            price, 
            order_status, 
            region 
        FROM 
            hive.default.orders
    ) AS orders
)           

五、參考文獻

  1. Apache Ranger Column Masking in Hive(https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/authorization-ranger/content/dynamic_resource_based_column_masking_in_hive_with_ranger_policies.html)
  2. FlinkSQL 字段血緣解決方案及源碼 (https://github.com/HamaWhiteGG/flink-sql-lineage/blob/main/README_CN.md)
  3. 從 SQL 語句中解析出源表和結果表 (https://blog.jrwang.me/2018/parse-table-in-sql)
  4. 基于 Flink CDC 建構 MySQL 和 Postgres 的 Streaming ETL(https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html)
  5. HiveQL—資料脫敏函數 (https://blog.csdn.net/CPP_MAYIBO/article/details/104065839)

繼續閱讀