天天看點

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

第1章 Canal 簡介

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

canal [kə'næl],譯意為水道/管道/溝渠,主要用途是基于 MySQL 資料庫增量日志解析,提供增量資料訂閱和消費

工作原理

  • canal 模拟 MySQL slave 的互動協定,僞裝自己為 MySQL slave ,向 MySQL master 發送 dump 協定
  • MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
  • canal 解析 binary log 對象(原始為 byte 流)

第2章 Canal 快速開始

環境準備

主機環境:Windows 11

資料庫版本:MySQL-8資料庫

Canal版本:canal.deployer-1.1.6

MySQL8準備

(1)檢查MySQL 的binlog功能是否有開啟

-- 是否開啟binlog
show VARIABLES like 'log_bin';
           

(2)如果顯示狀态為OFF表示該功能未開啟,開啟binlog功能

1,修改 mysql 的配置檔案 my.cnf
**/**/my.cnf
末尾追加内容:
#binlog檔案名
log-bin=mysql-bin

#選擇row模式
binlog_format=ROW

#mysql執行個體id,不能和canal的slaveId重複
server_id=1
2,windows 重新開機 mysql
           

(3)在mysql裡面添加以下的相關使用者和權限

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO
'canal'@'%';
FLUSH PRIVILEGES;
           

Windows 安裝Canal

下載下傳位址

Canal Git 位址:https://github.com/alibaba/canal/releases

解壓及配置

解壓canal.deployer-1.1.6.tar.gz,我們可以看到裡面有四個檔案夾:

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

Canal 啟動配置

(1)打開配置檔案conf/example/instance.properties

#################################################
## v1.0.26版本後會自動生成slaveId,是以可以不用配置
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# 資料庫位址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名稱
canal.instance.master.journal.name=mysql-bin.000001
# binlog偏移量
canal.instance.master.position=913
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
           

知識點拓展:檢視MySQL 的binlog日志名稱 和binlog 偏移量

# 檢視目前伺服器使用的biglog檔案及大小
show binary logs;

# 檢視最新一個binlog日志檔案名稱和Position
show master status;

# 檢視 binlog 日志清單
show master logs;
           
canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

(2)Canal 啟動

切換至Canal項目bin 檔案夾(D:\Canal\canal.deployer-1.1.6\bin),輕按兩下啟動startup.bat

(3)檢視Canal server 日志

切換至Canal項目logs/canal 檔案夾

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

檢視logs/canal/canal.log 日志内容

2023-02-06 15:45:55.188 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2023-02-06 15:45:55.193 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2023-02-06 15:45:55.198 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2023-02-06 15:45:55.358 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.43.80(192.168.43.80):11111]
2023-02-06 15:45:56.260 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
           

(4)檢視Canal instance 日志

切換至Canal項目logs/example檔案夾

2023-02-06 17:06:18.146 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2023-02-06 17:06:18.148 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2023-02-06 17:06:18.148 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2023-02-06 17:06:18.148 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2023-02-06 17:06:18.201 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2023-02-06 17:06:18.316 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position mysql-bin.000001:4:1675666212000
2023-02-06 17:06:18.809 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=1,gtid=<null>,timestamp=1675666212000] cost : 608ms , the next step is binlog dump
2023-02-06 17:06:33.829 [MultiStageCoprocessor-other-example-0] WARN  com.taobao.tddl.dbsync.binlog.LogDecoder - Skipping unrecognized binlog event Unknown type:41 from: mysql-bin.000003:157
2023-02-06 17:06:48.835 [MultiStageCoprocessor-other-example-0] WARN  com.taobao.tddl.dbsync.binlog.LogDecoder - Skipping unrecognized binlog event Unknown type:41 from: mysql-bin.000003:157
           

(5)Canal 停止

直接關閉Canal 服務運作視窗即可。

第3章 Docker 安裝Canal

第一步:檢視本地鏡像、檢查Canal鏡像和下載下傳Canal 鏡像

# 檢視本地鏡像

docker images

# 檢索Kafka鏡像

docker search canal

# 下載下傳Kafka 鏡像指定版本

docker pull canal/canal-server:latest
           

第二步:docker 啟動Canal

docker run --name canal -d canal/canal-server
           

知識點拓展:拷貝Canal 容器内部配置檔案拷貝到外部

文法:docker cp [容器索引]:[内部路徑] [外部路徑]

執行個體:

