天天看點

Flink CDC 2.x 讓一切變得美好CDCflink CDC ConnectorsFlink CDC 2.0 詳解使用示例

本文基于阿裡巴巴進階開發工程師徐榜江 (雪盡) 7 月 10 日在北京站 Flink Meetup 分享的《詳解 Flink-CDC》整理。

《詳解 Flink-CDC》深入講解了最新釋出的 Flink CDC 2.0.0 版本帶來的核心特性,包括:全量資料的并發讀取、checkpoint、無鎖讀取等重大改進。

項目位址

文檔位址

詳解Flink-CDC PPT

CDC

什麼是CDC

CDC 的全稱是 Change Data Capture ,在廣義的概念上,隻要是能捕獲資料變更的技術,我們都可以稱之為 CDC 。目前通常描述的 CDC 技術主要面向資料庫的變更,是一種用于捕獲資料庫中資料變更的技術

開源 CDC 方案比較

Flink CDC 2.x 讓一切變得美好CDCflink CDC ConnectorsFlink CDC 2.0 詳解使用示例

flink CDC Connectors

什麼是Flink CDC Connectors?

Flink CDC Connectors是一組用于Apache Flink的源連接配接器,使用更改資料捕獲(CDC)從不同的資料庫接收更改。Flink CDC Connectors內建Debezium作為捕獲資料更改的引擎。是以它可以充分利用Debezium的能力。

核心 feature

  • 支援讀取資料庫快照,并繼續采用精确一次處理的方式讀取binlogs,即使發生故障。
  • DataStream API的CDC連接配接器,使用者可以在不部署Debezium和Kafka的情況下,在一個作業中消費多個資料庫和表上的更改。
  • 對于Table/SQL API的CDC連接配接器,使用者可以使用SQL DDL建立CDC源來監視單個表上的更改。
  • 全程無鎖,不對線上業務産生鎖的風險;
  • 多并行度,可水準擴充,全量資料的讀取階段支援水準擴充,使億級别的大表可以通過加大并行度來加快讀取速度;
  • 斷點續傳,支援全量階段的 checkpoint,即使任務因某種原因退出了,也可通過儲存的 checkpoint 對任務進行恢複實作資料的斷點續傳。

後面3點是Flink CDC 2.0中才解決的。

binlog就是一張Dynamic Table

Flink 有兩個基礎概念:Dynamic Table 和 Changelog Stream。

  • Dynamic Table 就是 Flink SQL 定義的動态表,動态表和流的概念是對等的。參照下圖,流可以轉換成動态表,動态表也可以轉換成流。
  • 在 Flink SQL中,資料在從一個算子流向另外一個算子時都是以 Changelog Stream 的形式,任意時刻的 Changelog Stream 可以翻譯為一個表,也可以翻譯為一個流。
  • 資料庫的binlog用于記錄一張表所有的變更。如果一直對表進行更新,binlog 日志流也一直會追加,資料庫中的表就相當于 binlog 日志流在某個時刻點物化的結果;日志流就是将表的變更資料持續捕獲的結果。這說明 Flink SQL 的 Dynamic Table 是可以非常自然地表示一張不斷變化的 MySQL 資料庫表。

注意,Flink 提供了 changelog-json format,可以将 changelog 資料寫入離線數倉如 Hive / HDFS;對于實時數倉,Flink 支援将 changelog 通過 upsert-kafka connector 直接寫入 Kafka。

Flink CDC 2.x 讓一切變得美好CDCflink CDC ConnectorsFlink CDC 2.0 詳解使用示例

底層采集工具

flink 使用Debezium 作為 Flink CDC 的底層采集工具。

Debezium 支援全量同步,也支援增量同步,也支援全量 + 增量的同步,非常靈活,同時基于日志的 CDC 技術使得提供 Exactly-Once 成為可能。

将 Flink SQL 的内部資料結構 RowData 和 Debezium 的資料結構進行對比,可以發現兩者是非常相似的。

  • 每條 RowData 都有一個中繼資料 RowKind,包括 4 種類型, 分别是插入 (INSERT)、更新前鏡像 (UPDATE_BEFORE)、更新後鏡像 (UPDATE_AFTER)、删除 (DELETE),這四種類型和資料庫裡面的 binlog 概念保持一緻。
  • Debezium 的資料結構,也有一個類似的中繼資料 op 字段, op 字段的取值也有四種,分别是 c、u、d、r,各自對應 create、update、delete、read。對于代表更新操作的 u,其資料部分同時包含了前鏡像 (before) 和後鏡像 (after)。

