天天看點

Flink 最佳實踐之使用 Canal 同步 MySQL 資料至 TiDB

一. 背景介紹

本文将介紹如何将 MySQL 中的資料,通過 Binlog + Canal 的形式導入到 Kafka 中,繼而被 Flink 消費的案例。

為了能夠快速的驗證整套流程的功能性,所有的元件都以單機的形式部署。如果手上的實體資源不足,可以将本文中的所有元件一台 4G 1U 的虛拟機環境中。

如果需要在生産環境中部署,建議将每一個元件替換成高可用的叢集部署方案。

其中,我們單獨建立了一套 Zookeeper 單節點環境,Flink、Kafka、Canal 等元件共用這個 Zookeeper 環境。

針對于所有需要 JRE 的元件,如 Flink,Kafka,Canal,Zookeeper,考慮到更新 JRE 可能會影響到其他的應用,我們選擇每個元件獨立使用自己的 JRE 環境。

本文分為兩個部分,其中,前七小節主要介紹基礎環境的搭建,最後一個小節介紹了資料是如何在各個元件中流通的。

Flink 最佳實踐之使用 Canal 同步 MySQL 資料至 TiDB

資料的流動經過以下元件:

  • MySQL 資料源生成 Binlog。
  • Canal 讀取 Binlog,生成 Canal json,推送到 Kafka 指定的 Topic 中。
  • Flink 使用 flink-sql-connector-kafka API,消費 Kafka Topic 中的資料。
  • Flink 在通過 flink-connector-jdbc,将資料寫入到 TiDB 中。

TiDB + Flink 的結構,支援開發與運作多種不同種類的應用程式。

目前主要的特性主要包括:

  • 批流一體化。
  • 精密的狀态管理。
  • 事件時間支援。
  • 精确的一次狀态一緻性保障。

Flink 可以運作在包括 YARN、Mesos、Kubernetes 在内的多種資源管理架構上,還支援裸機叢集上獨立部署。TiDB 可以部署 AWS、Kubernetes、GCP GKE 上,同時也支援使用 TiUP 在裸機叢集上獨立部署。

TiDB + Flink 結構常見的幾類應用如下:

  • 事件驅動型應用:
    • 反欺詐。
    • 異常檢測。
    • 基于規則的報警。
    • 業務流程監控。
  • 資料分析應用:
    • 網絡品質監控。
    • 産品更新及試驗評估分析。
    • 事實資料即席分析。
    • 大規模圖分析。
  • 資料管道應用:
    • 電商實時查詢索引建構。
    • 電商持續 ETL。

二. 環境介紹

2.1 作業系統環境

[root@r20 topology]# cat /etc/redhat-release
CentOS Stream release 8           

2.2 軟體環境

2.3 機器配置設定

Hostname IP Component
r21 192.168.12.21 TiDB Cluster
r22 192.168.12.22
r23 192.168.12.23
r24 192.168.12.24
r25 192.168.12.25
r26 192.168.12.26

三. 部署 TiDB Cluster

與傳統的單機資料庫相比,TiDB 具有以下優勢:

  • 純分布式架構,擁有良好的擴充性,支援彈性的擴縮容。
  • 支援 SQL,對外暴露 MySQL 的網絡協定,并相容大多數 MySQL 的文法,在大多數場景下可以直接替換 MySQL。
  • 預設支援高可用,在少數副本失效的情況下,資料庫本身能夠自動進行資料修複和故障轉移,對業務透明。
  • 支援 ACID 事務,對于一些有強一緻需求的場景友好,例如:銀行轉賬。
  • 具有豐富的工具鍊生态,覆寫資料遷移、同步、備份等多種場景。

在核心設計上,TiDB 分布式資料庫将整體架構拆分成了多個子產品,各子產品之間互相通信,組成完整的 TiDB 系統。對應的架構圖如下:

Flink 最佳實踐之使用 Canal 同步 MySQL 資料至 TiDB

