天天看點

【FlinkSQL實戰系列】Flink SQL CDC 實時同步 Mysql 的 Binlog 資料到 kafka

什麼是 CDC ?

CDC,Change Data Capture,變更資料擷取的簡稱,使用 CDC 我們可以從資料庫中擷取已送出的更改并将這些更改發送到下遊,供下遊使用。這些變更可以包括 INSERT,DELETE,UPDATE 等.

要解決什麼問題 ?

  • 使用 flink sql 進行資料同步,可以将資料從一個資料同步到其他的地方,比如 mysql、elasticsearch 等。
  • 可以在源資料庫上實時的物化一個聚合視圖
  • 因為隻是增量同步,是以可以實時的低延遲的同步資料
  • 使用 EventTime join 一個 temporal 表以便可以擷取準确的結果

開啟 Mysql Binlog

mysql 的 binlog 預設是關閉的,我們需要先把它開啟,配置非常簡單.

# 開啟binlog日志
log-bin=mysql-bin
binlog_format=ROW
server_id=142
      

隻需要配置這幾個參數就可以了,還有很多可選的配置,自己也可以根據需要添加.

添加 pom 依賴

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>
      

定義 DDL

CREATE TABLE mysql_cdc (
  name STRING,
  age INT,
  city STRING,
  phone STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'master',
  'port' = '3306',
  'username' = 'mysql',
  'password' = '12345678',
  'database-name' = 'test',
  'table-name' = 'ab',
  'debezium.snapshot.mode' = 'initial'
)

CREATE TABLE kafka_mysql_cdc (
  name STRING,
  age INT,
  city STRING,
  phone STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'test1',
 'scan.startup.mode' = 'earliest-offset',
 'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',
 'format' = 'debezium-json'
)

insert into kafka_mysql_cdc
select * from mysql_cdc
      

debezium-json 格式化

定義了從 mysql 讀取資料并寫入到 kafka 中,格式化方式是 debezium-json 然後啟動任務看一下資料

{"before":null,"after":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"op":"c"}
{"before":null,"after":{"name":"spark","age":25,"city":"beijing","phone":"17610775718"},"op":"c"}
{"before":null,"after":{"name":"Flink","age":100,"city":"beijing","phone":"111111"},"op":"c"}
      

我這裡用的是 debezium-json 來格式化資料,第一次會全量讀取表裡的資料,可以看到隻有 3 條資料, before 表示的是修改之前的資料,after 表示的是修改之後的資料,op 表示的是操作的類型.然後我先向 mysql 添加一條新的資料.

INSERT INTO ab(name,age,city,phone) VALUES ('hadoop',00,'shanghai',778899);
      

消費到的資料:

{"before":null,"after":{"name":"hadoop","age":0,"city":"shanghai","phone":"778899"},"op":"c"}
      

然後再來修改一條資料:

UPDATE ab set age = '00' WHERE name = 'JasonLee';
      
{"before":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"after":null,"op":"d"}
{"before":null,"after":{"name":"JasonLee","age":0,"city":"beijing","phone":"16345646"},"op":"c"}
      

可以看到消費到了兩條資料,因為在 Flink 裡面 update 操作會被翻譯成 delete 和 insert 操作,第一條資料的 before 是修改之前的資料,op 的類型是 d(delete),第二條資料的 before 置為了 null, after 表示的是修改之後的資料,之前的 age 是 100,修改之後是 0 ,op 的類型是 c(create).

canal-json 格式化

隻需要把上面 DDL 中的 format 改為 canal-json 即可.然後重新開機一下任務,消費到的資料如下:

{"data":[{"name":"JasonLee","age":2,"city":"beijing","phone":"16345646"}],"type":"INSERT"}
{"data":[{"name":"spark","age":25,"city":"beijing","phone":"17610775718"}],"type":"INSERT"}
{"data":[{"name":"Flink","age":100,"city":"beijing","phone":"111111"}],"type":"INSERT"}
{"data":[{"name":"hadoop","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"hive","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"hbase","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"kafka","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
      

我們的資料是放在 data 裡面,然後 type 代表了操作的類型.第一次加載的時候全部都是 INSERT 類型的資料,然後我再向 mysql 插入一條新資料

INSERT INTO ab(name,age,city,phone) VALUES ('clickhouse',00,'shanghai',778899);
      
{"data":[{"name":"clickhouse","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
      
UPDATE ab set age = '20' WHERE name = 'clickhouse';
      
{"data":[{"name":"clickhouse","age":0,"city":"shanghai","phone":"778899"}],"type":"DELETE"}
{"data":[{"name":"clickhouse","age":20,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
      

CDC 優點

  • 開箱即用,簡單易上手
  • 減少維護的元件,簡化實時鍊路,減輕部署成本
  • 減小端到端延遲
  • Flink 自身支援 Exactly Once 的讀取和計算
  • 資料不落地,減少存儲成本
  • 支援全量和增量流式讀取
  • binlog 采集位點可回溯