通過分析兩種資料結構,Flink 和 Debezium 兩者的底層資料是可以非常友善地對接起來的,大家可以發現 Flink 做 CDC 從技術上是非常合适的。

Flink CDC 2.x 讓一切變得美好CDCflink CDC ConnectorsFlink CDC 2.0 詳解使用示例

基于flink cdc的聚合分析

  • 是一個純 SQL 作業,這意味着隻要會 SQL 的 BI,業務線同學都可以完成此類工作。同時,使用者也可以利用 Flink SQL 提供的豐富文法進行資料清洗、分析、聚合。
  • 利用 Flink SQL 雙流 JOIN、維表 JOIN、UDTF 文法可以非常容易地完成資料打寬,以及各種業務邏輯加工。
    Flink CDC 2.x 讓一切變得美好CDCflink CDC ConnectorsFlink CDC 2.0 詳解使用示例

Flink CDC 2.0 詳解

需要解決的痛點問題

  • 一緻性通過加鎖保證:全量 + 增量讀取的過程需要保證所有資料的一緻性,是以需要通過加鎖保證,但是加鎖在資料庫層面上是一個十分高危的操作。底層 Debezium 在保證資料一緻性時,需要對讀取的庫或表加鎖,全局鎖可能導緻資料庫鎖住,表級鎖會鎖住表的讀,DBA 一般不給鎖權限。
  • 不支援水準擴充:因為 Flink CDC 底層是基于 Debezium,起架構是單節點,是以Flink CDC 隻支援單并發。在全量階段讀取階段,如果表非常大 (億級别),讀取時間在小時甚至天級别,使用者不能通過增加資源去提升作業速度。
  • 全量讀取階段不支援 checkpoint:CDC 讀取分為兩個階段,全量讀取和增量讀取,目前全量讀取階段是不支援 checkpoint 的,是以會存在一個問題:當我們同步全量資料時,假設需要 5 個小時,當我們同步了 4 小時的時候作業失敗,這時候就需要重新開始,再讀取 5 個小時。

如何解決

Netflix 的 DBLog 論文中 Chunk 讀取算法 + 基于 FLIP-27 來優雅地實作的

整體流程可以概括為,首先通過主鍵對表進行 Snapshot Chunk 劃分,再将 Snapshot Chunk 分發給多個 SourceReader,每個 Snapshot Chunk 讀取時通過算法實作無鎖條件下的一緻性讀,SourceReader 讀取時支援 chunk 粒度的 checkpoint,在所有 Snapshot Chunk 讀取完成後,下發一個 binlog chunk 進行增量部分的 binlog 讀取,這便是 Flink CDC 2.0 的整體流程,如下圖所示:

Flink CDC 2.x 讓一切變得美好CDCflink CDC ConnectorsFlink CDC 2.0 詳解使用示例

使用示例

以mysql-cdc為例,講解如何使用

模拟電商公司的訂單表和物流表,通過flink join将訂單表打寬 【線上運作時,需要使用時态表join進行優化】

需求:需要對訂單資料進行統計分析,對于不同的資訊需要進行關聯後續形成訂單的大寬表後,交給下遊的業務方使用 ES 做資料分析,

這個案例示範了如何隻依賴 Flink 不依賴其他元件,借助 Flink 強大的計算能力實時把 Binlog 的資料流關聯一次并同步至 ES

環境是flink 1.13.2, flink-sql-connector-mysql-cdc-2.0.2.jar (放在<FLINK_HOME>/lib/下面)

注意:

  • flink-sql-connector-mysql-cdc-2.0.2.jar不能通過sql-client.sh 的-j或 -l參數指定,否則報:io.debezium.relational.RelationalDatabaseConnectorConfig NoSuchFieldError: PASSWORD 異常,導緻整個flink叢集退出
  • 在mysql-cdc 2.x中預設開啟了scan.incremental.snapshot.enabled, 如果表沒有主鍵,則會導緻增量快照讀( incremental snapshot reading)失敗,則需要将scan.incremental.snapshot.enabled設定為false
  • 快照資料塊分割采用的算法是:chunk reading algorithm ,塊切分采用固定的步長,由參數scan.incremental.snapshot.chunk.size确定,預設值是:8096
    • 針對自增的數字,則按主鍵從小到大進行切換
    • 針對其它主鍵,則按SELECT MAX(STR_ID) AS chunk_high FROM (SELECT * FROM TestTable WHERE STR_ID > ‘uuid-001’ limit 25) 擷取切換的範圍
    • 每個chunk reader執行Offset Signal Algorithm以獲得快照塊的最終一緻輸出
  • 如果需要并行運作,每個并行Reader應該有一個唯一的伺服器id,是以’ server-id ‘必須是’ 5400-6400 '這樣的範圍,并且範圍必須大于并行度 (并行度通過 SET ‘parallelism.default’ = 8; 設定)。