docker cp canal:/home/admin/canal-server/conf/canal.properties /home/canal
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /home/canal
           

第三步:修改Canal 配置檔案instance.properties

# 編輯配置檔案
vi  /home/canal/instance.properties
           

編輯内容:

#################################################
## v1.0.26版本後會自動生成slaveId,是以可以不用配置
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# 資料庫位址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名稱
canal.instance.master.journal.name=mysql-bin.000001
# binlog偏移量
canal.instance.master.position=913
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
           

第四步:基于Canal 外部配置檔案,重新Canal 容器執行個體

docker run --name canal -p 11111:11111 -d -v /home/canal/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /home/canal/canal.properties:/home/admin/canal-server/conf/canal.properties canal/canal-server
           

可選指令:

  • 關閉Canal 容器
docker stop canal
           
  • 移除Canal 容器
docker rm canal
           

第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步

搭建一套可以用的元件需要部署MySQL、Zookeeper、Kafka和Canal四個中間件的執行個體。

Docker 安裝MySQL

請參考:Docker 安裝MySQL

CentOS-7安裝ZooKeeper

Canal和Kafka叢集都依賴于Zookeeper做服務協調,為了友善管理,一般會獨立部署Zookeeper服務或者Zookeeper叢集。

midkr /data/zk
# 建立資料目錄
midkr /data/zk/data
cd /data/zk
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
cd apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg
           

把zoo.cfg檔案中的dataDir設定為/data/zk/data,然後啟動Zookeeper:

