第1章 Canal 簡介
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5iNxIWN4YWY2cjMhlTOxcDOiZzY0QzM2cTOmNGZ0ATYj9CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
canal [kə'næl],譯意為水道/管道/溝渠,主要用途是基于 MySQL 資料庫增量日志解析,提供增量資料訂閱和消費
工作原理
- canal 模拟 MySQL slave 的互動協定,僞裝自己為 MySQL slave ,向 MySQL master 發送 dump 協定
- MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
- canal 解析 binary log 對象(原始為 byte 流)
第2章 Canal 快速開始
環境準備
主機環境:Windows 11
資料庫版本:MySQL-8資料庫
Canal版本:canal.deployer-1.1.6
MySQL8準備
(1)檢查MySQL 的binlog功能是否有開啟
-- 是否開啟binlog
show VARIABLES like 'log_bin';
(2)如果顯示狀态為OFF表示該功能未開啟,開啟binlog功能
1,修改 mysql 的配置檔案 my.cnf
**/**/my.cnf
末尾追加内容:
#binlog檔案名
log-bin=mysql-bin
#選擇row模式
binlog_format=ROW
#mysql執行個體id,不能和canal的slaveId重複
server_id=1
2,windows 重新開機 mysql
(3)在mysql裡面添加以下的相關使用者和權限
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO
'canal'@'%';
FLUSH PRIVILEGES;
Windows 安裝Canal
下載下傳位址
Canal Git 位址:https://github.com/alibaba/canal/releases
解壓及配置
解壓canal.deployer-1.1.6.tar.gz,我們可以看到裡面有四個檔案夾:
Canal 啟動配置
(1)打開配置檔案conf/example/instance.properties
#################################################
## v1.0.26版本後會自動生成slaveId,是以可以不用配置
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# 資料庫位址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名稱
canal.instance.master.journal.name=mysql-bin.000001
# binlog偏移量
canal.instance.master.position=913
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
知識點拓展:檢視MySQL 的binlog日志名稱 和binlog 偏移量
# 檢視目前伺服器使用的biglog檔案及大小
show binary logs;
# 檢視最新一個binlog日志檔案名稱和Position
show master status;
# 檢視 binlog 日志清單
show master logs;
(2)Canal 啟動
切換至Canal項目bin 檔案夾(D:\Canal\canal.deployer-1.1.6\bin),輕按兩下啟動startup.bat
(3)檢視Canal server 日志
切換至Canal項目logs/canal 檔案夾
檢視logs/canal/canal.log 日志内容
2023-02-06 15:45:55.188 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2023-02-06 15:45:55.193 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2023-02-06 15:45:55.198 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2023-02-06 15:45:55.358 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.43.80(192.168.43.80):11111]
2023-02-06 15:45:56.260 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
(4)檢視Canal instance 日志
切換至Canal項目logs/example檔案夾
2023-02-06 17:06:18.146 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2023-02-06 17:06:18.148 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2023-02-06 17:06:18.148 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2023-02-06 17:06:18.148 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2023-02-06 17:06:18.201 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2023-02-06 17:06:18.316 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position mysql-bin.000001:4:1675666212000
2023-02-06 17:06:18.809 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=1,gtid=<null>,timestamp=1675666212000] cost : 608ms , the next step is binlog dump
2023-02-06 17:06:33.829 [MultiStageCoprocessor-other-example-0] WARN com.taobao.tddl.dbsync.binlog.LogDecoder - Skipping unrecognized binlog event Unknown type:41 from: mysql-bin.000003:157
2023-02-06 17:06:48.835 [MultiStageCoprocessor-other-example-0] WARN com.taobao.tddl.dbsync.binlog.LogDecoder - Skipping unrecognized binlog event Unknown type:41 from: mysql-bin.000003:157
(5)Canal 停止
直接關閉Canal 服務運作視窗即可。
第3章 Docker 安裝Canal
第一步:檢視本地鏡像、檢查Canal鏡像和下載下傳Canal 鏡像
# 檢視本地鏡像
docker images
# 檢索Kafka鏡像
docker search canal
# 下載下傳Kafka 鏡像指定版本
docker pull canal/canal-server:latest
第二步:docker 啟動Canal
docker run --name canal -d canal/canal-server
知識點拓展:拷貝Canal 容器内部配置檔案拷貝到外部
文法:docker cp [容器索引]:[内部路徑] [外部路徑]
執行個體:
docker cp canal:/home/admin/canal-server/conf/canal.properties /home/canal
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /home/canal
第三步:修改Canal 配置檔案instance.properties
# 編輯配置檔案
vi /home/canal/instance.properties
編輯内容:
#################################################
## v1.0.26版本後會自動生成slaveId,是以可以不用配置
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# 資料庫位址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名稱
canal.instance.master.journal.name=mysql-bin.000001
# binlog偏移量
canal.instance.master.position=913
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
第四步:基于Canal 外部配置檔案,重新Canal 容器執行個體
docker run --name canal -p 11111:11111 -d -v /home/canal/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /home/canal/canal.properties:/home/admin/canal-server/conf/canal.properties canal/canal-server
可選指令:
- 關閉Canal 容器
docker stop canal
- 移除Canal 容器
docker rm canal
第4章 基于Canal 和Kafka,實作MySQL的Binlog 近實時同步
搭建一套可以用的元件需要部署MySQL、Zookeeper、Kafka和Canal四個中間件的執行個體。
Docker 安裝MySQL
請參考:Docker 安裝MySQL
CentOS-7安裝ZooKeeper
Canal和Kafka叢集都依賴于Zookeeper做服務協調,為了友善管理,一般會獨立部署Zookeeper服務或者Zookeeper叢集。
midkr /data/zk
# 建立資料目錄
midkr /data/zk/data
cd /data/zk
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
cd apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg
把zoo.cfg檔案中的dataDir設定為/data/zk/data,然後啟動Zookeeper:
[[email protected] conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
注意一點,要啟動此版本的Zookeeper服務必須本地安裝好JDK8+。啟動的預設端口是2181,啟動成功後的日志如下:
CentOS-7 安裝Kafka
Kafka是一個高性能分布式消息隊列中間件,它的部署依賴于Zookeeper。筆者在此選用2.4.0并且Scala版本為2.13的安裝包:
mkdir /data/kafka
mkdir /data/kafka/data
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -zxvf kafka_2.13-2.4.0.tgz
解壓後/data/kafka/kafka_2.13-2.4.0/config/server.properties配置中對應的zookeeper.connect=localhost:2181已經符合需要,不必修改,需要修改日志檔案的存放目錄log.dirs為/data/kafka/data。然後啟動Kafka服務:
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties
知識拓展:kafka 背景進行運作設定
Kafka啟動後一旦退出控制台就會結束Kafka程序,可以添加-daemon參數用于控制Kafka程序背景不挂斷運作。
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties
CentOS-7 安裝Canal
CentOS 安裝Canal 核心步驟
mkdir /data/canal
cd /data/canal
# 這裡注意一點,Github在國内被牆,下載下傳速度極慢,可以先用其他下載下傳工具下載下傳完再上傳到伺服器中
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
tar -zxvf canal.deployer-1.1.6.tar.gz
Canal 解壓後目錄說明:
- bin # 運維腳本
- conf # 配置檔案
canal_local.properties # canal本地配置,一般不需要動
canal.properties # canal服務配置
logback.xml # logback日志配置
metrics # 度量統計配置
spring # spring-執行個體配置,主要和binlog位置計算、一些政策配置相關,可以在canal.properties選用其中的任意一個配置檔案
example # 執行個體配置檔案夾,一般認為單個資料庫對應一個獨立的執行個體配置檔案夾
instance.properties # 執行個體配置,一般指單個資料庫的配置
- lib # 服務依賴包
- logs # 日志檔案輸出目錄
在開發和測試環境建議把logback.xml的日志級别修改為DEBUG友善定位問題。這裡需要關注canal.properties和instance.properties兩個配置檔案。canal.properties檔案中,需要修改:
- 去掉canal.instance.parser.parallelThreadSize = 16這個配置項的注釋,也就是啟用此配置項,和執行個體解析器的線程數相關,不配置會表現為阻塞或者不進行解析。
- canal.serverMode配置項指定為kafka,可選值有tcp、kafka和rocketmq(master分支或者最新的的v1.1.5-alpha-1版本,可以選用rabbitmq),預設是kafka。
- canal.mq.servers配置需要指定為Kafka服務或者叢集Broker的位址,這裡配置為127.0.0.1:9092
canal.mq.servers在不同的canal.serverMode有不同的意義。
kafka模式下,指Kafka服務或者叢集Broker的位址,也就是bootstrap.servers
rocketmq模式下,指NameServer清單
rabbitmq模式下,指RabbitMQ服務的Host和Port
本文Kafka執行個體配置:
找到canal.deployer-1.1.6/conf目錄下的canal.properties配置檔案:
# tcp, kafka, RocketMQ 這裡選擇kafka模式
canal.serverMode = kafka
# 解析器的線程數,打開此配置,不打開則會出現阻塞或者不進行解析的情況
canal.instance.parser.parallelThreadSize = 16
# 配置MQ的服務位址,這裡配置的是kafka對應的位址和端口
canal.mq.servers = 127.0.0.1:9092
# 配置instance,在conf目錄下要有example同名的目錄,可以配置多個
canal.destinations = example
其他配置項可以參考下面兩個官方Wiki的連結:
- Canal-Kafka-RocketMQ-QuickStart
- AdminGuide
instance.properties一般指一個資料庫執行個體的配置,Canal架構支援一個Canal服務執行個體,處理多個資料庫執行個體的binlog異步解析。instance.properties需要修改的配置項主要包括:
- canal.instance.mysql.slaveId需要配置一個和Master節點的服務ID完全不同的值,這裡筆者配置為654321。
- 配置資料源執行個體,包括位址、使用者、密碼和目标資料庫:
- canal.instance.master.address,這裡指定為127.0.0.1:3306。
- canal.instance.dbUsername,這裡指定為canal。
- canal.instance.dbPassword,這裡指定為QWqw12!@。
- 新增canal.instance.defaultDatabaseName,這裡指定為test(需要在MySQL中建立一個test資料庫,見前面的流程)。
- Kafka相關配置,這裡暫時使用靜态topic和單個partition:
- canal.mq.topic,這裡指定為test,也就是解析完的binlog結構化資料會發送到Kafka的命名為test的topic中。
- canal.mq.partition,這裡指定為0。
本文MySQL8執行個體配置:
配置instance,找到/conf/example/instance.properties配置檔案:
## mysql serverId , v1.0.26+ will autoGen(自動生成,不需配置)
# canal.instance.mysql.slaveId=0
# position info
canal.instance.master.address=127.0.0.1:3306
# 在Mysql執行 SHOW MASTER STATUS;檢視目前資料庫的binlog
canal.instance.master.journal.name=mysql-bin.000006
canal.instance.master.position=4596
# 賬号密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@****
canal.instance.connectionCharset = UTF-8
#MQ隊列名稱
canal.mq.topic=canaltopic
#單隊列模式的分區下标
canal.mq.partition=0
配置工作做好之後,可以啟動Canal服務:
sh /data/canal/bin/startup.sh
# 檢視服務日志
tail -100f /data/canal/logs/canal/canal
# 檢視執行個體日志 -- 一般情況下,關注執行個體日志即可
tail -100f /data/canal/logs/example/example.log
啟動正常後,見執行個體日志如下:
資料示範
在test資料庫建立一個訂單表
use `test`;
CREATE TABLE `order`
(
id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',
order_id VARCHAR(64) NOT NULL COMMENT '訂單ID',
amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',
UNIQUE uniq_order_id (`order_id`)
) COMMENT '訂單表';
INSERT INTO `order`(order_id, amount) VALUES ('20230207093012', 1999);
UPDATE `order` SET amount = 2000 WHERE order_id = '20230207093012';
DELETE FROM `order` WHERE order_id = '20230207093012';
利用Kafka的kafka-console-consumer或者Kafka Tools檢視test這個topic的資料:
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
第5章 基于Canal 和RabbitMQ,實作MySQL的Binlog 近實時同步
Docker 安裝MySQL
請參考:Docker 安裝MySQL
Docker 安裝RabbitMQ
請參考:Docker 安裝RabbitMQ
RabbitMQ 增加交換機和隊列
- 添加交換機 canal_exchange
- 添加隊列 canal_queue
- 隊列綁定交換機
CentOS-7 安裝Canal
centos-7 解壓安裝Canal與上 一章節一緻,顧不在做較長的描述***。本章節重點講解Canal 配置RabbitMQ 參數配置。
Canal Server配置
需要配置的東西就兩項,一個是監聽資料庫配置,另一個是 RabbitMQ 連接配接配置。
instance.properties
監聽資料庫配置
cd /example 目錄下
canal.properties
配置 Canal 服務方式為 RabbitMQ 和連接配接配置
進入到conf檔案,打開canal.properties
serverMode(服務模式)修改為rabbitMQ,預設TCP.
RabbitMQ 服務相關參數設定。
第6章 Canal API
快速開始
第一步:maven 添加相關jar包依賴
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
第二步:編寫main方法測試
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 建立連結
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 擷取指定數量的資料
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 送出确認
// connector.rollback(batchId); // 處理失敗, 復原資料
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
第三步:啟動Canal 服務
第四步: 運作Canal 用戶端的main 方法,控制台輸出如下資訊:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
含義:資料庫無變更記錄。
第五步:模拟資料庫變更操作
mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (
-> `ID` int(11) NOT NULL AUTO_INCREMENT,
-> `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
-> PRIMARY KEY (`ID`)
-> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)
再次檢視Canal 用戶端,控制台輸出資訊:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
ID : 4 update=true
X : 2013-02-05 23:29:46 update=true
Canal API 文檔說明
Canal 類設計
大緻分為幾部分:
-
ClientIdentity
canal client和server互動之間的身份辨別,目前clientId寫死為1001. (目前canal server上的一個instance隻能有一個client消費,clientId的設計是為1個instance多client消費模式而預留的,暫時不需要理會)
-
CanalConnector
SimpleCanalConnector/ClusterCanalConnector : 兩種connector的實作,simple針對的是簡單的ip直連模式,cluster針對多ip的模式,可依賴CanalNodeAccessStrategy進行failover控制
-
CanalNodeAccessStrategy
SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:兩種failover的實作,simple針對給定的初始ip清單進行failover選擇,cluster基于zookeeper上的cluster節點動态選擇正在運作的canal server.
-
ClientRunningMonitor/ClientRunningListener/ClientRunningData
client running相關控制,主要為解決client自身的failover機制。canal client允許同時啟動多個canal client,通過running機制,可保證隻有一個client在工作,其他client做為冷備. 當運作中的client挂了,running會控制讓冷備中的client轉為工作模式,這樣就可以確定canal client也不會是單點. 保證整個系統的高可用性.
javadoc檢視:
- CanalConnector :http://alibaba.github.io/canal/apidocs/1.0.13/com/alibaba/otter/canal/client/CanalConnector.html
Canal server/client互動協定
具體的網絡協定格式,可參見:CanalProtocol.proto
Canal get/ack/rollback協定
get/ack/rollback協定介紹:
-
Message getWithoutAck(int batchSize),允許指定batchSize,一次可以擷取多條,每次傳回的對象為Message,包含的内容為:
a. batch id 唯一辨別
b. entries 具體的資料對象,可參見下面的資料介紹
-
getWithoutAck(int batchSize, Long timeout, TimeUnit unit),相比于getWithoutAck(int batchSize),允許設定擷取資料的timeout逾時時間
a. 拿夠batchSize條記錄或者超過timeout時間
b. timeout=0,阻塞等到足夠的batchSize
- void rollback(long batchId),顧命思議,復原上次的get請求,重新擷取資料。基于get擷取的batchId進行送出,避免誤操作
- void ack(long batchId),顧命思議,确認已經消費成功,通知server删除資料。基于get擷取的batchId進行送出,避免誤操作
canal的get/ack/rollback協定和正常的jms協定有所不同,允許get/ack異步處理,比如可以連續調用get多次,後續異步按順序送出ack/rollback,項目中稱之為流式api.
流式api設計的好處:
- get/ack異步化,減少因ack帶來的網絡延遲和操作成本 (99%的狀态都是處于正常狀态,異常的rollback屬于個别情況,沒必要為個别的case犧牲整個性能)
- get擷取資料後,業務消費存在瓶頸或者需要多程序/多線程消費時,可以不停的輪詢get資料,不停的往後發送任務,提高并行化. (作者在實際業務中的一個case:業務資料消費需要跨中美網絡,是以一次操作基本在200ms以上,為了減少延遲,是以需要實施并行化)
- 每次get操作都會在meta中産生一個mark,mark标記會遞增,保證運作過程中mark的唯一性
- 每次的get操作,都會在上一次的mark操作記錄的cursor繼續往後取,如果mark不存在,則在last ack cursor繼續往後取
- 進行ack時,需要按照mark的順序進行數序ack,不能跳躍ack. ack會删除目前的mark标記,并将對應的mark位置更新為last ack cursor
- 一旦出現異常情況,用戶端可發起rollback情況,重新置位:删除所有的mark, 清理get請求位置,下次請求會從last ack cursor繼續往後取
流式api帶來的異步響應模型:
資料對象格式簡單介紹:EntryProtocol.proto
Entry
Header
logfileName [binlog檔案名]
logfileOffset [binlog position]
executeTime [binlog裡記錄變更發生的時間戳,精确到秒]
schemaName
tableName
eventType [insert/update/delete類型]
entryType [事務頭BEGIN/事務尾END/資料ROWDATA]
storeValue [byte資料,可展開,對應的類型為RowChange]
RowChange
isDdl [是否是ddl變更操作,比如create table/drop table]
sql [具體的ddl sql]
rowDatas [具體insert/update/delete的變更資料,可為多條,1個binlog event事件可對應多條變更,比如批處理]
beforeColumns [Column類型的數組,變更前的資料字段]
afterColumns [Column類型的數組,變更後的資料字段]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否為主鍵]
updated [是否發生過變更]
isNull [值是否為null]
value [具體的内容,注意為string文本]
說明:
- 可以提供資料庫變更前和變更後的字段内容,針對binlog中沒有的name,isKey等資訊進行補全
- 可以提供ddl的變更語句
- insert隻有after columns, delete隻有before columns,而update則會有before / after columns資料.
快速開始代碼剖析
1. 建立Connector
a. 建立SimpleCanalConnector (直連ip,不支援server/client的failover機制)
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");
b. 建立ClusterCanalConnector (基于zookeeper擷取canal server ip,支援server/client的failover機制)
CanalConnector connector = CanalConnectors.newClusterConnector("10.20.144.51:2181", destination, "", "");
c. 建立ClusterCanalConnector (基于固定canal server的位址,支援固定的server ip的failover機制,不支援client的failover機制
CanalConnector connector = CanalConnectors.newClusterConnector(Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(),11111)), destination,"", "");
2. get/ack/rollback使用
3. RowData資料處理
第7章 Canal 擴充卡
基本說明
canal 1.1.1版本之後,提供了擴充卡功能,可将canal server的資料直接輸出到目的地,不需要使用者編寫用戶端。
溫馨提示:特殊功能需求,還需要使用者編寫用戶端實作
擴充卡整體結構
client-adapter分為擴充卡和啟動器兩部分,每個擴充卡會将自己所需的依賴打成一個包, 以SPI的方式讓啟動器動态加載。
啟動器為 SpringBoot 項目, 支援canal-client啟動的同時提供相關REST管理接口, 運作目錄結構為:
- bin
restart.sh
startup.bat
startup.sh
stop.sh
- lib
...
- plugin
client-adapter.logger-1.1.1-jar-with-dependencies.jar
client-adapter.hbase-1.1.1-jar-with-dependencies.jar
...
- conf
application.yml
- hbase
mytest_person2.yml
- logs
以上目錄結構最終會打包成 canal-adapter-*.tar.gz 壓縮包
源碼結構解析
launcher:啟動器
logger:日志擴充卡
rdb:支援jdbc的關系型資料庫擴充卡(mysql、oracle、postgress、sqlserver等)
hbase:hbase擴充卡
kudu:kudu擴充卡
Canal擴充卡之啟動
Canal 擴充卡啟動之配置檔案application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ #用戶端消費模式,對應下面的consumerProperties
flatMessage: true #是否以json字元串傳遞資料,僅對mq生效
zookeeperHosts: #canal server叢集部署時,建立curator用戶端
#tcp mode需要在consumerProperties tcp中設定
syncBatchSize: 1000 #每次同步的批數量
retries: -1 #重試次數,-1為無限次
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer #canal adapter連接配接的canal server
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer #canal adapter連接配接的kafka
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer #canal adapter連接配接的rocketmq
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer #canal adapter連接配接的rabbitmq
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:
# srcDataSources:
# defaultDS:
# url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
# username: root
# password: 121212
canalAdapters:
- instance: example # canal instance Name or mq topic name
# 注意:instance name、topic name不支援通配符比對
groups:
- groupId: g1 #一份資料可被多個groupId消費
#不同groupId并發執行,
#同一groupId内的adapters順序執行
outerAdapters:
- name: logger #輸出到日志
# - name: rdb #輸出到rdb(關系型資料庫
# key: mysql1 #輸出到mysql資料庫
# properties:
# jdbc.driverClassName: com.mysql.jdbc.Driver
# jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
# jdbc.username: root
# jdbc.password: 121212
# druid.stat.enable: false
# druid.stat.slowSqlMillis: 1000
# - name: rdb
# key: oracle1 #輸出到oracle資料庫
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1 #輸出到postgress資料庫
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase #輸出到hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
# - name: es #輸出到es
# hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
# properties:
# mode: transport # or rest
# # security.auth: test:123456 # only used for rest mode
# cluster.name: elasticsearch
# - name: kudu #輸出到kudu
# key: kudu
# properties:
# kudu.master.address: 127.0.0.1 # ',' split multi address
# - name: phoenix #輸出到phoenix
# key: phoenix
# properties:
# jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
# jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
# jdbc.username:
# jdbc.password:
Canal擴充卡執行個體
業務需求:商品表新增商品資料實時同步ES搜尋引擎。
解決方案:基于Canal監聽MySQL-binlog 日志資訊變化,通過Canal-Adapter 讀取Canal 資料變更記錄,同時寫入ES搜尋引擎。
1、軟體版本
MySQL:8.x
canal:1.1.6
adapter:1.1.6
elasticsearch:7.4.2
2、MySQL開啟binlog
[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重複
# 是否開啟binlog
show variables like 'log_bin';
# 結果
log_bin ON
# binlog模式
show variables like 'binlog_format';
# 結果
binlog_format ROW
# 建立使用者canal及密碼設定
CREATE USER canal IDENTIFIED BY 'canal';
# 賦權
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 重新整理生效
FLUSH PRIVILEGES;
3、下載下傳canal及adapter
# canal-server
https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
# canal-adapter
https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.adapter-1.1.6.tar.gz
4、編輯canal配置檔案
vi conf/example/instance.properties
此處隻展示修改的配置
# 僞裝成從庫的slaveId,不能與MySQL重複
canal.instance.mysql.slaveId=1234
# 資料庫的ip:端口
canal.instance.master.address=127.0.0.1:3306
# 資料庫使用者名密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
5、啟動canal
cd bin
sh startup.sh
如遇到如下報錯
OpenJDK 64-Bit Server VM warning: ignoring option PermSize=96m; support was removed in 8.0
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The stack size specified is too small, Specify at least 384k
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
解決辦法:調整startup.sh腳本的-Xss參數
vi bin/startup.sh
# 我這裡調整到-Xss512k
if [ -n "$str" ]; then
JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss512k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
else
JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
6、編輯adapter配置檔案
cd conf
vi application.yml
此處隻展示修改的配置
canal.conf:
consumerProperties:
# 單機配置屬性
# canal.tcp.server.host: 127.0.0.1:11111
# 此配置資料庫資訊與canal-server配置的資料庫資訊相同
srcDataSources:
defaultDs:
url: jbdc:mysql://127.0.0.1:3306/canal_test
username: canal
password: canal
# 配置 ES資訊
canalAdapters:
groups:
outerAdapters:
- name: logger
- name: es7
hosts: http://127.0.0.1:9200
properties:
mode: rest
security.auth: es賬号:es密碼
cluster.name: es的名字
7、編輯es7 索引配置檔案
cd conf/es7/
cp mytest_user.yml canal_test_order.yml
rm biz_order.yml customer.yml mytest_user.yml
vi canal_test_order.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: canal_test_order
_id: _id
# 這個必須要加,源檔案沒有
_type: _doc
upsert: true
# pk: id
sql: "select
a.id as _id,
a.order_no as orderNo,
a.order_name as orderName
from t_order a"
# objFields:
# _labels: array:;
etlCondition: "where a.c_time>={}"
commitBatch: 3000
8、啟動Canal adapter
cd bin
sh startup.sh
檢視Adapter 日志記錄出現如下錯誤時:
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [client-adapter.launcher-1.1.5.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [client-adapter.launcher-1.1.5.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [client-adapter.launcher-1.1.5.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_322]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_322]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_322]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_322]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
解決辦法,更改源碼
下載下傳canal-adapter源碼
修改client-adapter/escore/pom.xml為
原
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</dependency>
改成
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<scope>provided</scope>
</dependency>
重新打包編譯生成client-adapter.es7x-1.1.5-jar-with-dependencies.jar
放入canal-adapter的plugin目錄下,替換原jar
重新啟動,Canal Adapter 日志如下
9、測試
在MySQL手動插入一條資料
adapter.log列印日志如下
2023-02-09 15:20:25.519 [pool-2-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":7,"order_no":1122,"order_name":"2211"}],"database":"canal_test","destination":"example","es":1664090425000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"t_order","ts":1664090425518,"type":"INSERT"}
2023-02-09 15:20:25.520 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":7,"order_no":1122,"order_name":"2211"}],"database":"canal_test","destination":"example","es":1664090425000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"t_order","ts":1664090425518,"type":"INSERT"}
Affected indexes: canal_test_order
檢視elasticsearch資料
# get 127.0.0.1:9200/canal_test_order/_search
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "canal_test_order",
"_type": "_doc",
"_id": "1",
"_score": 1.0,
"_source": {
"orderNo": 111111,
"orderName": "11111"
}
}
]
}
}
第8章 Canal 管理平台搭建
背景
canal-admin設計上是為canal提供整體配置管理、節點運維等面向運維的功能,提供相對友好的WebUI操作界面,友善更多使用者快速和安全的操作。
準備
canal-admin的限定依賴:
- MySQL,用于存儲配置和節點等相關資料
- canal版本,要求>=1.1.4 (需要依賴canal-server提供面向admin的動态運維管理接口)
部署
- 下載下傳 canal-admin, 通路 release 頁面 , 選擇需要的包下載下傳, 如以 1.1.6 版本為例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.admin-1.1.6.tar.gz
- 解壓縮
mkdir /tmp/canal-admin
tar zxvf canal.admin-$version.tar.gz -C /tmp/canal-admin
解壓完成後,進入 /tmp/canal 目錄,可以看到如下結構
drwxr-xr-x 6 agapple staff 204B 8 31 15:37 bin
drwxr-xr-x 8 agapple staff 272B 8 31 15:37 conf
drwxr-xr-x 90 agapple staff 3.0K 8 31 15:37 lib
drwxr-xr-x 2 agapple staff 68B 8 31 15:26 logs
- 配置修改
vi conf/application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 127.0.0.1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
- 初始化中繼資料庫
mysql -h127.1 -uroot -p
# 導入初始化SQL
> source conf/canal_manager.sql
a. 初始化SQL腳本裡會預設建立canal_manager的資料庫,建議使用root等有超級權限的賬号進行初始化 b. canal_manager.sql預設會在conf目錄下。
- 啟動
sh bin/startup.sh
- 日志檢視
vi logs/admin.log
2023-02-09 15:43:38.162 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8089 (http)
2023-02-09 15:43:38.180 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8089"]
2023-02-09 15:43:38.191 [main] INFO org.apache.catalina.core.StandardService - Starting service [Tomcat]
2023-02-09 15:43:38.194 [main] INFO org.apache.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.29
....
2023-02-09 15:43:39.789 [main] INFO o.s.w.s.m.m.annotation.ExceptionHandlerExceptionResolver - Detected @ExceptionHandler methods in customExceptionHandler
2023-02-09 15:43:39.825 [main] INFO o.s.b.a.web.servlet.WelcomePageHandlerMapping - Adding welcome page: class path resource [public/index.html]
此時代表canal-admin已經啟動成功,可以通過 http://127.0.0.1:8089/ 通路,預設密碼:admin/123456
- 關閉
sh bin/stop.sh
- canal-server端配置
使用canal_local.properties的配置覆寫canal.properties
# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
啟動admin-server即可。
第9章 Canal 管理平台操作指南
請參考:Canal-Admin-指南