在本文中,我們隻做最簡單的功能測試,是以部署了一套單節點但副本的 TiDB,涉及到了以下的三個子產品:

  • TiDB Server:SQL 層,對外暴露 MySQL 協定的連接配接 endpoint,負責接受用戶端的連接配接,執行 SQL 解析和優化,最終生成分布式執行計劃。
  • PD (Placement Driver) Server:整個 TiDB 叢集的元資訊管理子產品,負責存儲每個 TiKV 節點實時的資料分布情況和叢集的整體拓撲結構,提供 TiDB Dashboard 管控界面,并為分布式事務配置設定事務 ID。
  • TiKV Server:負責存儲資料,從外部看 TiKV 是一個分布式的提供事務的 Key-Value 存儲引擎。

3.1 TiUP 部署模闆檔案

# # Global variables are applied to all deployments and used as the default value of
# # the deployments if a specific deployment value is missing.
global:
  user: "tidb"
  ssh_port: 22
  deploy_dir: "/opt/tidb-c1/"
  data_dir: "/opt/tidb-c1/data/"
# # Monitored variables are applied to all the machines.
#monitored:
#  node_exporter_port: 19100
#  blackbox_exporter_port: 39115
#  deploy_dir: "/opt/tidb-c3/monitored"
#  data_dir: "/opt/tidb-c3/data/monitored"
#  log_dir: "/opt/tidb-c3/log/monitored"
# # Server configs are used to specify the runtime configuration of TiDB components.
# # All configuration items can be found in TiDB docs:
# # - TiDB: https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/
# # - TiKV: https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/
# # - PD: https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/
# # All configuration items use points to represent the hierarchy, e.g:
# #   readpool.storage.use-unified-pool
# #
# # You can overwrite this configuration via the instance-level `config` field.
server_configs:
  tidb:
    log.slow-threshold: 300
    binlog.enable: false
    binlog.ignore-error: false
    tikv-client.copr-cache.enable: true
  tikv:
    server.grpc-concurrency: 4
    raftstore.apply-pool-size: 2
    raftstore.store-pool-size: 2
    rocksdb.max-sub-compactions: 1
    storage.block-cache.capacity: "16GB"
    readpool.unified.max-thread-count: 12
    readpool.storage.use-unified-pool: false
    readpool.coprocessor.use-unified-pool: true
    raftdb.rate-bytes-per-sec: 0
  pd:
    schedule.leader-schedule-limit: 4
    schedule.region-schedule-limit: 2048
    schedule.replica-schedule-limit: 64
pd_servers:
  - host: 192.168.12.21
    ssh_port: 22
    name: "pd-2"
    client_port: 12379
    peer_port: 12380
    deploy_dir: "/opt/tidb-c1/pd-12379"
    data_dir: "/opt/tidb-c1/data/pd-12379"
    log_dir: "/opt/tidb-c1/log/pd-12379"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.pd` values.
    config:
      schedule.max-merge-region-size: 20
      schedule.max-merge-region-keys: 200000
tidb_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 14000
    status_port: 12080
    deploy_dir: "/opt/tidb-c1/tidb-14000"
    log_dir: "/opt/tidb-c1/log/tidb-14000"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tidb` values.
    config:
      log.slow-query-file: tidb-slow-overwrited.log
      tikv-client.copr-cache.enable: true