[[email protected] conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
           

注意一點,要啟動此版本的Zookeeper服務必須本地安裝好JDK8+。啟動的預設端口是2181,啟動成功後的日志如下:

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

CentOS-7 安裝Kafka

Kafka是一個高性能分布式消息隊列中間件,它的部署依賴于Zookeeper。筆者在此選用2.4.0并且Scala版本為2.13的安裝包:

mkdir /data/kafka
mkdir /data/kafka/data
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -zxvf kafka_2.13-2.4.0.tgz
           

解壓後/data/kafka/kafka_2.13-2.4.0/config/server.properties配置中對應的zookeeper.connect=localhost:2181已經符合需要,不必修改,需要修改日志檔案的存放目錄log.dirs為/data/kafka/data。然後啟動Kafka服務:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties
           
canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

知識拓展:kafka 背景進行運作設定

Kafka啟動後一旦退出控制台就會結束Kafka程序,可以添加-daemon參數用于控制Kafka程序背景不挂斷運作。

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties
           

CentOS-7 安裝Canal

CentOS 安裝Canal 核心步驟

mkdir /data/canal
cd /data/canal
# 這裡注意一點,Github在國内被牆,下載下傳速度極慢,可以先用其他下載下傳工具下載下傳完再上傳到伺服器中
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
tar -zxvf canal.deployer-1.1.6.tar.gz
           

Canal 解壓後目錄說明:

- bin   # 運維腳本
- conf  # 配置檔案
  canal_local.properties  # canal本地配置,一般不需要動
  canal.properties        # canal服務配置
  logback.xml             # logback日志配置
  metrics                 # 度量統計配置
  spring                  # spring-執行個體配置,主要和binlog位置計算、一些政策配置相關,可以在canal.properties選用其中的任意一個配置檔案
  example                 # 執行個體配置檔案夾,一般認為單個資料庫對應一個獨立的執行個體配置檔案夾
    instance.properties   # 執行個體配置,一般指單個資料庫的配置
- lib   # 服務依賴包
- logs  # 日志檔案輸出目錄
           

在開發和測試環境建議把logback.xml的日志級别修改為DEBUG友善定位問題。這裡需要關注canal.properties和instance.properties兩個配置檔案。canal.properties檔案中,需要修改:

  • 去掉canal.instance.parser.parallelThreadSize = 16這個配置項的注釋,也就是啟用此配置項,和執行個體解析器的線程數相關,不配置會表現為阻塞或者不進行解析。
  • canal.serverMode配置項指定為kafka,可選值有tcp、kafka和rocketmq(master分支或者最新的的v1.1.5-alpha-1版本,可以選用rabbitmq),預設是kafka。
  • canal.mq.servers配置需要指定為Kafka服務或者叢集Broker的位址,這裡配置為127.0.0.1:9092
canal.mq.servers在不同的canal.serverMode有不同的意義。
kafka模式下,指Kafka服務或者叢集Broker的位址,也就是bootstrap.servers
rocketmq模式下,指NameServer清單
rabbitmq模式下,指RabbitMQ服務的Host和Port
           

本文Kafka執行個體配置:

找到canal.deployer-1.1.6/conf目錄下的canal.properties配置檔案:

# tcp, kafka, RocketMQ 這裡選擇kafka模式
canal.serverMode = kafka
# 解析器的線程數,打開此配置,不打開則會出現阻塞或者不進行解析的情況
canal.instance.parser.parallelThreadSize = 16
# 配置MQ的服務位址,這裡配置的是kafka對應的位址和端口
canal.mq.servers = 127.0.0.1:9092
# 配置instance,在conf目錄下要有example同名的目錄,可以配置多個
canal.destinations = example
           

其他配置項可以參考下面兩個官方Wiki的連結:

  • Canal-Kafka-RocketMQ-QuickStart
  • AdminGuide

instance.properties一般指一個資料庫執行個體的配置,Canal架構支援一個Canal服務執行個體,處理多個資料庫執行個體的binlog異步解析。instance.properties需要修改的配置項主要包括:

  • canal.instance.mysql.slaveId需要配置一個和Master節點的服務ID完全不同的值,這裡筆者配置為654321。
  • 配置資料源執行個體,包括位址、使用者、密碼和目标資料庫:
  • canal.instance.master.address,這裡指定為127.0.0.1:3306。
  • canal.instance.dbUsername,這裡指定為canal。
  • canal.instance.dbPassword,這裡指定為QWqw12!@。
  • 新增canal.instance.defaultDatabaseName,這裡指定為test(需要在MySQL中建立一個test資料庫,見前面的流程)。
  • Kafka相關配置,這裡暫時使用靜态topic和單個partition:
  • canal.mq.topic,這裡指定為test,也就是解析完的binlog結構化資料會發送到Kafka的命名為test的topic中。
  • canal.mq.partition,這裡指定為0。

本文MySQL8執行個體配置:

配置instance,找到/conf/example/instance.properties配置檔案:

## mysql serverId , v1.0.26+ will autoGen(自動生成,不需配置)
# canal.instance.mysql.slaveId=0

# position info
canal.instance.master.address=127.0.0.1:3306
# 在Mysql執行 SHOW MASTER STATUS;檢視目前資料庫的binlog
canal.instance.master.journal.name=mysql-bin.000006
canal.instance.master.position=4596
# 賬号密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@****
canal.instance.connectionCharset = UTF-8
#MQ隊列名稱
canal.mq.topic=canaltopic
#單隊列模式的分區下标
canal.mq.partition=0
           

配置工作做好之後,可以啟動Canal服務:

sh /data/canal/bin/startup.sh 
# 檢視服務日志
tail -100f /data/canal/logs/canal/canal
# 檢視執行個體日志  -- 一般情況下,關注執行個體日志即可
tail -100f /data/canal/logs/example/example.log
           

啟動正常後,見執行個體日志如下:

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

資料示範

在test資料庫建立一個訂單表

use `test`;

CREATE TABLE `order`
(
    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',
    order_id    VARCHAR(64)    NOT NULL COMMENT '訂單ID',
    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',
    create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',
    UNIQUE uniq_order_id (`order_id`)
) COMMENT '訂單表';

INSERT INTO `order`(order_id, amount) VALUES ('20230207093012', 1999);
UPDATE `order` SET amount = 2000 WHERE order_id = '20230207093012';
DELETE  FROM `order` WHERE order_id = '20230207093012';
           

利用Kafka的kafka-console-consumer或者Kafka Tools檢視test這個topic的資料:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
           
canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步

Docker 安裝MySQL

請參考:Docker 安裝MySQL

Docker 安裝RabbitMQ

請參考:Docker 安裝RabbitMQ

RabbitMQ 增加交換機和隊列

  • 添加交換機 canal_exchange
canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南
  • 添加隊列 canal_queue
canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南
  • 隊列綁定交換機
canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

CentOS-7 安裝Canal

centos-7 解壓安裝Canal與上 一章節一緻,顧不在做較長的描述***。本章節重點講解Canal 配置RabbitMQ 參數配置。

Canal Server配置

需要配置的東西就兩項,一個是監聽資料庫配置,另一個是 RabbitMQ 連接配接配置。

instance.properties

監聽資料庫配置

cd /example 目錄下

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南
canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

canal.properties

配置 Canal 服務方式為 RabbitMQ 和連接配接配置

進入到conf檔案,打開canal.properties

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

serverMode(服務模式)修改為rabbitMQ,預設TCP.

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

RabbitMQ 服務相關參數設定。

第6章 Canal API

快速開始

第一步:maven 添加相關jar包依賴

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
           

第二步:編寫main方法測試

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // 建立連結
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 擷取指定數量的資料
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 送出确認
            // connector.rollback(batchId); // 處理失敗, 復原資料
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

}
           

