What is CDC?
CDC, short for Change Data Capture, lets us capture committed changes from a database and ship them downstream for other systems to consume. These changes include INSERT, UPDATE, and DELETE operations.
What problems does it solve?
- Data synchronization with Flink SQL: replicate data from one database to other systems, such as MySQL or Elasticsearch.
- Maintain a real-time materialized aggregate view over the source database.
- Because only incremental changes are synchronized, data can be replicated in real time with low latency.
- Join a temporal table on event time to obtain accurate results.
Enable the MySQL Binlog
MySQL's binlog is disabled by default, so we need to enable it first. The configuration is very simple:
# enable the binlog
log-bin=mysql-bin
binlog_format=ROW
server_id=142
These few parameters are all that is required (MySQL must be restarted for them to take effect; you can confirm with SHOW VARIABLES LIKE 'log_bin'). There are many other optional settings you can add as needed.
Add the pom dependency
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.3.0</version>
</dependency>
Define the DDL
CREATE TABLE mysql_cdc (
name STRING,
age INT,
city STRING,
phone STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'master',
'port' = '3306',
'username' = 'mysql',
'password' = '12345678',
'database-name' = 'test',
'table-name' = 'ab',
'debezium.snapshot.mode' = 'initial'
)
CREATE TABLE kafka_mysql_cdc (
name STRING,
age INT,
city STRING,
phone STRING
) WITH (
'connector' = 'kafka',
'topic' = 'test1',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',
'format' = 'debezium-json'
)
insert into kafka_mysql_cdc
select * from mysql_cdc
Formatting with debezium-json
The statements above read data from MySQL and write it to Kafka, serialized with the debezium-json format. Start the job and look at the data:
{"before":null,"after":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"op":"c"}
{"before":null,"after":{"name":"spark","age":25,"city":"beijing","phone":"17610775718"},"op":"c"}
{"before":null,"after":{"name":"Flink","age":100,"city":"beijing","phone":"111111"},"op":"c"}
I am using debezium-json to format the data here. The first run takes a full snapshot of the table, which as you can see contains only 3 rows. `before` is the row before the change, `after` is the row after the change, and `op` is the operation type. Next, insert a new row into MySQL:
INSERT INTO ab(name,age,city,phone) VALUES ('hadoop',00,'shanghai',778899);
The record consumed:
{"before":null,"after":{"name":"hadoop","age":0,"city":"shanghai","phone":"778899"},"op":"c"}
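A downstream consumer interprets these records by the `op` field. As a minimal sketch (the `describe_change` helper is hypothetical, not part of Flink or Debezium), this is how the `before`/`after`/`op` fields of a debezium-json record map to the three operation kinds:

```python
import json

# Hypothetical helper: interpret a debezium-json change record by its
# "op" field. "c" = create/insert, "u" = update, "d" = delete.
def describe_change(raw: str) -> str:
    rec = json.loads(raw)
    op = rec["op"]
    if op == "c":
        return f"INSERT {rec['after']}"
    if op == "d":
        return f"DELETE {rec['before']}"
    if op == "u":
        return f"UPDATE {rec['before']} -> {rec['after']}"
    raise ValueError(f"unknown op: {op}")

record = '{"before":null,"after":{"name":"hadoop","age":0,"city":"shanghai","phone":"778899"},"op":"c"}'
print(describe_change(record))
```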
Now update a row:
UPDATE ab set age = '00' WHERE name = 'JasonLee';
{"before":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"after":null,"op":"d"}
{"before":null,"after":{"name":"JasonLee","age":0,"city":"beijing","phone":"16345646"},"op":"c"}
Two records are consumed, because Flink encodes an UPDATE as a DELETE plus an INSERT. In the first record, `before` holds the pre-update row and `op` is `d` (delete); in the second, `before` is null and `after` holds the post-update row, with `op` set to `c` (create). The `age` was 100 before the update and 0 after.
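Replaying such a changelog rebuilds the table state: the `d` + `c` pair has the same net effect as a single in-place update. A small sketch (the `apply_changelog` helper and keying by `name` are assumptions for illustration):

```python
import json

# Hypothetical sketch: replay debezium-json records into a dict keyed by
# "name", showing that a delete ("d") followed by a create ("c") is
# equivalent to one update of the same row.
def apply_changelog(records, key="name"):
    state = {}
    for raw in records:
        rec = json.loads(raw)
        if rec["op"] == "d":
            state.pop(rec["before"][key], None)
        elif rec["op"] == "c":
            row = rec["after"]
            state[row[key]] = row
    return state

records = [
    '{"before":null,"after":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"op":"c"}',
    '{"before":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"after":null,"op":"d"}',
    '{"before":null,"after":{"name":"JasonLee","age":0,"city":"beijing","phone":"16345646"},"op":"c"}',
]
print(apply_changelog(records))  # JasonLee's row now carries age 0
```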
Formatting with canal-json
Simply change `format` in the DDL above to canal-json and restart the job. The consumed data looks like this:
{"data":[{"name":"JasonLee","age":2,"city":"beijing","phone":"16345646"}],"type":"INSERT"}
{"data":[{"name":"spark","age":25,"city":"beijing","phone":"17610775718"}],"type":"INSERT"}
{"data":[{"name":"Flink","age":100,"city":"beijing","phone":"111111"}],"type":"INSERT"}
{"data":[{"name":"hadoop","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"hive","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"hbase","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"kafka","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
Here the rows live in the `data` array and `type` gives the operation type. The initial full load emits everything as INSERT. Now insert another row into MySQL:
INSERT INTO ab(name,age,city,phone) VALUES ('clickhouse',00,'shanghai',778899);
{"data":[{"name":"clickhouse","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
Then update that row:
UPDATE ab set age = '20' WHERE name = 'clickhouse';
{"data":[{"name":"clickhouse","age":0,"city":"shanghai","phone":"778899"}],"type":"DELETE"}
{"data":[{"name":"clickhouse","age":20,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
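As with debezium-json, the UPDATE arrives as a DELETE/INSERT pair. A consumer unwraps the `data` array and the `type` label; a minimal sketch (the `parse_canal` helper is hypothetical, for illustration only):

```python
import json

# Hypothetical sketch: flatten canal-json records into (type, row) pairs.
# canal-json wraps the changed rows in a "data" array and labels the
# operation with "type" (INSERT / UPDATE / DELETE).
def parse_canal(raw: str):
    rec = json.loads(raw)
    return [(rec["type"], row) for row in rec["data"]]

delete_rec = '{"data":[{"name":"clickhouse","age":0,"city":"shanghai","phone":"778899"}],"type":"DELETE"}'
insert_rec = '{"data":[{"name":"clickhouse","age":20,"city":"shanghai","phone":"778899"}],"type":"INSERT"}'
changes = parse_canal(delete_rec) + parse_canal(insert_rec)
print(changes)
```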
Advantages of CDC
- Works out of the box and is easy to pick up
- Fewer components to maintain, a simpler real-time pipeline, and lower deployment cost
- Lower end-to-end latency
- Flink natively supports exactly-once reads and computation
- Data does not need to land in intermediate storage, reducing storage cost
- Supports both full-snapshot and incremental streaming reads
- The binlog capture position can be rewound