tikv_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 12160
    status_port: 12180
    deploy_dir: "/opt/tidb-c1/tikv-12160"
    data_dir: "/opt/tidb-c1/data/tikv-12160"
    log_dir: "/opt/tidb-c1/log/tikv-12160"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tikv` values.
    config:
      server.grpc-concurrency: 4
      #server.labels: { zone: "zone1", dc: "dc1", host: "host1" }
#monitoring_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    port: 19090
#    deploy_dir: "/opt/tidb-c1/prometheus-19090"
#    data_dir: "/opt/tidb-c1/data/prometheus-19090"
#    log_dir: "/opt/tidb-c1/log/prometheus-19090"
#grafana_servers:
#  - host: 192.168.12.21
#    port: 13000
#    deploy_dir: "/opt/tidb-c1/grafana-13000"
#alertmanager_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    web_port: 19093
#    cluster_port: 19094
#    deploy_dir: "/opt/tidb-c1/alertmanager-19093"
#    data_dir: "/opt/tidb-c1/data/alertmanager-19093"
#    log_dir: "/opt/tidb-c1/log/alertmanager-19093"           

3.2 TiDB Cluster 環境

本文重點非部署 TiDB Cluster,作為快速實驗環境,隻在一台機器上部署單副本的 TiDB Cluster 叢集。不需要部署監控環境。

[root@r20 topology]# tiup cluster display tidb-c1-v409
Starting component `cluster`: /root/.tiup/components/cluster/v1.3.2/tiup-cluster display tidb-c1-v409
Cluster type:       tidb
Cluster name:       tidb-c1-v409
Cluster version:    v4.0.9
SSH type:           builtin
Dashboard URL:      http://192.168.12.21:12379/dashboard
ID                   Role  Host           Ports        OS/Arch       Status   Data Dir                      Deploy Dir
--                   ----  ----           -----        -------       ------   --------                      ----------
192.168.12.21:12379  pd    192.168.12.21  12379/12380  linux/x86_64  Up|L|UI  /opt/tidb-c1/data/pd-12379    /opt/tidb-c1/pd-12379
192.168.12.21:14000  tidb  192.168.12.21  14000/12080  linux/x86_64  Up       -                             /opt/tidb-c1/tidb-14000
192.168.12.21:12160  tikv  192.168.12.21  12160/12180  linux/x86_64  Up       /opt/tidb-c1/data/tikv-12160  /opt/tidb-c1/tikv-12160
Total nodes: 4           

建立用于測試的表

mysql> show create table t1;
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                  |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| t1    | CREATE TABLE `t1` (
  `id` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)           

四. 部署 Zookeeper 環境

在本實驗中單獨配置 Zookeeper 環境,為 Kafka 和 Flink 環境提供服務。

作為實驗示範方案,隻部署單機環境。

4.1 解壓 Zookeeper 包

[root@r24 soft]# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
[root@r24 soft]# mv apache-zookeeper-3.6.2-bin /opt/zookeeper           

4.2 部署用于 Zookeeper 的 jre

[root@r24 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre           

修改 /opt/zookeeper/bin/zkEnv.sh 檔案,增加 JAVA_HOME 環境變量

## add bellowing env var in the head of zkEnv.sh
JAVA_HOME=/opt/zookeeper/jre           

4.3 建立 Zookeeper 的配置檔案

[root@r24 conf]# cat zoo.cfg | grep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181           

4.4 啟動 Zookeeper

[root@r24 bin]# /opt/zookeeper/bin/zkServer.sh start           

4.5 檢查 Zookeeper 的狀态

## check zk status
[root@r24 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
## check OS port status
[root@r24 bin]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      942/sshd
tcp6       0      0 :::2181                 :::*                    LISTEN      15062/java
tcp6       0      0 :::8080                 :::*                    LISTEN      15062/java
tcp6       0      0 :::22                   :::*                    LISTEN      942/sshd
tcp6       0      0 :::44505                :::*                    LISTEN      15062/java
## use zkCli tool to check zk connection
[root@r24 bin]# ./zkCli.sh -server 192.168.12.24:2181           

4.6 關于 Zookeeper 的建議

我個人有一個關于 Zookeeper 的不成熟的小建議:

Zookeeper 叢集版本一定要開啟網絡監控。特别是要關注 system metrics 裡面的 network bandwidth。

五. 部署 Kafka

Kafka 是一個分布式流處理平台,主要應用于兩大類的應用中:

  • 構造實時流資料管道,它可以在系統或應用之間可靠地擷取資料。 (相當于message queue)
  • 建構實時流式應用程式,對這些流資料進行轉換或者影響。 (就是流處理,通過kafka stream topic和topic之間内部進行變化)
Flink 最佳實踐之使用 Canal 同步 MySQL 資料至 TiDB

Kafka 有四個核心的 API:

  • The Producer API 允許一個應用程式釋出一串流式的資料到一個或者多個Kafka topic。
  • Consumer API 允許一個應用程式訂閱一個或多個 topic ,并且對釋出給他們的流式資料進行處理。
  • Streams API 允許一個應用程式作為一個流處理器,消費一個或者多個topic産生的輸入流,然後生産一個輸出流到一個或多個topic中去,在輸入輸出流中進行有效的轉換。
  • Connector API 允許建構并運作可重用的生産者或者消費者,将Kafka topics連接配接到已存在的應用程式或者資料系統。比如,連接配接到一個關系型資料庫,捕捉表(table)的所有變更内容。

在本實驗中隻做功能性驗證,隻搭建一個單機版的 Kafka 環境。

5.1 下載下傳并解壓 Kafka

[root@r22 soft]# tar vxzf kafka_2.13-2.7.0.tgz
[root@r22 soft]# mv kafka_2.13-2.7.0 /opt/kafka           

5.2 部署用于 Kafka 的 jre

[root@r22 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre           

修改 Kafka 的 jre 環境變量

[root@r22 bin]# vim /opt/kafka/bin/kafka-run-class.sh
## add bellowing line in the head of kafka-run-class.sh
JAVA_HOME=/opt/kafka/jre           

5.3 修改 Kafka 配置檔案

修改 Kafka 配置檔案 /opt/kafka/config/server.properties

## change bellowing variable in /opt/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=i192.168.12.24:2181           

5.4 啟動 Kafka

[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties           

5.5 檢視 Kafka 的版本資訊

Kafka 并沒有提供 --version 的 optional 來檢視 Kafka 的版本資訊。

[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
-rw-r--r-- 1 root root  4929521 Dec 16 09:02 kafka_2.13-2.7.0.jar
-rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0.jar.asc
-rw-r--r-- 1 root root    41793 Dec 16 09:02 kafka_2.13-2.7.0-javadoc.jar
-rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0-javadoc.jar.asc
-rw-r--r-- 1 root root   892036 Dec 16 09:02 kafka_2.13-2.7.0-sources.jar
-rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0-sources.jar.asc
... ...           

其中 2.13 是 scale 的版本資訊,2.7.0 是 Kafka 的版本資訊。

六. 部署 Flink

Apache Flink 是一個架構和分布式處理引擎,用于在無邊界和有邊界資料流上進行有狀态的計算。Flink 能在所有常見叢集環境中運作,并能以記憶體速度和任意規模進行計算。

支援高吞吐、低延遲、高性能的分布式處理架構 Apache Flink 是一個架構和分布式處理引擎,用于對無界和有界資料流進行有狀态計算。Flink被設計在所有常見的叢集環境中運作,以記憶體執行速度和任意規模來執行計算。

Flink 最佳實踐之使用 Canal 同步 MySQL 資料至 TiDB

本實驗隻做功能性測試,僅部署單機 Flink 環境。

6.1 下載下傳并分發 Flink

[root@r23 soft]# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
[root@r23 soft]# mv flink-1.12.1 /opt/flink           

6.2 部署 Flink 的 jre

[root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre           

6.3 添加 Flink 需要的 lib

Flink 消費 Kafka 資料,需要 flink-sql-connector-kafka 包。

Flink 連結 MySQL/TiDB,需要 flink-connector-jdbc 包。

[root@r23 soft]# mv flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
[root@r23 soft]# mv flink-connector-jdbc_2.12-1.12.0.jar /opt/flink/lib/           

6.4 修改 Flink 配置檔案

## add or modify bellowing lines in /opt/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 192.168.12.23
env.java.home: /opt/flink/jre           

6.5 啟動 Flink

[root@r23 ~]# /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host r23.
Starting taskexecutor daemon on host r23.           

6.6 檢視 Flink GUI

Flink 最佳實踐之使用 Canal 同步 MySQL 資料至 TiDB

七. 部署 MySQL

7.1 解壓 MySQL package

[root@r25 soft]# tar vxf mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
[root@r25 soft]# mv mysql-8.0.23-linux-glibc2.12-x86_64 /opt/mysql/           

7.2 建立 MySQL Service 檔案

[root@r25 ~]# touch /opt/mysql/support-files/mysqld.service
[root@r25 support-files]# cat mysqld.service
[Unit]
Description=MySQL 8.0 database server
After=syslog.target
After=network.target
[Service]
Type=simple
User=mysql
Group=mysql
#ExecStartPre=/usr/libexec/mysql-check-socket
#ExecStartPre=/usr/libexec/mysql-prepare-db-dir %n
# Note: we set --basedir to prevent probes that might trigger SELinux alarms,
# per bug #547485
ExecStart=/opt/mysql/bin/mysqld_safe
#ExecStartPost=/opt/mysql/bin/mysql-check-upgrade
#ExecStopPost=/opt/mysql/bin/mysql-wait-stop
# Give a reasonable amount of time for the server to start up/shut down
TimeoutSec=300
# Place temp files in a secure directory, not /tmp
PrivateTmp=true
Restart=on-failure
RestartPreventExitStatus=1
# Sets open_files_limit
LimitNOFILE = 10000
# Set enviroment variable MYSQLD_PARENT_PID. This is required for SQL restart command.
Environment=MYSQLD_PARENT_PID=1
[Install]
WantedBy=multi-user.target
## copy mysqld.service to /usr/lib/systemd/system/
[root@r25 support-files]# cp mysqld.service  /usr/lib/systemd/system/           

7.3 建立 my.cnf 檔案

[root@r34 opt]# cat /etc/my.cnf
[mysqld]
port=3306
basedir=/opt/mysql
datadir=/opt/mysql/data
socket=/opt/mysql/data/mysql.socket
max_connections = 100
default-storage-engine = InnoDB
character-set-server=utf8
log-error = /opt/mysql/log/error.log
slow_query_log = 1
long-query-time = 30
slow_query_log_file = /opt/mysql/log/show.log
min_examined_row_limit = 1000
log-slow-slave-statements
log-queries-not-using-indexes
#skip-grant-tables           

7.4 初始化并啟動 MySQL

[root@r25 ~]# /opt/mysql/bin/mysqld --initialize --user=mysql --console
[root@r25 ~]# chown -R mysql:mysql /opt/mysql
[root@r25 ~]# systemctl start mysqld
## check mysql temp passord from /opt/mysql/log/error.log
2021-02-24T02:45:47.316406Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: I?nDjijxa3>-           

7.5 建立一個新的 MySQL 使用者用以連接配接 Canal

## change mysql temp password firstly
mysql> alter user 'root'@'localhost' identified by 'mysql';
Query OK, 0 rows affected (0.00 sec)
## create a management user 'root'@'%'
mysql> create user 'root'@'%' identified by 'mysql';
Query OK, 0 rows affected (0.01 sec)
mysql> grant all privileges on *.* to 'root'@'%';
Query OK, 0 rows affected (0.00 sec)
## create a canal replication user 'canal'@'%'
mysql> create user 'canal'@'%' identified by 'canal';
Query OK, 0 rows affected (0.01 sec)
mysql> grant select, replication slave, replication client on *.* to 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)           

7.6 在 MySQL 中建立用于測試的表

mysql> show create table test.t2;
+-------+----------------------------------------------------------------------------------+
| Table | Create Table                                                                     |
+-------+----------------------------------------------------------------------------------+
| t2    | CREATE TABLE `t2` (
  `id` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
+-------+----------------------------------------------------------------------------------+
1 row in set (0.00 sec)           

八. 部署 Canal

Canal 主要用途是基于 MySQL 資料庫增量日志解析,提供增量資料訂閱和消費。

早期阿裡巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實作方式主要是基于業務 trigger 擷取增量變更。

從 2010 年開始,業務逐漸嘗試資料庫日志解析擷取增量變更進行同步,由此衍生出了大量的資料庫增量訂閱和消費業務。

Flink 最佳實踐之使用 Canal 同步 MySQL 資料至 TiDB

基于日志增量訂閱和消費的業務包括:

  • 資料庫鏡像。
  • 資料庫實時備份。
  • 索引建構和實時維護(拆分異構索引、反向索引等)。
  • 業務 cache 重新整理。
  • 帶業務邏輯的增量資料處理。

目前的 canal 支援源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

8.1 解壓 Canal 包

[root@r26 soft]# mkdir /opt/canal && tar vxzf canal.deployer-1.1.4.tar.gz -C /opt/canal           

8.2 部署 Canal 的 jre

[root@r26 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r26 soft]# mv jre1.8.0_281 /opt/canal/jre
## configue jre, add bellowing line in the head of /opt/canal/bin/startup.sh 
JAVA=/opt/canal/jre/bin/java           

8.3 修改 Canal 的配置檔案

修改 /opt/canal/conf/canal.properties 配置檔案

## modify bellowing configuration
canal.zkServers =192.168.12.24:2181
canal.serverMode = kafka
canal.destinations = example        ## 需要在 /opt/canal/conf 目錄下建立一個 example 檔案夾,用于存放 destination 的配置
canal.mq.servers = 192.168.12.22:9092           

修改 /opt/canal/conf/example/instance.properties 配置檔案

## modify bellowing configuration
canal.instance.master.address=192.168.12.25:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..*                    ## 過濾資料庫的表
canal.mq.topic=canal-kafka           

九. 配置資料流向

9.1 MySQL Binlog -> Canal -> Kafka 通路

9.1.1 檢視 MySQL Binlog 資訊

檢視 MySQL Binlog 資訊,確定 Binlog 是正常的。

mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000001 |     2888 |              |                  |                   |
+---------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)           

9.1.2 在 Kafka 中建立一個 Topic

在 Kafka 中建立一個 Topic canal-kafka,這個Topic 的名字要與 Canal 配置檔案 /opt/canal/conf/example/instance.properties 中的 canal.mq.topic=canal-kafka 對應:

[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --create \
> --zookeeper 192.168.12.24:2181 \
> --config max.message.bytes=12800000 \
> --config flush.messages=1 \
> --replication-factor 1 \
> --partitions 1 \
> --topic canal-kafka
Created topic canal-kafka.
[2021-02-24 01:51:55,050] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(canal-kafka-0) (kafka.server.ReplicaFetcherManager)
[2021-02-24 01:51:55,052] INFO [Log partition=canal-kafka-0, dir=/opt/kafka/logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-02-24 01:51:55,053] INFO Created log for partition canal-kafka-0 in /opt/kafka/logs/canal-kafka-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 1, message.format.version -> 2.7-IV2, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 12800000, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] No checkpointed highwatermark is found for partition canal-kafka-0 (kafka.cluster.Partition)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] Log loaded for partition canal-kafka-0 with initial high watermark 0 (kafka.cluster.Partition)           

檢視 Kafka 中所有的 Topic:

[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.12.24:2181
__consumer_offsets
canal-kafka
ticdc-test           

檢視 Kafka 中 Topic ticdc-test 的資訊:

[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.12.24:2181  --topic canal-kafka
Topic: ticdc-test       PartitionCount: 1       ReplicationFactor: 1    Configs: max.message.bytes=12800000,flush.messages=1
        Topic: ticdc-test       Partition: 0    Leader: 0       Replicas: 0     Isr: 0           

9.1.3 啟動 Canal

在啟動 Canal 之前,需要在 Canal 節點上檢視一下端口的情況:

## check MySQL 3306 port
## canal.instance.master.address=192.168.12.25:3306
[root@r26 bin]# telnet 192.168.12.25 3306
## check Kafka 9092 port
## canal.mq.servers = 192.168.12.22:9092
[root@r26 bin]# telnet 192.168.12.22 9092
## check zookeeper 2181 port
## canal.zkServers = 192.168.12.24:2181
[root@r26 bin]# telnet 192.168.12.24 2181           

啟動 Canal:

[root@r26 bin]# /opt/canal/bin/startup.sh
cd to /opt/canal/bin for workaround relative path
LOG CONFIGURATION : /opt/canal/bin/../conf/logback.xml
canal conf : /opt/canal/bin/../conf/canal.properties
CLASSPATH :/opt/canal/bin/../conf:/opt/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/canal/bin/../lib/zkclient-0.10.jar:/opt/canal/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-core-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-context-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/snappy-java-1.1.7.1.jar:/opt/canal/bin/../lib/snakeyaml-1.19.jar:/opt/canal/bin/../lib/slf4j-api-1.7.12.jar:/opt/canal/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_httpserver-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_hotspot-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_common-0.4.0.jar:/opt/canal/bin/../lib/simpleclient-0.4.0.jar:/opt/canal/bin/../lib/scala-reflect-2.11.12.jar:/opt/canal/bin/../lib/scala-logging_2.11-3.8.0.jar:/opt/canal/bin/../lib/scala-library-2.11.12.jar:/opt/canal/bin/../lib/rocketmq-srvutil-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-remoting-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-logging-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-common-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-client-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-acl-4.5.2.jar:/opt/canal/bin/../lib/protobuf-java-3.6.1.jar:/opt/canal/bin/../lib/oro-2.0.8.jar:/opt/canal/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/opt/canal/bin/../lib/netty-all-4.1.6.Final.jar:/opt/canal/bin/../lib/netty-3.2.2.Final.jar:/opt/canal/bin/../lib/mysql-connector-java-5.1.47.jar:/opt/canal/bin/../lib/metrics-core-2.2.0.jar:/opt/canal/bin/../lib/lz4-java-1.4.1.jar:/opt/canal/bin/../lib/logback-core-1.1.3.jar:/opt/canal/bin/../lib/logback-classic-1.1.3.jar:/opt/canal/bin/../lib/kafka-clients-1.1.1.jar:/opt/canal/bin/../lib/kafka_2.11-1.1.1.jar:/opt/canal/bin/../lib/jsr305-3.0.2.jar:/opt/canal/bin/../lib/jopt-simple-5.0.4.jar:/opt/canal/bin/../lib/jctools-core-2.1.2.jar:/opt/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/opt/canal/bin/../lib/javax.annotation-api-1.3.2.jar:/opt/canal/bin/../lib/jackson-databind-2.9.6.jar:/opt/canal/bin/../lib/jackson-core-2.9.6.jar:/opt/canal/bin/../lib/jackson-annotations-2.9.0.jar:/opt/canal/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/opt/canal/bin/../lib/httpcore-4.4.3.jar:/opt/canal/bin/../lib/httpclient-4.5.1.jar:/opt/canal/bin/../lib/h2-1.4.196.jar:/opt/canal/bin/../lib/guava-18.0.jar:/opt/canal/bin/../lib/fastsql-2.0.0_preview_973.jar:/opt/canal/bin/../lib/fastjson-1.2.58.jar:/opt/canal/bin/../lib/druid-1.1.9.jar:/opt/canal/bin/../lib/disruptor-3.4.2.jar:/opt/canal/bin/../lib/commons-logging-1.1.3.jar:/opt/canal/bin/../lib/commons-lang3-3.4.jar:/opt/canal/bin/../lib/commons-lang-2.6.jar:/opt/canal/bin/../lib/commons-io-2.4.jar:/opt/canal/bin/../lib/commons-compress-1.9.jar:/opt/canal/bin/../lib/commons-codec-1.9.jar:/opt/canal/bin/../lib/commons-cli-1.2.jar:/opt/canal/bin/../lib/commons-beanutils-1.8.2.jar:/opt/canal/bin/../lib/canal.store-1.1.4.jar:/opt/canal/bin/../lib/canal.sink-1.1.4.jar:/opt/canal/bin/../lib/canal.server-1.1.4.jar:/opt/canal/bin/../lib/canal.protocol-1.1.4.jar:/opt/canal/bin/../lib/canal.prometheus-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.driver-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.dbsync-1.1.4.jar:/opt/canal/bin/../lib/canal.parse-1.1.4.jar:/opt/canal/bin/../lib/canal.meta-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.spring-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.manager-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.core-1.1.4.jar:/opt/canal/bin/../lib/canal.filter-1.1.4.jar:/opt/canal/bin/../lib/canal.deployer-1.1.4.jar:/opt/canal/bin/../lib/canal.common-1.1.4.jar:/opt/canal/bin/../lib/aviator-2.2.1.jar:/opt/canal/bin/../lib/aopalliance-1.0.jar:
cd to /opt/canal/bin for continue           

9.1.4 檢視 Canal 日志

檢視 /opt/canal/logs/example/example.log

2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2021-02-24 01:41:40.542 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=<null>,timestamp=1614134832000] cost : 244ms , the next step is binlog dump           

9.1.5 檢視 Kafka 中 consumer 資訊

在 MySQL 中插入一條測試資訊:

mysql> insert into t2 values(1);
Query OK, 1 row affected (0.00 sec)           

檢視 consumer 的資訊,已經有了剛才插入的測試資料:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic canal-kafka --from-beginning
{"data":null,"database":"test","es":1614151725000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1614151725890,"type":"QUERY"}
{"data":null,"database":"test","es":1614151746000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table t2(id int)","sqlType":null,"table":"t2","ts":1614151746141,"type":"CREATE"}
{"data":[{"id":"1"}],"database":"test","es":1614151941000,"id":4,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"t2","ts":1614151941235,"type":"INSERT"}           

9.2 Kafka -> Flink 通路

在 Flink 中建立 t2 表,connector 類型為 kafka。

## create a test table t2 in Flink
Flink SQL> create table t2(id int)
> WITH (
>  'connector' = 'kafka',
>  'topic' = 'canal-kafka',
>  'properties.bootstrap.servers' = '192.168.12.22:9092',
>  'properties.group.id' = 'canal-kafka-consumer-group',
>  'format' = 'canal-json',
>  'scan.startup.mode' = 'latest-offset'
> );
Flink SQL> select * from t1;           

在 MySQL 中在插入一條測試資料:

mysql> insert into test.t2 values(2);
Query OK, 1 row affected (0.00 sec)           

從 Flink 中可以實時同步資料:

Flink SQL> select * from t1;
 Refresh: 1 s                                                                                                             Page: Last of 1                                                                                                     Updated: 02:49:27.366
                        id
                         2           

9.3 Flink -> TiDB 通路

9.3.1 在 下遊的 TiDB 中建立用于測試的表

[root@r20 soft]# mysql -uroot -P14000 -hr21
mysql> create table t3 (id int);
Query OK, 0 rows affected (0.31 sec)           

9.3.2 在 Flink 中建立測試表

Flink SQL> CREATE TABLE t3 (
>     id int
> ) with (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://192.168.12.21:14000/test',
>     'table-name' = 't3',
>     'username' = 'root',
>     'password' = 'mysql'
> );
Flink SQL> insert into t3 values(3);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: a0827487030db177ee7e5c8575ef714e           

9.3.3 在下遊 TiDB 中檢視插入的資料

mysql> select * from test.t3;
+------+
| id   |
+------+
|    3 |
+------+
1 row in set (0.00 sec)           

活動推薦一

Flink 最佳實踐之使用 Canal 同步 MySQL 資料至 TiDB

報名連結:

https://1712399719478.huodongxing.com/event/1594531547711

活動推薦二

阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:

99元試用

實時計算Flink版

(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包3個月及以上還有85折優惠!

了解活動詳情:

https://www.aliyun.com/product/bigdata/sc
Flink 最佳實踐之使用 Canal 同步 MySQL 資料至 TiDB