本文基于阿裡巴巴進階開發工程師徐榜江 (雪盡) 7 月 10 日在北京站 Flink Meetup 分享的《詳解 Flink-CDC》整理。
《詳解 Flink-CDC》深入講解了最新釋出的 Flink CDC 2.0.0 版本帶來的核心特性,包括:全量資料的并發讀取、checkpoint、無鎖讀取等重大改進。
項目位址
文檔位址
詳解Flink-CDC PPT
CDC
什麼是CDC
CDC 的全稱是 Change Data Capture ,在廣義的概念上,隻要是能捕獲資料變更的技術,我們都可以稱之為 CDC 。目前通常描述的 CDC 技術主要面向資料庫的變更,是一種用于捕獲資料庫中資料變更的技術
開源 CDC 方案比較
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI2EzX4xSZz91ZsAzNfRHLGZkRGZkRfJ3bs92YsAjMfVmepNHLs9kW1cWLiVTSpJWNFZDc1IjNwVTQClGVF5UMR9Fd4VGdsATNfd3bkFGazxycykFaKdkYzZUbapXNXlleSdVY2pESa9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL2EzMiJDO5QTOjNjNiZ2NiJDNhRjNhdjMyM2M0EzYwUzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
flink CDC Connectors
什麼是Flink CDC Connectors?
Flink CDC Connectors是一組用于Apache Flink的源連接配接器,使用更改資料捕獲(CDC)從不同的資料庫接收更改。Flink CDC Connectors內建Debezium作為捕獲資料更改的引擎。是以它可以充分利用Debezium的能力。
核心 feature
- 支援讀取資料庫快照,并繼續采用精确一次處理的方式讀取binlogs,即使發生故障。
- DataStream API的CDC連接配接器,使用者可以在不部署Debezium和Kafka的情況下,在一個作業中消費多個資料庫和表上的更改。
- 對于Table/SQL API的CDC連接配接器,使用者可以使用SQL DDL建立CDC源來監視單個表上的更改。
- 全程無鎖,不對線上業務産生鎖的風險;
- 多并行度,可水準擴充,全量資料的讀取階段支援水準擴充,使億級别的大表可以通過加大并行度來加快讀取速度;
- 斷點續傳,支援全量階段的 checkpoint,即使任務因某種原因退出了,也可通過儲存的 checkpoint 對任務進行恢複實作資料的斷點續傳。
後面3點是Flink CDC 2.0中才解決的。
binlog就是一張Dynamic Table
Flink 有兩個基礎概念:Dynamic Table 和 Changelog Stream。
- Dynamic Table 就是 Flink SQL 定義的動态表,動态表和流的概念是對等的。參照下圖,流可以轉換成動态表,動态表也可以轉換成流。
- 在 Flink SQL中,資料在從一個算子流向另外一個算子時都是以 Changelog Stream 的形式,任意時刻的 Changelog Stream 可以翻譯為一個表,也可以翻譯為一個流。
- 資料庫的binlog用于記錄一張表所有的變更。如果一直對表進行更新,binlog 日志流也一直會追加,資料庫中的表就相當于 binlog 日志流在某個時刻點物化的結果;日志流就是将表的變更資料持續捕獲的結果。這說明 Flink SQL 的 Dynamic Table 是可以非常自然地表示一張不斷變化的 MySQL 資料庫表。
注意,Flink 提供了 changelog-json format,可以将 changelog 資料寫入離線數倉如 Hive / HDFS;對于實時數倉,Flink 支援将 changelog 通過 upsert-kafka connector 直接寫入 Kafka。
底層采集工具
flink 使用Debezium 作為 Flink CDC 的底層采集工具。
Debezium 支援全量同步,也支援增量同步,也支援全量 + 增量的同步,非常靈活,同時基于日志的 CDC 技術使得提供 Exactly-Once 成為可能。
将 Flink SQL 的内部資料結構 RowData 和 Debezium 的資料結構進行對比,可以發現兩者是非常相似的。
- 每條 RowData 都有一個中繼資料 RowKind,包括 4 種類型, 分别是插入 (INSERT)、更新前鏡像 (UPDATE_BEFORE)、更新後鏡像 (UPDATE_AFTER)、删除 (DELETE),這四種類型和資料庫裡面的 binlog 概念保持一緻。
- Debezium 的資料結構,也有一個類似的中繼資料 op 字段, op 字段的取值也有四種,分别是 c、u、d、r,各自對應 create、update、delete、read。對于代表更新操作的 u,其資料部分同時包含了前鏡像 (before) 和後鏡像 (after)。
通過分析兩種資料結構,Flink 和 Debezium 兩者的底層資料是可以非常友善地對接起來的,大家可以發現 Flink 做 CDC 從技術上是非常合适的。
基于flink cdc的聚合分析
- 是一個純 SQL 作業,這意味着隻要會 SQL 的 BI,業務線同學都可以完成此類工作。同時,使用者也可以利用 Flink SQL 提供的豐富文法進行資料清洗、分析、聚合。
- 利用 Flink SQL 雙流 JOIN、維表 JOIN、UDTF 文法可以非常容易地完成資料打寬,以及各種業務邏輯加工。
Flink CDC 2.x 讓一切變得美好CDCflink CDC ConnectorsFlink CDC 2.0 詳解使用示例
Flink CDC 2.0 詳解
需要解決的痛點問題
- 一緻性通過加鎖保證:全量 + 增量讀取的過程需要保證所有資料的一緻性,是以需要通過加鎖保證,但是加鎖在資料庫層面上是一個十分高危的操作。底層 Debezium 在保證資料一緻性時,需要對讀取的庫或表加鎖,全局鎖可能導緻資料庫鎖住,表級鎖會鎖住表的讀,DBA 一般不給鎖權限。
- 不支援水準擴充:因為 Flink CDC 底層是基于 Debezium,起架構是單節點,是以Flink CDC 隻支援單并發。在全量階段讀取階段,如果表非常大 (億級别),讀取時間在小時甚至天級别,使用者不能通過增加資源去提升作業速度。
- 全量讀取階段不支援 checkpoint:CDC 讀取分為兩個階段,全量讀取和增量讀取,目前全量讀取階段是不支援 checkpoint 的,是以會存在一個問題:當我們同步全量資料時,假設需要 5 個小時,當我們同步了 4 小時的時候作業失敗,這時候就需要重新開始,再讀取 5 個小時。
如何解決
Netflix 的 DBLog 論文中 Chunk 讀取算法 + 基于 FLIP-27 來優雅地實作的
整體流程可以概括為,首先通過主鍵對表進行 Snapshot Chunk 劃分,再将 Snapshot Chunk 分發給多個 SourceReader,每個 Snapshot Chunk 讀取時通過算法實作無鎖條件下的一緻性讀,SourceReader 讀取時支援 chunk 粒度的 checkpoint,在所有 Snapshot Chunk 讀取完成後,下發一個 binlog chunk 進行增量部分的 binlog 讀取,這便是 Flink CDC 2.0 的整體流程,如下圖所示:
使用示例
以mysql-cdc為例,講解如何使用
模拟電商公司的訂單表和物流表,通過flink join将訂單表打寬 【線上運作時,需要使用時态表join進行優化】
需求:需要對訂單資料進行統計分析,對于不同的資訊需要進行關聯後續形成訂單的大寬表後,交給下遊的業務方使用 ES 做資料分析,
這個案例示範了如何隻依賴 Flink 不依賴其他元件,借助 Flink 強大的計算能力實時把 Binlog 的資料流關聯一次并同步至 ES
環境是flink 1.13.2, flink-sql-connector-mysql-cdc-2.0.2.jar (放在<FLINK_HOME>/lib/下面)
注意:
- flink-sql-connector-mysql-cdc-2.0.2.jar不能通過sql-client.sh 的-j或 -l參數指定,否則報:io.debezium.relational.RelationalDatabaseConnectorConfig NoSuchFieldError: PASSWORD 異常,導緻整個flink叢集退出
- 在mysql-cdc 2.x中預設開啟了scan.incremental.snapshot.enabled, 如果表沒有主鍵,則會導緻增量快照讀( incremental snapshot reading)失敗,則需要将scan.incremental.snapshot.enabled設定為false
- 快照資料塊分割采用的算法是:chunk reading algorithm ,塊切分采用固定的步長,由參數scan.incremental.snapshot.chunk.size确定,預設值是:8096
- 針對自增的數字,則按主鍵從小到大進行切換
- 針對其它主鍵,則按SELECT MAX(STR_ID) AS chunk_high FROM (SELECT * FROM TestTable WHERE STR_ID > ‘uuid-001’ limit 25) 擷取切換的範圍
- 每個chunk reader執行Offset Signal Algorithm以獲得快照塊的最終一緻輸出
- 如果需要并行運作,每個并行Reader應該有一個唯一的伺服器id,是以’ server-id ‘必須是’ 5400-6400 '這樣的範圍,并且範圍必須大于并行度 (并行度通過 SET ‘parallelism.default’ = 8; 設定)。
mysql開啟binlog
修改配置
在my.cnf中的mysqld中增加如下配置
[mysqld]
# 前面還有其他配置
# 添加的部分
server-id = 12345
log-bin = mysql-bin
# 必須為ROW
binlog_format = ROW
# 必須為FULL,MySQL-5.7後才有該參數
binlog_row_image = FULL
expire_logs_days = 15
驗證
SHOW VARIABLES LIKE ‘%binlog%’;
設定權限
-- 設定擁有同步權限的使用者
CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpassword';
-- 針對MySQL 8.0
ALTER USER 'flinkuser' IDENTIFIED WITH mysql_native_password BY 'flinkpassword';
-- 賦予同步相關權限
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser';
FLUSH PRIVILEGES;
建立使用者并賦予權限成功後,使用該使用者登入MySQL,可以使用以下指令檢視主從同步相關資訊
SHOW MASTER STATUS
SHOW SLAVE STATUS
SHOW BINARY LOGS
MySQL中建立表
drop table if exists products;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
drop table if exists orders;
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- 是否下單
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
drop table if exists shipments;
CREATE TABLE shipments (
shipment_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_id INTEGER NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL
) AUTO_INCREMENT = 1001;;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
flink sql client中配置
set table.exec.source.cdc-events-duplicate = true;
CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.25.21.29',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpassword',
'database-name' = 'db_inventory_cdc',
'table-name' = 'products',
'connect.timeout' = '60s',
'scan.incremental.snapshot.chunk.size' = '25',
'server-id'='5401-5405'
);
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.25.21.29',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpassword',
'database-name' = 'db_inventory_cdc',
'table-name' = 'orders',
'connect.timeout' = '60s',
'scan.incremental.snapshot.chunk.size' = '25',
'server-id'='5406-5410'
);
CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY(shipment_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.25.21.29',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpassword',
'database-name' = 'db_inventory_cdc',
'table-name' = 'shipments',
'connect.timeout' = '60s',
'scan.incremental.snapshot.chunk.size' = '25',
'server-id'='5411-5415'
);
CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://172.25.11.77:9401',
'index' = 'enriched_orders'
);
INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;
在執行完insert into這條語句後,則sql-client會送出一個flink job到本地的flink cluster, 詳見下面的截圖:
資料驗證
- 通過ES的kinba查詢資料,發現enriched_orders索引中已經有了三條資料
-
通過增加、修改、删除訂單和物流記錄,則ES中的enriched_orders的索引資料跟着做實時變化。
注意:必須要設定checkpoint,否則程式會卡到全量階段的最後一步,進不到增量讀取階段
INSERT INTO orders VALUES (default, '2021-09-27 15:22:00', 'Jark', 29.71, 104, false);
INSERT INTO shipments VALUES (default,10004,'Shanghai','Beijing',false);
--更新記錄
UPDATE orders SET order_status = true WHERE order_id = 10004;
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1001;
--删除記錄
DELETE FROM orders WHERE order_id = 10004;