1. Background
This article describes how to import data from MySQL into Kafka via Binlog + Canal, where it is then consumed by Flink.
To verify the functionality of the whole pipeline quickly, every component is deployed in standalone mode. If physical resources are limited, all of the components in this article can be deployed on a single virtual machine with 4 GB of memory and 1 CPU.
For a production deployment, it is recommended to replace each component with a highly available cluster deployment plan.
A dedicated single-node Zookeeper environment is set up, shared by the Flink, Kafka, and Canal components.
For all components that require a JRE, such as Flink, Kafka, Canal, and Zookeeper, and considering that upgrading a shared JRE could affect other applications, each component uses its own independent JRE environment.
This article is divided into two parts: the first seven sections cover setting up the base environment, and the last section describes how data flows through the components.

The data flows through the following components:
- The MySQL data source generates the Binlog.
- Canal reads the Binlog, generates Canal JSON messages, and pushes them to the designated Kafka Topic.
- Flink consumes the data in the Kafka Topic using the flink-sql-connector-kafka API.
- Flink then writes the data into TiDB via flink-connector-jdbc.
The TiDB + Flink architecture supports developing and running many different kinds of applications.
Its main features currently include:
- Unified batch and stream processing.
- Sophisticated state management.
- Event-time support.
- Exactly-once state consistency guarantees.
Flink can run on a variety of resource managers including YARN, Mesos, and Kubernetes, and also supports standalone deployment on bare-metal clusters. TiDB can be deployed on AWS, Kubernetes, and GCP GKE, and it also supports standalone deployment on bare-metal clusters using TiUP.
Typical classes of applications built on the TiDB + Flink architecture include:
- Event-driven applications:
  - Fraud detection.
  - Anomaly detection.
  - Rule-based alerting.
  - Business process monitoring.
- Data analytics applications:
  - Network quality monitoring.
  - Product update and experiment evaluation analysis.
  - Ad-hoc analysis of live data.
  - Large-scale graph analysis.
- Data pipeline applications:
  - Real-time search index building for e-commerce.
  - Continuous ETL for e-commerce.
2. Environment
2.1 Operating System Environment
[root@r20 topology]# cat /etc/redhat-release
CentOS Stream release 8
2.2 Software Environment
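The software versions used in this article (as deployed in the following sections) are:

Component | Version |
---|---|
CentOS | Stream 8 |
TiDB | v4.0.9 |
Zookeeper | 3.6.2 |
Kafka | 2.7.0 (Scala 2.13) |
Flink | 1.12.1 |
MySQL | 8.0.23 |
Canal | 1.1.4 |
JRE | 1.8.0_281 |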
2.3 Machine Allocation
Hostname | IP | Component |
---|---|---|
r21 | 192.168.12.21 | TiDB Cluster |
r22 | 192.168.12.22 | Kafka |
r23 | 192.168.12.23 | Flink |
r24 | 192.168.12.24 | Zookeeper |
r25 | 192.168.12.25 | MySQL |
r26 | 192.168.12.26 | Canal |
3. Deploying the TiDB Cluster
Compared with traditional standalone databases, TiDB has the following advantages:
- A fully distributed architecture with good horizontal scalability, supporting elastic scale-out and scale-in.
- SQL support: it exposes the MySQL network protocol and is compatible with most MySQL syntax, so in most scenarios it can directly replace MySQL.
- High availability by default: when a minority of replicas fail, the database can automatically repair data and fail over, transparently to the application.
- ACID transactions, friendly to scenarios with strong consistency requirements, such as bank transfers.
- A rich tooling ecosystem covering data migration, replication, backup, and more.
In its core design, TiDB splits the overall architecture into multiple modules that communicate with each other to form a complete TiDB system.
In this article we only run the simplest functional test, so we deploy a single-node, single-replica TiDB, which involves the following three modules:
- TiDB Server: the SQL layer. It exposes the MySQL-protocol connection endpoint, accepts client connections, performs SQL parsing and optimization, and ultimately generates a distributed execution plan.
- PD (Placement Driver) Server: the metadata management module of the whole TiDB cluster. It stores the real-time data distribution of every TiKV node and the overall topology of the cluster, provides the TiDB Dashboard management UI, and allocates transaction IDs for distributed transactions.
- TiKV Server: responsible for storing data. Externally, TiKV is a distributed, transactional Key-Value storage engine.
3.1 TiUP Deployment Template File
# # Global variables are applied to all deployments and used as the default value of
# # the deployments if a specific deployment value is missing.
global:
  user: "tidb"
  ssh_port: 22
  deploy_dir: "/opt/tidb-c1/"
  data_dir: "/opt/tidb-c1/data/"
# # Monitored variables are applied to all the machines.
#monitored:
#  node_exporter_port: 19100
#  blackbox_exporter_port: 39115
#  deploy_dir: "/opt/tidb-c3/monitored"
#  data_dir: "/opt/tidb-c3/data/monitored"
#  log_dir: "/opt/tidb-c3/log/monitored"
# # Server configs are used to specify the runtime configuration of TiDB components.
# # All configuration items can be found in TiDB docs:
# # - TiDB: https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/
# # - TiKV: https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/
# # - PD: https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/
# # All configuration items use points to represent the hierarchy, e.g:
# # readpool.storage.use-unified-pool
# #
# # You can overwrite this configuration via the instance-level `config` field.
server_configs:
  tidb:
    log.slow-threshold: 300
    binlog.enable: false
    binlog.ignore-error: false
    tikv-client.copr-cache.enable: true
  tikv:
    server.grpc-concurrency: 4
    raftstore.apply-pool-size: 2
    raftstore.store-pool-size: 2
    rocksdb.max-sub-compactions: 1
    storage.block-cache.capacity: "16GB"
    readpool.unified.max-thread-count: 12
    readpool.storage.use-unified-pool: false
    readpool.coprocessor.use-unified-pool: true
    raftdb.rate-bytes-per-sec: 0
  pd:
    schedule.leader-schedule-limit: 4
    schedule.region-schedule-limit: 2048
    schedule.replica-schedule-limit: 64
pd_servers:
  - host: 192.168.12.21
    ssh_port: 22
    name: "pd-2"
    client_port: 12379
    peer_port: 12380
    deploy_dir: "/opt/tidb-c1/pd-12379"
    data_dir: "/opt/tidb-c1/data/pd-12379"
    log_dir: "/opt/tidb-c1/log/pd-12379"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.pd` values.
    config:
      schedule.max-merge-region-size: 20
      schedule.max-merge-region-keys: 200000
tidb_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 14000
    status_port: 12080
    deploy_dir: "/opt/tidb-c1/tidb-14000"
    log_dir: "/opt/tidb-c1/log/tidb-14000"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tidb` values.
    config:
      log.slow-query-file: tidb-slow-overwrited.log
      tikv-client.copr-cache.enable: true
tikv_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 12160
    status_port: 12180
    deploy_dir: "/opt/tidb-c1/tikv-12160"
    data_dir: "/opt/tidb-c1/data/tikv-12160"
    log_dir: "/opt/tidb-c1/log/tikv-12160"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tikv` values.
    config:
      server.grpc-concurrency: 4
      #server.labels: { zone: "zone1", dc: "dc1", host: "host1" }
#monitoring_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    port: 19090
#    deploy_dir: "/opt/tidb-c1/prometheus-19090"
#    data_dir: "/opt/tidb-c1/data/prometheus-19090"
#    log_dir: "/opt/tidb-c1/log/prometheus-19090"
#grafana_servers:
#  - host: 192.168.12.21
#    port: 13000
#    deploy_dir: "/opt/tidb-c1/grafana-13000"
#alertmanager_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    web_port: 19093
#    cluster_port: 19094
#    deploy_dir: "/opt/tidb-c1/alertmanager-19093"
#    data_dir: "/opt/tidb-c1/data/alertmanager-19093"
#    log_dir: "/opt/tidb-c1/log/alertmanager-19093"
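Assuming the template above is saved as topology.yaml (the file name is an assumption; the cluster name and version match the next section), the cluster can be deployed and started with TiUP:

[root@r20 topology]# tiup cluster deploy tidb-c1-v409 v4.0.9 ./topology.yaml --user root -p
[root@r20 topology]# tiup cluster start tidb-c1-v409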
3.2 TiDB Cluster Environment
Deploying a TiDB Cluster is not the focus of this article; as a quick experimental environment, a single-replica TiDB cluster is deployed on a single machine only, and the monitoring stack is not deployed.
[root@r20 topology]# tiup cluster display tidb-c1-v409
Starting component `cluster`: /root/.tiup/components/cluster/v1.3.2/tiup-cluster display tidb-c1-v409
Cluster type: tidb
Cluster name: tidb-c1-v409
Cluster version: v4.0.9
SSH type: builtin
Dashboard URL: http://192.168.12.21:12379/dashboard
ID Role Host Ports OS/Arch Status Data Dir Deploy Dir
-- ---- ---- ----- ------- ------ -------- ----------
192.168.12.21:12379 pd 192.168.12.21 12379/12380 linux/x86_64 Up|L|UI /opt/tidb-c1/data/pd-12379 /opt/tidb-c1/pd-12379
192.168.12.21:14000 tidb 192.168.12.21 14000/12080 linux/x86_64 Up - /opt/tidb-c1/tidb-14000
192.168.12.21:12160 tikv 192.168.12.21 12160/12180 linux/x86_64 Up /opt/tidb-c1/data/tikv-12160 /opt/tidb-c1/tikv-12160
Total nodes: 4
Create a table for testing.
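A minimal definition that matches the SHOW CREATE TABLE output below (a sketch; it assumes the table is created in a test database of your choice):

mysql> create table t1 (id int not null, primary key (id));

Verify the definition: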
mysql> show create table t1;
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| t1 | CREATE TABLE `t1` (
`id` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
4. Deploying the Zookeeper Environment
In this experiment, a separate Zookeeper environment is configured to serve the Kafka and Flink environments.
As this is a demonstration setup, only a standalone instance is deployed.
4.1 Unpack the Zookeeper Package
[root@r24 soft]# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
[root@r24 soft]# mv apache-zookeeper-3.6.2-bin /opt/zookeeper
4.2 Deploy the JRE for Zookeeper
[root@r24 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre
Modify the /opt/zookeeper/bin/zkEnv.sh file to add the JAVA_HOME environment variable:
## add the following env var at the head of zkEnv.sh
JAVA_HOME=/opt/zookeeper/jre
4.3 Create the Zookeeper Configuration File
[root@r24 conf]# cat zoo.cfg | grep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
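Note that the dataDir configured above has to exist before Zookeeper starts. Assuming it has not been created yet:

[root@r24 conf]# mkdir -p /opt/zookeeper/data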
4.4 Start Zookeeper
[root@r24 bin]# /opt/zookeeper/bin/zkServer.sh start
4.5 檢查 Zookeeper 的狀态
## check zk status
[root@r24 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
## check OS port status
[root@r24 bin]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 942/sshd
tcp6 0 0 :::2181 :::* LISTEN 15062/java
tcp6 0 0 :::8080 :::* LISTEN 15062/java
tcp6 0 0 :::22 :::* LISTEN 942/sshd
tcp6 0 0 :::44505 :::* LISTEN 15062/java
## use zkCli tool to check zk connection
[root@r24 bin]# ./zkCli.sh -server 192.168.12.24:2181
4.6 A Suggestion About Zookeeper
A personal, tentative suggestion about Zookeeper:
for clustered Zookeeper deployments, always enable network monitoring, and pay particular attention to the network bandwidth in the system metrics.
5. Deploying Kafka
Kafka is a distributed stream processing platform, mainly used in two broad classes of applications:
- Building real-time streaming data pipelines that reliably move data between systems or applications (equivalent to a message queue).
- Building real-time streaming applications that transform or react to streams of data (i.e., stream processing, transforming data internally between Kafka Streams topics).
Kafka has four core APIs:
- The Producer API allows an application to publish a stream of records to one or more Kafka topics.
- The Consumer API allows an application to subscribe to one or more topics and process the stream of records published to them.
- The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams into output streams.
- The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
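As a quick illustration of the Producer and Consumer APIs, the console tools that ship with Kafka can publish a message and read it back (a sketch; the topic name test-topic is made up for demonstration):

## publish one message (Producer side)
[root@r22 ~]# echo "hello" | /opt/kafka/bin/kafka-console-producer.sh \
    --bootstrap-server 192.168.12.22:9092 --topic test-topic
## read it back (Consumer side)
[root@r22 ~]# /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.12.22:9092 --topic test-topic --from-beginning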
This experiment only does functional verification, so only a standalone Kafka environment is set up.
5.1 Download and Unpack Kafka
[root@r22 soft]# tar vxzf kafka_2.13-2.7.0.tgz
[root@r22 soft]# mv kafka_2.13-2.7.0 /opt/kafka
5.2 Deploy the JRE for Kafka
[root@r22 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre
Modify the JRE environment variable for Kafka:
[root@r22 bin]# vim /opt/kafka/bin/kafka-run-class.sh
## add the following line at the head of kafka-run-class.sh
JAVA_HOME=/opt/kafka/jre
5.3 Modify the Kafka Configuration File
Modify the Kafka configuration file /opt/kafka/config/server.properties:
## change the following variables in /opt/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=192.168.12.24:2181
5.4 Start Kafka
[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
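To run the broker in the background instead, kafka-server-start.sh also accepts a -daemon flag:

[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties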
5.5 Check the Kafka Version
Kafka's scripts do not make the broker version obvious; a simple way to check it is to look at the jar file names in the libs directory.
[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
-rw-r--r-- 1 root root 4929521 Dec 16 09:02 kafka_2.13-2.7.0.jar
-rw-r--r-- 1 root root 821 Dec 16 09:03 kafka_2.13-2.7.0.jar.asc
-rw-r--r-- 1 root root 41793 Dec 16 09:02 kafka_2.13-2.7.0-javadoc.jar
-rw-r--r-- 1 root root 821 Dec 16 09:03 kafka_2.13-2.7.0-javadoc.jar.asc
-rw-r--r-- 1 root root 892036 Dec 16 09:02 kafka_2.13-2.7.0-sources.jar
-rw-r--r-- 1 root root 821 Dec 16 09:03 kafka_2.13-2.7.0-sources.jar.asc
... ...
Here, 2.13 is the Scala version and 2.7.0 is the Kafka version.
6. Deploying Flink
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink runs in all common cluster environments and performs computations at in-memory speed and at any scale, providing a high-throughput, low-latency distributed processing framework.
This experiment only does functional testing, so only a standalone Flink environment is deployed.
6.1 Download and Unpack Flink
[root@r23 soft]# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
[root@r23 soft]# mv flink-1.12.1 /opt/flink
6.2 Deploy the JRE for Flink
[root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre
6.3 Add the Libraries Flink Needs
To consume Kafka data, Flink needs the flink-sql-connector-kafka package.
To connect to MySQL/TiDB, Flink needs the flink-connector-jdbc package.
[root@r23 soft]# mv flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
[root@r23 soft]# mv flink-connector-jdbc_2.12-1.12.0.jar /opt/flink/lib/
6.4 Modify the Flink Configuration File
## add or modify the following lines in /opt/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 192.168.12.23
env.java.home: /opt/flink/jre
6.5 Start Flink
[root@r23 ~]# /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host r23.
Starting taskexecutor daemon on host r23.
6.6 View the Flink Web UI
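The Flink web UI listens on port 8081 by default (this setup does not change it), so it should now be reachable at http://192.168.12.23:8081.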
7. Deploying MySQL
7.1 Unpack the MySQL Package
[root@r25 soft]# tar vxf mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
[root@r25 soft]# mv mysql-8.0.23-linux-glibc2.12-x86_64 /opt/mysql/
7.2 Create the MySQL Service File
[root@r25 ~]# touch /opt/mysql/support-files/mysqld.service
[root@r25 support-files]# cat mysqld.service
[Unit]
Description=MySQL 8.0 database server
After=syslog.target
After=network.target
[Service]
Type=simple
User=mysql
Group=mysql
#ExecStartPre=/usr/libexec/mysql-check-socket
#ExecStartPre=/usr/libexec/mysql-prepare-db-dir %n
# Note: we set --basedir to prevent probes that might trigger SELinux alarms,
# per bug #547485
ExecStart=/opt/mysql/bin/mysqld_safe
#ExecStartPost=/opt/mysql/bin/mysql-check-upgrade
#ExecStopPost=/opt/mysql/bin/mysql-wait-stop
# Give a reasonable amount of time for the server to start up/shut down
TimeoutSec=300
# Place temp files in a secure directory, not /tmp
PrivateTmp=true
Restart=on-failure
RestartPreventExitStatus=1
# Sets open_files_limit
LimitNOFILE = 10000
# Set environment variable MYSQLD_PARENT_PID. This is required for the SQL restart command.
Environment=MYSQLD_PARENT_PID=1
[Install]
WantedBy=multi-user.target
## copy mysqld.service to /usr/lib/systemd/system/
[root@r25 support-files]# cp mysqld.service /usr/lib/systemd/system/
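After copying the unit file into place, let systemd pick it up, and optionally enable the service at boot:

[root@r25 support-files]# systemctl daemon-reload
[root@r25 support-files]# systemctl enable mysqld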
7.3 Create the my.cnf File
[root@r25 opt]# cat /etc/my.cnf
[mysqld]
port=3306
basedir=/opt/mysql
datadir=/opt/mysql/data
socket=/opt/mysql/data/mysql.socket
max_connections = 100
default-storage-engine = InnoDB
character-set-server=utf8
log-error = /opt/mysql/log/error.log
slow_query_log = 1
long-query-time = 30
slow_query_log_file = /opt/mysql/log/slow.log
min_examined_row_limit = 1000
log-slow-slave-statements
log-queries-not-using-indexes
#skip-grant-tables
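The log directory referenced above must exist before MySQL is initialized (the data directory is created by --initialize in the next step). Assuming it does not exist yet:

[root@r25 ~]# mkdir -p /opt/mysql/log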
7.4 Initialize and Start MySQL
[root@r25 ~]# /opt/mysql/bin/mysqld --initialize --user=mysql --console
[root@r25 ~]# chown -R mysql:mysql /opt/mysql
[root@r25 ~]# systemctl start mysqld
## check the mysql temp password in /opt/mysql/log/error.log
2021-02-24T02:45:47.316406Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: I?nDjijxa3>-
7.5 Create a New MySQL User for Canal to Connect With
## change the mysql temp password first
mysql> alter user 'root'@'localhost' identified by 'mysql';
Query OK, 0 rows affected (0.00 sec)
## create a management user 'root'@'%'
mysql> create user 'root'@'%' identified by 'mysql';
Query OK, 0 rows affected (0.01 sec)
mysql> grant all privileges on *.* to 'root'@'%';
Query OK, 0 rows affected (0.00 sec)
## create a canal replication user 'canal'@'%'
mysql> create user 'canal'@'%' identified by 'canal';
Query OK, 0 rows affected (0.01 sec)
mysql> grant select, replication slave, replication client on *.* to 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
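Before moving on, it is worth verifying that the canal user can actually connect from the Canal node (assuming a mysql client is available on r26):

[root@r26 ~]# mysql -ucanal -pcanal -h192.168.12.25 -P3306 -e "show grants;"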
7.6 Create a Table for Testing in MySQL
mysql> show create table test.t2;
+-------+----------------------------------------------------------------------------------+
| Table | Create Table |
+-------+----------------------------------------------------------------------------------+
| t2 | CREATE TABLE `t2` (
`id` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
+-------+----------------------------------------------------------------------------------+
1 row in set (0.00 sec)
8. Deploying Canal
Canal's main purpose is to parse the incremental logs of a MySQL database and provide incremental data subscription and consumption.
In the early days, because Alibaba ran dual data centers in Hangzhou and the United States, there was a business need for cross-datacenter replication, implemented mainly with business-level triggers to capture incremental changes.
Starting from 2010, the business gradually switched to parsing database logs to obtain incremental changes for replication, which spawned a large number of incremental database subscription and consumption applications.
Businesses built on log-based incremental subscription and consumption include:
- Database mirroring.
- Real-time database backup.
- Index building and real-time maintenance (splitting heterogeneous indexes, inverted indexes, etc.).
- Business cache refreshing.
- Incremental data processing with business logic.
Canal currently supports MySQL 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x as the source.
8.1 Unpack the Canal Package
[root@r26 soft]# mkdir /opt/canal && tar vxzf canal.deployer-1.1.4.tar.gz -C /opt/canal
8.2 Deploy the JRE for Canal
[root@r26 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r26 soft]# mv jre1.8.0_281 /opt/canal/jre
## configure the jre: add the following line at the head of /opt/canal/bin/startup.sh
JAVA=/opt/canal/jre/bin/java
8.3 Modify the Canal Configuration Files
Modify the /opt/canal/conf/canal.properties configuration file:
## modify the following configuration
canal.zkServers = 192.168.12.24:2181
canal.serverMode = kafka
canal.destinations = example ## an "example" directory must be created under /opt/canal/conf to hold the destination's configuration
canal.mq.servers = 192.168.12.22:9092
Modify the /opt/canal/conf/example/instance.properties configuration file:
## modify the following configuration
canal.instance.master.address=192.168.12.25:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..* ## filter for which databases/tables to capture
canal.mq.topic=canal-kafka
9. Configuring the Data Flow
9.1 The MySQL Binlog -> Canal -> Kafka Path
9.1.1 Check the MySQL Binlog Status
Check the MySQL Binlog information to make sure the Binlog is working properly:
mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000001 | 2888 | | | |
+---------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
9.1.2 Create a Topic in Kafka
Create a Topic named canal-kafka in Kafka. The name of this Topic must match canal.mq.topic=canal-kafka in the Canal configuration file /opt/canal/conf/example/instance.properties:
[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --create \
> --zookeeper 192.168.12.24:2181 \
> --config max.message.bytes=12800000 \
> --config flush.messages=1 \
> --replication-factor 1 \
> --partitions 1 \
> --topic canal-kafka
Created topic canal-kafka.
[2021-02-24 01:51:55,050] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(canal-kafka-0) (kafka.server.ReplicaFetcherManager)
[2021-02-24 01:51:55,052] INFO [Log partition=canal-kafka-0, dir=/opt/kafka/logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-02-24 01:51:55,053] INFO Created log for partition canal-kafka-0 in /opt/kafka/logs/canal-kafka-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 1, message.format.version -> 2.7-IV2, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 12800000, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] No checkpointed highwatermark is found for partition canal-kafka-0 (kafka.cluster.Partition)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] Log loaded for partition canal-kafka-0 with initial high watermark 0 (kafka.cluster.Partition)
List all Topics in Kafka:
[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.12.24:2181
__consumer_offsets
canal-kafka
ticdc-test
View the details of the canal-kafka Topic in Kafka:
[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.12.24:2181 --topic canal-kafka
Topic: canal-kafka PartitionCount: 1 ReplicationFactor: 1 Configs: max.message.bytes=12800000,flush.messages=1
Topic: canal-kafka Partition: 0 Leader: 0 Replicas: 0 Isr: 0
9.1.3 Start Canal
Before starting Canal, check the status of the relevant ports from the Canal node:
## check MySQL 3306 port
## canal.instance.master.address=192.168.12.25:3306
[root@r26 bin]# telnet 192.168.12.25 3306
## check Kafka 9092 port
## canal.mq.servers = 192.168.12.22:9092
[root@r26 bin]# telnet 192.168.12.22 9092
## check zookeeper 2181 port
## canal.zkServers = 192.168.12.24:2181
[root@r26 bin]# telnet 192.168.12.24 2181
Start Canal:
[root@r26 bin]# /opt/canal/bin/startup.sh
cd to /opt/canal/bin for workaround relative path
LOG CONFIGURATION : /opt/canal/bin/../conf/logback.xml
canal conf : /opt/canal/bin/../conf/canal.properties
CLASSPATH :/opt/canal/bin/../conf:/opt/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/canal/bin/../lib/zkclient-0.10.jar:/opt/canal/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-core-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-context-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/snappy-java-1.1.7.1.jar:/opt/canal/bin/../lib/snakeyaml-1.19.jar:/opt/canal/bin/../lib/slf4j-api-1.7.12.jar:/opt/canal/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_httpserver-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_hotspot-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_common-0.4.0.jar:/opt/canal/bin/../lib/simpleclient-0.4.0.jar:/opt/canal/bin/../lib/scala-reflect-2.11.12.jar:/opt/canal/bin/../lib/scala-logging_2.11-3.8.0.jar:/opt/canal/bin/../lib/scala-library-2.11.12.jar:/opt/canal/bin/../lib/rocketmq-srvutil-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-remoting-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-logging-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-common-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-client-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-acl-4.5.2.jar:/opt/canal/bin/../lib/protobuf-java-3.6.1.jar:/opt/canal/bin/../lib/oro-2.0.8.jar:/opt/canal/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/opt/canal/bin/../lib/netty-all-4.1.6.Final.jar:/opt/canal/bin/../lib/netty-3.2.2.Final.jar:/opt/canal/bin/../lib/mysql-connector-java-5.1.47.jar:/opt/canal/bin/../lib/metrics-core-2.2.0.jar:/opt/canal/bin/../lib/lz4-java-1.4.1.jar:/opt/canal/bin/../lib/logback-core-1.1.3.jar:/opt/canal/bin/../lib/logback-classic-1.1.3.jar:/opt/canal/bin/../lib/kafka-clients-1.1.1.jar:/opt/canal/bin/../lib/kafka_2.11-1.1.1.jar:/opt/canal/bin/../lib/jsr305-3.0.2.jar:/opt/canal/bin/../lib/jopt-simple-5.0.4.jar:/opt/canal/bin/../lib/jctools-core-2.1.2.jar:/opt/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/opt/canal/bin/../lib/javax.annotation-api-1.3.2.jar:/opt/canal/bin/../lib/jackson-databind-2.9.6.jar:/opt/canal/bin/../lib/jackson-core-2.9.6.jar:/opt/canal/bin/../lib/jackson-annotations-2.9.0.jar:/opt/canal/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/opt/canal/bin/../lib/httpcore-4.4.3.jar:/opt/canal/bin/../lib/httpclient-4.5.1.jar:/opt/canal/bin/../lib/h2-1.4.196.jar:/opt/canal/bin/../lib/guava-18.0.jar:/opt/canal/bin/../lib/fastsql-2.0.0_preview_973.jar:/opt/canal/bin/../lib/fastjson-1.2.58.jar:/opt/canal/bin/../lib/druid-1.1.9.jar:/opt/canal/bin/../lib/disruptor-3.4.2.jar:/opt/canal/bin/../lib/commons-logging-1.1.3.jar:/opt/canal/bin/../lib/commons-lang3-3.4.jar:/opt/canal/bin/../lib/commons-lang-2.6.jar:/opt/canal/bin/../lib/commons-io-2.4.jar:/opt/canal/bin/../lib/commons-compress-1.9.jar:/opt/canal/bin/../lib/commons-codec-1.9.jar:/opt/canal/bin/../lib/commons-cli-1.2.jar:/opt/canal/bin/../lib/commons-beanutils-1.8.2.jar:/opt/canal/bin/../lib/canal.store-1.1.4.jar:/opt/canal/bin/../lib/canal.sink-1.1.4.jar:/opt/canal/bin/../lib/canal.server-1.1.4.jar:/opt/canal/bin/../lib/canal.protocol-1.1.4.jar:/opt/canal/bin/../lib/canal.prometheus-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.driver-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.dbsync-1.1.4.jar:/opt/canal/bin/../lib/canal.parse-1.1.4.jar:/opt/canal/bin/../lib/canal.meta-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.spring-1.1.4.jar:/opt/canal/bin/../lib/can
al.instance.manager-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.core-1.1.4.jar:/opt/canal/bin/../lib/canal.filter-1.1.4.jar:/opt/canal/bin/../lib/canal.deployer-1.1.4.jar:/opt/canal/bin/../lib/canal.common-1.1.4.jar:/opt/canal/bin/../lib/aviator-2.2.1.jar:/opt/canal/bin/../lib/aopalliance-1.0.jar:
cd to /opt/canal/bin for continue
9.1.4 Check the Canal Log
Check /opt/canal/logs/example/example.log:
2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2021-02-24 01:41:40.542 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=<null>,timestamp=1614134832000] cost : 244ms , the next step is binlog dump
9.1.5 Check the Consumer Messages in Kafka
Insert a test record into MySQL:
mysql> insert into t2 values(1);
Query OK, 1 row affected (0.00 sec)
Check the consumer output; the test data just inserted has already arrived:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic canal-kafka --from-beginning
{"data":null,"database":"test","es":1614151725000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1614151725890,"type":"QUERY"}
{"data":null,"database":"test","es":1614151746000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table t2(id int)","sqlType":null,"table":"t2","ts":1614151746141,"type":"CREATE"}
{"data":[{"id":"1"}],"database":"test","es":1614151941000,"id":4,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"t2","ts":1614151941235,"type":"INSERT"}
9.2 The Kafka -> Flink Path
Create table t2 in Flink, with connector type kafka.
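The statements below are executed in the Flink SQL client, which ships with Flink and can be started as follows:

[root@r23 ~]# /opt/flink/bin/sql-client.sh embedded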
## create a test table t2 in Flink
Flink SQL> create table t2(id int)
> WITH (
> 'connector' = 'kafka',
> 'topic' = 'canal-kafka',
> 'properties.bootstrap.servers' = '192.168.12.22:9092',
> 'properties.group.id' = 'canal-kafka-consumer-group',
> 'format' = 'canal-json',
> 'scan.startup.mode' = 'latest-offset'
> );
Flink SQL> select * from t2;
Insert another test record into MySQL:
mysql> insert into test.t2 values(2);
Query OK, 1 row affected (0.00 sec)
The data is synchronized into Flink in real time:
Flink SQL> select * from t2;
Refresh: 1 s Page: Last of 1 Updated: 02:49:27.366
id
2
9.3 The Flink -> TiDB Path
9.3.1 Create a Table for Testing in the Downstream TiDB
[root@r20 soft]# mysql -uroot -P14000 -hr21
mysql> create table t3 (id int);
Query OK, 0 rows affected (0.31 sec)
9.3.2 Create the Test Table in Flink
Flink SQL> CREATE TABLE t3 (
> id int
> ) with (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://192.168.12.21:14000/test',
> 'table-name' = 't3',
> 'username' = 'root',
> 'password' = 'mysql'
> );
Flink SQL> insert into t3 values(3);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: a0827487030db177ee7e5c8575ef714e
9.3.3 Check the Inserted Data in the Downstream TiDB
mysql> select * from test.t3;
+------+
| id |
+------+
| 3 |
+------+
1 row in set (0.00 sec)