第三步:啟動Canal 服務

第四步: 運作Canal 用戶端的main 方法,控制台輸出如下資訊:

empty count : 1
empty count : 2
empty count : 3
empty count : 4
           

含義:資料庫無變更記錄。

第五步:模拟資料庫變更操作

mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (
    ->   `ID` int(11) NOT NULL AUTO_INCREMENT,
    ->   `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    ->   PRIMARY KEY (`ID`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)
           

再次檢視Canal 用戶端,控制台輸出資訊:

empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
ID : 4    update=true
X : 2013-02-05 23:29:46    update=true
           

Canal API 文檔說明

Canal 類設計

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

大緻分為幾部分:

  • ClientIdentity

    canal client和server互動之間的身份辨別,目前clientId寫死為1001. (目前canal server上的一個instance隻能有一個client消費,clientId的設計是為1個instance多client消費模式而預留的,暫時不需要理會)

  • CanalConnector

    SimpleCanalConnector/ClusterCanalConnector : 兩種connector的實作,simple針對的是簡單的ip直連模式,cluster針對多ip的模式,可依賴CanalNodeAccessStrategy進行failover控制

  • CanalNodeAccessStrategy

    SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:兩種failover的實作,simple針對給定的初始ip清單進行failover選擇,cluster基于zookeeper上的cluster節點動态選擇正在運作的canal server.

  • ClientRunningMonitor/ClientRunningListener/ClientRunningData

    client running相關控制,主要為解決client自身的failover機制。canal client允許同時啟動多個canal client,通過running機制,可保證隻有一個client在工作,其他client做為冷備. 當運作中的client挂了,running會控制讓冷備中的client轉為工作模式,這樣就可以確定canal client也不會是單點. 保證整個系統的高可用性.

javadoc檢視:

  • CanalConnector :http://alibaba.github.io/canal/apidocs/1.0.13/com/alibaba/otter/canal/client/CanalConnector.html

Canal server/client互動協定

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

具體的網絡協定格式,可參見:CanalProtocol.proto

Canal get/ack/rollback協定

get/ack/rollback協定介紹:

  • Message getWithoutAck(int batchSize),允許指定batchSize,一次可以擷取多條,每次傳回的對象為Message,包含的内容為:

    a. batch id 唯一辨別

    b. entries 具體的資料對象,可參見下面的資料介紹

  • getWithoutAck(int batchSize, Long timeout, TimeUnit unit),相比于getWithoutAck(int batchSize),允許設定擷取資料的timeout逾時時間

    a. 拿夠batchSize條記錄或者超過timeout時間

    b. timeout=0,阻塞等到足夠的batchSize

  • void rollback(long batchId),顧命思議,復原上次的get請求,重新擷取資料。基于get擷取的batchId進行送出,避免誤操作
  • void ack(long batchId),顧命思議,确認已經消費成功,通知server删除資料。基于get擷取的batchId進行送出,避免誤操作

canal的get/ack/rollback協定和正常的jms協定有所不同,允許get/ack異步處理,比如可以連續調用get多次,後續異步按順序送出ack/rollback,項目中稱之為流式api.

流式api設計的好處:

  • get/ack異步化,減少因ack帶來的網絡延遲和操作成本 (99%的狀态都是處于正常狀态,異常的rollback屬于個别情況,沒必要為個别的case犧牲整個性能)
  • get擷取資料後,業務消費存在瓶頸或者需要多程序/多線程消費時,可以不停的輪詢get資料,不停的往後發送任務,提高并行化. (作者在實際業務中的一個case:業務資料消費需要跨中美網絡,是以一次操作基本在200ms以上,為了減少延遲,是以需要實施并行化)
canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南
  • 每次get操作都會在meta中産生一個mark,mark标記會遞增,保證運作過程中mark的唯一性
  • 每次的get操作,都會在上一次的mark操作記錄的cursor繼續往後取,如果mark不存在,則在last ack cursor繼續往後取
  • 進行ack時,需要按照mark的順序進行數序ack,不能跳躍ack. ack會删除目前的mark标記,并将對應的mark位置更新為last ack cursor
  • 一旦出現異常情況,用戶端可發起rollback情況,重新置位:删除所有的mark, 清理get請求位置,下次請求會從last ack cursor繼續往後取

流式api帶來的異步響應模型:

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

資料對象格式簡單介紹:EntryProtocol.proto

Entry  
    Header  
        logfileName [binlog檔案名]  
        logfileOffset [binlog position]  
        executeTime [binlog裡記錄變更發生的時間戳,精确到秒]  
        schemaName   
        tableName  
        eventType [insert/update/delete類型]  
    entryType   [事務頭BEGIN/事務尾END/資料ROWDATA]  
    storeValue  [byte資料,可展開,對應的類型為RowChange]  
RowChange

isDdl       [是否是ddl變更操作,比如create table/drop table]

sql         [具體的ddl sql]

rowDatas    [具體insert/update/delete的變更資料,可為多條,1個binlog event事件可對應多條變更,比如批處理]

beforeColumns [Column類型的數組,變更前的資料字段]

afterColumns [Column類型的數組,變更後的資料字段]


Column

index

sqlType     [jdbc type]

name        [column name]

isKey       [是否為主鍵]

updated     [是否發生過變更]

isNull      [值是否為null]

value       [具體的内容,注意為string文本]
           

說明:

  • 可以提供資料庫變更前和變更後的字段内容,針對binlog中沒有的name,isKey等資訊進行補全
  • 可以提供ddl的變更語句
  • insert隻有after columns, delete隻有before columns,而update則會有before / after columns資料.

快速開始代碼剖析

1. 建立Connector

a. 建立SimpleCanalConnector (直連ip,不支援server/client的failover機制)

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");
           

b. 建立ClusterCanalConnector (基于zookeeper擷取canal server ip,支援server/client的failover機制)

CanalConnector connector = CanalConnectors.newClusterConnector("10.20.144.51:2181", destination, "", "");
           

c. 建立ClusterCanalConnector (基于固定canal server的位址,支援固定的server ip的failover機制,不支援client的failover機制

CanalConnector connector = CanalConnectors.newClusterConnector(Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(),11111)), destination,"", "");
           

2. get/ack/rollback使用

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

3. RowData資料處理

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

第7章 Canal 擴充卡

基本說明

canal 1.1.1版本之後,提供了擴充卡功能,可将canal server的資料直接輸出到目的地,不需要使用者編寫用戶端。

溫馨提示:特殊功能需求,還需要使用者編寫用戶端實作

擴充卡整體結構

client-adapter分為擴充卡和啟動器兩部分,每個擴充卡會将自己所需的依賴打成一個包, 以SPI的方式讓啟動器動态加載。

啟動器為 SpringBoot 項目, 支援canal-client啟動的同時提供相關REST管理接口, 運作目錄結構為:

- bin
    restart.sh
    startup.bat
    startup.sh
    stop.sh
- lib
   ...
- plugin 
    client-adapter.logger-1.1.1-jar-with-dependencies.jar
    client-adapter.hbase-1.1.1-jar-with-dependencies.jar
    ...
- conf
    application.yml
    - hbase
        mytest_person2.yml
- logs
           

以上目錄結構最終會打包成 canal-adapter-*.tar.gz 壓縮包

源碼結構解析

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南

launcher:啟動器

logger:日志擴充卡

rdb:支援jdbc的關系型資料庫擴充卡(mysql、oracle、postgress、sqlserver等)

hbase:hbase擴充卡

kudu:kudu擴充卡

Canal擴充卡之啟動

Canal 擴充卡啟動之配置檔案application.yml

canal 使用詳解第1章 Canal 簡介第2章 Canal 快速開始第3章 Docker 安裝Canal 第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步第6章 Canal API第7章 Canal 擴充卡第8章 Canal 管理平台搭建第9章 Canal 管理平台操作指南
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ   #用戶端消費模式,對應下面的consumerProperties
  flatMessage: true                        #是否以json字元串傳遞資料,僅對mq生效
  zookeeperHosts:                          #canal server叢集部署時,建立curator用戶端
                                           #tcp mode需要在consumerProperties tcp中設定
  syncBatchSize: 1000                      #每次同步的批數量
  retries: -1                              #重試次數,-1為無限次
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer                   #canal adapter連接配接的canal server
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer                      #canal adapter連接配接的kafka
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer                  #canal adapter連接配接的rocketmq
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer                     #canal adapter連接配接的rabbitmq
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

#  srcDataSources:
#    defaultDS:
#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
#      username: root
#      password: 121212
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
                      # 注意:instance name、topic name不支援通配符比對
    groups:
    - groupId: g1      #一份資料可被多個groupId消費
                       #不同groupId并發執行,
                       #同一groupId内的adapters順序執行
      outerAdapters:
      - name: logger   #輸出到日志
#      - name: rdb     #輸出到rdb(關系型資料庫
#        key: mysql1   #輸出到mysql資料庫
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#          druid.stat.enable: false
#          druid.stat.slowSqlMillis: 1000
#      - name: rdb
#        key: oracle1      #輸出到oracle資料庫
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1     #輸出到postgress資料庫
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase          #輸出到hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es            #輸出到es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
#          cluster.name: elasticsearch
#      - name: kudu       #輸出到kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
#      - name: phoenix     #輸出到phoenix
#        key: phoenix
#        properties:
#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
#          jdbc.username:
#          jdbc.password:
           

Canal擴充卡執行個體

業務需求:商品表新增商品資料實時同步ES搜尋引擎。

解決方案:基于Canal監聽MySQL-binlog 日志資訊變化,通過Canal-Adapter 讀取Canal 資料變更記錄,同時寫入ES搜尋引擎。

1、軟體版本

MySQL:8.x
canal:1.1.6
adapter:1.1.6
elasticsearch:7.4.2
           

2、MySQL開啟binlog

[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重複
           
# 是否開啟binlog
show variables like 'log_bin';
# 結果
log_bin    ON

# binlog模式
show variables like 'binlog_format';
# 結果
binlog_format    ROW
           
# 建立使用者canal及密碼設定
CREATE USER canal IDENTIFIED BY 'canal';  
# 賦權
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 重新整理生效
FLUSH PRIVILEGES;
           

3、下載下傳canal及adapter

# canal-server
https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
# canal-adapter
https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.adapter-1.1.6.tar.gz
           

4、編輯canal配置檔案

vi conf/example/instance.properties
           

此處隻展示修改的配置

# 僞裝成從庫的slaveId,不能與MySQL重複
canal.instance.mysql.slaveId=1234
# 資料庫的ip:端口
canal.instance.master.address=127.0.0.1:3306
# 資料庫使用者名密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
           

5、啟動canal

cd bin
sh startup.sh
           

如遇到如下報錯

OpenJDK 64-Bit Server VM warning: ignoring option PermSize=96m; support was removed in 8.0
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.

The stack size specified is too small, Specify at least 384k
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
           

解決辦法:調整startup.sh腳本的-Xss參數

vi bin/startup.sh

# 我這裡調整到-Xss512k

if [ -n "$str" ]; then

JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss512k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"

else

JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "

6、編輯adapter配置檔案

cd conf
vi application.yml
           

此處隻展示修改的配置

canal.conf:
  consumerProperties:
      # 單機配置屬性
      # canal.tcp.server.host: 127.0.0.1:11111
  # 此配置資料庫資訊與canal-server配置的資料庫資訊相同
  srcDataSources:
      defaultDs:
        url: jbdc:mysql://127.0.0.1:3306/canal_test
        username: canal
        password: canal
  # 配置 ES資訊
  canalAdapters:
    groups:
      outerAdapters: 
          - name: logger
          - name: es7
            hosts: http://127.0.0.1:9200
            properties:
                mode: rest
                security.auth: es賬号:es密碼
                cluster.name: es的名字
           

7、編輯es7 索引配置檔案

cd conf/es7/
cp mytest_user.yml canal_test_order.yml
rm biz_order.yml customer.yml mytest_user.yml
vi canal_test_order.yml


dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: canal_test_order
  _id: _id
# 這個必須要加,源檔案沒有
  _type: _doc
  upsert: true
#  pk: id
  sql: "select
        a.id as _id,
        a.order_no as orderNo,
        a.order_name as orderName
        from t_order a"
#  objFields:
#    _labels: array:;
  etlCondition: "where a.c_time>={}"
  commitBatch: 3000
           

8、啟動Canal adapter

cd bin
sh startup.sh
           

檢視Adapter 日志記錄出現如下錯誤時:

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
    at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [client-adapter.launcher-1.1.5.jar:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [client-adapter.launcher-1.1.5.jar:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [client-adapter.launcher-1.1.5.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_322]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_322]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_322]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_322]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
           

解決辦法,更改源碼

下載下傳canal-adapter源碼

修改client-adapter/escore/pom.xml為

原
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
        </dependency>

改成
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <scope>provided</scope>
        </dependency>
           

重新打包編譯生成client-adapter.es7x-1.1.5-jar-with-dependencies.jar

放入canal-adapter的plugin目錄下,替換原jar

重新啟動,Canal Adapter 日志如下

9、測試

在MySQL手動插入一條資料

adapter.log列印日志如下

2023-02-09 15:20:25.519 [pool-2-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":7,"order_no":1122,"order_name":"2211"}],"database":"canal_test","destination":"example","es":1664090425000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"t_order","ts":1664090425518,"type":"INSERT"}
2023-02-09 15:20:25.520 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":7,"order_no":1122,"order_name":"2211"}],"database":"canal_test","destination":"example","es":1664090425000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"t_order","ts":1664090425518,"type":"INSERT"} 
Affected indexes: canal_test_order 
           

檢視elasticsearch資料

# get 127.0.0.1:9200/canal_test_order/_search

{
    "took": 0,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "canal_test_order",
                "_type": "_doc",
                "_id": "1",
                "_score": 1.0,
                "_source": {
                    "orderNo": 111111,
                    "orderName": "11111"
                }
            }
        ]
    }
}
           

第8章 Canal 管理平台搭建

背景

canal-admin設計上是為canal提供整體配置管理、節點運維等面向運維的功能,提供相對友好的WebUI操作界面,友善更多使用者快速和安全的操作。

準備

canal-admin的限定依賴:

  1. MySQL,用于存儲配置和節點等相關資料
  1. canal版本,要求>=1.1.4 (需要依賴canal-server提供面向admin的動态運維管理接口)

部署

  1. 下載下傳 canal-admin, 通路 release 頁面 , 選擇需要的包下載下傳, 如以 1.1.6 版本為例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.admin-1.1.6.tar.gz
           
  1. 解壓縮
mkdir /tmp/canal-admin
tar zxvf canal.admin-$version.tar.gz  -C /tmp/canal-admin
           

解壓完成後,進入 /tmp/canal 目錄,可以看到如下結構

drwxr-xr-x   6 agapple  staff   204B  8 31 15:37 bin
drwxr-xr-x   8 agapple  staff   272B  8 31 15:37 conf
drwxr-xr-x  90 agapple  staff   3.0K  8 31 15:37 lib
drwxr-xr-x   2 agapple  staff    68B  8 31 15:26 logs
           
  1. 配置修改
vi conf/application.yml
           
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
           
  1. 初始化中繼資料庫
mysql -h127.1 -uroot -p

# 導入初始化SQL
> source conf/canal_manager.sql
           

a. 初始化SQL腳本裡會預設建立canal_manager的資料庫,建議使用root等有超級權限的賬号進行初始化 b. canal_manager.sql預設會在conf目錄下。

  1. 啟動
sh bin/startup.sh
           
  1. 日志檢視
vi logs/admin.log

2023-02-09 15:43:38.162 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8089 (http)
2023-02-09 15:43:38.180 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8089"]
2023-02-09 15:43:38.191 [main] INFO  org.apache.catalina.core.StandardService - Starting service [Tomcat]
2023-02-09 15:43:38.194 [main] INFO  org.apache.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.29
....
2023-02-09 15:43:39.789 [main] INFO  o.s.w.s.m.m.annotation.ExceptionHandlerExceptionResolver - Detected @ExceptionHandler methods in customExceptionHandler
2023-02-09 15:43:39.825 [main] INFO  o.s.b.a.web.servlet.WelcomePageHandlerMapping - Adding welcome page: class path resource [public/index.html]
           

此時代表canal-admin已經啟動成功,可以通過 http://127.0.0.1:8089/ 通路,預設密碼:admin/123456

  1. 關閉
sh bin/stop.sh
           
  1. canal-server端配置

使用canal_local.properties的配置覆寫canal.properties

# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
           

啟動admin-server即可。

第9章 Canal 管理平台操作指南

請參考:Canal-Admin-指南