mysql開啟binlog

修改配置

在my.cnf中的mysqld中增加如下配置

[mysqld]
        # 前面還有其他配置
        # 添加的部分
        server-id = 12345
        log-bin = mysql-bin
        # 必須為ROW
        binlog_format = ROW
        # 必須為FULL,MySQL-5.7後才有該參數
        binlog_row_image  = FULL
        expire_logs_days  = 15
           

驗證

SHOW VARIABLES LIKE ‘%binlog%’;

Flink CDC 2.x 讓一切變得美好CDCflink CDC ConnectorsFlink CDC 2.0 詳解使用示例

設定權限

-- 設定擁有同步權限的使用者
        CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpassword';
        -- 針對MySQL 8.0
        ALTER USER 'flinkuser' IDENTIFIED WITH mysql_native_password BY 'flinkpassword';
        -- 賦予同步相關權限
        GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser';
        
        FLUSH PRIVILEGES;
        
        建立使用者并賦予權限成功後,使用該使用者登入MySQL,可以使用以下指令檢視主從同步相關資訊
        SHOW MASTER STATUS
        SHOW SLAVE STATUS
        SHOW BINARY LOGS
           

MySQL中建立表

drop table if exists products;
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

drop table if exists orders;
CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- 是否下單
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

drop table if exists shipments;
CREATE TABLE shipments (
  shipment_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_id INTEGER NOT NULL,
  origin VARCHAR(255) NOT NULL,
  destination VARCHAR(255) NOT NULL,
  is_arrived BOOLEAN NOT NULL
) AUTO_INCREMENT = 1001;;

INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
       (default,10002,'Hangzhou','Shanghai',false),
       (default,10003,'Shanghai','Hangzhou',false);
           

flink sql client中配置

set table.exec.source.cdc-events-duplicate = true;
CREATE TABLE products (
  id INT,
  name STRING,
  description STRING,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '172.25.21.29',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpassword',
  'database-name' = 'db_inventory_cdc',
  'table-name' = 'products',
  'connect.timeout' = '60s',
  'scan.incremental.snapshot.chunk.size' = '25',
  'server-id'='5401-5405'
);

CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '172.25.21.29',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpassword',
  'database-name' = 'db_inventory_cdc',
  'table-name' = 'orders',
  'connect.timeout' = '60s',
  'scan.incremental.snapshot.chunk.size' = '25',
  'server-id'='5406-5410'
);

CREATE TABLE shipments (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN,
  PRIMARY KEY(shipment_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '172.25.21.29',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpassword',
  'database-name' = 'db_inventory_cdc',
  'table-name' = 'shipments',
  'connect.timeout' = '60s',
  'scan.incremental.snapshot.chunk.size' = '25',
  'server-id'='5411-5415'
);

CREATE TABLE enriched_orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  product_name STRING,
  product_description STRING,
  shipment_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://172.25.11.77:9401',
    'index' = 'enriched_orders'
);

INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;
           

在執行完insert into這條語句後,則sql-client會送出一個flink job到本地的flink cluster, 詳見下面的截圖:

Flink CDC 2.x 讓一切變得美好CDCflink CDC ConnectorsFlink CDC 2.0 詳解使用示例
Flink CDC 2.x 讓一切變得美好CDCflink CDC ConnectorsFlink CDC 2.0 詳解使用示例

資料驗證

  • 通過ES的kinba查詢資料,發現enriched_orders索引中已經有了三條資料
  • 通過增加、修改、删除訂單和物流記錄,則ES中的enriched_orders的索引資料跟着做實時變化。

    注意:必須要設定checkpoint,否則程式會卡到全量階段的最後一步,進不到增量讀取階段

INSERT INTO orders VALUES (default, '2021-09-27 15:22:00', 'Jark', 29.71, 104, false);
INSERT INTO shipments VALUES (default,10004,'Shanghai','Beijing',false);
--更新記錄
UPDATE orders SET order_status = true WHERE order_id = 10004;
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1001;
--删除記錄
DELETE FROM orders WHERE order_id = 10004;
           
Flink CDC 2.x 讓一切變得美好CDCflink CDC ConnectorsFlink CDC 2.0 詳解使用示例

繼續閱讀