▌前言:
canal [kə'næl],譯意為水道/管道/溝渠.
canal 主要用途是基于 MySQL 資料庫增量日志解析,提供增量資料 訂閱 和 消費。
canal 應該是阿裡雲DTS(Data Transfer Service)的開源版本,開源位址:https://github.com/alibaba/canal。
僞裝的mysqlslave, dump協定,接收 bin log 日志資料
Canal和 mysql 用戶端的回放線程不一樣,Canal 對 binlog 進行轉發,可以 socket,也可以發送rocketmq,等等多種方式
比如,把canal.serverMode選項修改為rocketMQ類型:
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ
注:本文以 PDF 持續更新,最新尼恩 架構筆記、面試題 的PDF檔案,請到《技術自由圈》公衆号領取
▌Canal 使用場景:
先看看,Canal使用場景:
- 場景1 高并發系統的三大守護神
- 場景2 100W qps 三級緩存元件
- 其他場景
▌Canal使用場景1: 高并發系統的三大守護神
在很多高并發系統的三大,我們都會在系統中加入 三大守護神 :
- redis高速緩存
- es 做全文檢索
- hbase /mongdb做海量存儲
如果資料庫資料發生更新,這時候就需要在業務代碼中寫一段同步更新 三大守護神 的代碼。
這種資料同步的代碼跟業務代碼糅合在一起會不太優雅,能不能把這些資料同步的代碼抽出來形成一個獨立的子產品呢,答案是可以的。
canal即可作為MySQL binlog增量訂閱消費元件+MQ消息隊列将增量資料更新到:
- redis高速緩存
- es 做全文檢索
- hbase /mongdb做海量存儲
當然是可以的,而且架構上也非常漂亮:
圖中的 redis 緩存操作服務、 es 索引操作服務、 hbase 海量存儲操作服務, 都扮演了 bin log 擴充卡 adapter 的角色。
▌Canal使用場景2: 100W qps 三級緩存元件
在100W qps 三級緩存元件 的架構中,也需要通過 Canal 進行 binlog 的 訂閱, 進行無入侵的 緩存資料維護
圖中的 redis 緩存操作服務、 或者 caffeine 本地緩存操作服務、或者 nginx share dict 本地緩存操作服務,都扮演者緩存 看門狗 watch dog的角色。
緩存 看門狗 watch dog的角色,類似于上面的 adapter 角色。
緩存 看門狗 watch dog的角色,這裡簡稱為 cacheDog 服務。
▌Canal高可用架構:
使用Cannel,為了保證系統 達到 4個9、甚至 5個9 的高可用性,
Canal 服務不能是單節點的,一定是高可用叢集的形式存在。
為什麼呢?
如果cannel 儲存資料不成功,就會導緻資料庫跟三大高并發守護神 (比如ES、比如redis)資料不一緻。
Canal 單節點用于學習、用于測試是OK的
但是Canal 單節點用于生産,會嚴重影響系統健壯性,穩定性,是以把canal部署成高可用叢集。
Canal部署成高可用叢集的架構如下:
Canal Server HA架構原理:
Canal的ha(雙機叢集)分為兩部分,canal server和canal client分别有對應的ha實作:
Canal server:
為了減少對mysql dump的請求,不同server上的執行個體(instance)要求同一時間隻能有一個處于running,其他的處于standby狀态。
或者說,由于instance 由 Canal server 負責執行,是以 同一個 叢集裡邊的 Canal server, 同一時間隻能有一個處于running,其他的處于standby狀态
Canal client:
為了保證有序性,一份執行個體(instance)同一時間隻能由一個canal client 進行get/ack/rollback等遠端操作,否則用戶端接收無法保證有序。
Zookeeper 負責協調:
整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節點(和session生命周期綁定),
同一個 叢集裡邊的 Canal server, 需要去建立和監聽屬于Server的唯一的 znode節點,成功則running,失敗則 standby
同一個 叢集裡邊的 Canal Client, 需要去建立和監聽屬于Client的唯一的 znode節點,成功則running,失敗則 standby。
standby 的空閑角色,一直監聽 唯一的 znode節點 過期狀态,随時準備去 争搶轉正 機會。
關于Zookeeper 、Znode 、釋出訂閱等這些基礎知識,請大家參閱 《Java 高并發核心程式設計 卷1 加強版》PDF
▌Canal高可用Server的協作流程:
- canal server要啟動某個canal instance時, 都先向zookeeper進行一次嘗試啟動判斷(實作:建立EPHEMERAL節點,誰建立成功就允許誰啟動)。
- 建立zookeeper節點成功後,對應的canal server就啟動對應的canal instance,沒有建立成功的canal instance就會處于standby狀态。
- 一旦zookeeper發現canal server A建立的節點消失後,立即通知其他的canal server再次進行步驟1的操作,重新選出一個canal server啟動instance。
- canal client每次進行connect時,會首先向zookeeper詢問目前是誰啟動了canal instance,然後和其建立連結,一旦連結不可用,會重新嘗試connect。
注:canal client的方式和canal server方式類似,也是利用zookeeper的搶占EPHEMERAL節點的方式進行控制。
▌Canal的三大核心角色:
在實操 Canal之前,需要了解一下 Canal的三大核心角色,否則,容易雲裡霧裡,不知所處。
▌角色1: canal server
可以簡單地把canal了解為一個用來同步增量資料的一個工具。
我們看一張官網提供的示意圖:
canal的工作原理就是把自己僞裝成MySQL slave,模拟MySQL slave的互動協定向MySQL Mater發送 dump協定,MySQL mater收到canal發送過來的dump請求,開始推送binary log給canal,然後canal解析binary log,再發送到存儲目的地,比如MySQL,Kafka,Elastic Search等等。
- 因為在 TCP 模式下,一個 instance 隻能有一個 canal client 訂閱,
- 即使同時有多個 canal client 訂閱相同的 instance, 也隻會有一個 canal client 成功擷取 binlog,
- 是以 canal server 寫死 clientId = 1001.
- 也正是因為一個 instance 隻有一個 canal client, 是以 canal server 将 binlog 位點資訊維護在了 instance 級别,即 conf/content/meta.dat 檔案中
- 在 TCP 模式下,如果 canal client 想重新擷取以前的 binlog,隻能通過修改 canal server 的 initial position 配置并重新開機服務來達到目的
-
在 TCP 模式下 canal server 主要提供了兩個功能
(1) 維護 mysql binlog position 資訊,目的是作為 dump 的請求參數,這也是 canal server 唯一儲存的資料
(2) 對用戶端提供接口以查詢 binlog
canal.serverMode 的服務模式有:tcp, kafka, rocketMQ, rabbitMQ
可以把選項修改為rocketMQ類型:
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ
這時候,就是 canal server 把收到的 binlog ,按照 instance的過濾要求完成處理後,寫入到 rocketMQ。
canal server 負責 canal instance的啟動。
canal server 啟動過程中的關鍵資訊如下:
- 确定 binlog first position
(1) 先從 conf/content/meta.dat 檔案中查找 last position, 也就是最後一次成功 dump binlog 的位點
(2) 如果不存在 last position, 則從 conf/content/instance.properties 配置檔案中查找 initial position, 這是我們人為配置的初始化位點
(3) 如果不存在 initial position, 則執行 show master status 指令擷取 mysql binlog lastest position
通過以上三步就可以确定 canal server 啟動之後 binlog 初始位點
- 将 first position 指派給 last position 儲存在記憶體中
- 将 schema 緩存到 conf/content/h2.mv.db 檔案中
▌角色2:canal client
canal.serverMode 的服務模式有:tcp, kafka, rocketMQ, rabbitMQ。 預設情況下,是tcp, 就是 開啟一個Netty服務,發送 binlog 到 Client。
canal client 需要自己開發 TCP用戶端,可以參考官方的 canal client 實作。
- canal client 的 java demo 可以去官方 GitHub 上找一下,記得将 destination 等配置資訊改正确。
- 請參考 https://github.com/alibaba/canal/wiki/ClientExample
- canal client connect
- canal client describe
(1) 在收到用戶端訂閱請求之後,logs/content/content.log 檔案會列印出相關日志
(2) conf/content/meta.dat 檔案記錄了用戶端的訂閱資訊,包括 clientId, destination, filter 等
- canal client getWithoutAck
(1) canal server 在收到 canal client 查詢請求之後,以記憶體中的 last position 作為參數向 mysql server 發送 dump 請求
(2) 如果存在比 last position 更新的 binlog, canal server 會收到 mysql server 的傳回資料,然後将其轉換為 Message 資料結構傳回給 canal client
- canal client ack
- canal client 收到 canal server 的資料之後,可以發送 ack确認 last position的同步位置。
- canal server 在收到 canal client 确認請求之後,更新記憶體中的 last position 并同步儲存到 conf/content/meta.dat 檔案中,在 logs/content/meta.log 檔案中列印日志
▌角色3:什麼是 canal instance?
canal server 僅僅是保姆角色,真正完成 解析 binlog日志、 binlog日志過濾、 binlog日志轉儲、位點中繼資料管理等等 核心功能,是由canal instance 角色完成。
Canal Instance 的架構圖如下圖所示:
Canal 中資料的同步是由 CanalInstance 元件負責,
一個 Canal Server 執行個體中可以建立多個 CanalInstance 執行個體。
每一個 CanalInstance 可以看成是對應一個 MySQL 執行個體,即案例中需要同步兩個資料庫執行個體,故最終需要建立兩個 Canal Instance。
其實也不難了解,因為 MySQL 的 binlog 就是以執行個體為次元進行存儲的。
Canal Instance 包含了 4個 核心元件 :EventParse、EventSink、EventStore、CanaMetaManager,
在這裡主要是闡明其作用,以便更好的指導實踐。
▌EventParse 元件
- 負責解析 binlog日志,其職責就是根據 binlog 的存儲格式将有效資料提取出來,
- 這個不難了解,我們也可以通過該子產品,進一步了解一下 binglog 的存儲格式。
▌EventSink 元件
- 在一個資料庫執行個體上通常會建立多個 Schema,但通常并不是所有的 schema 都需要被同步,
- 如果直接将 EventParse 解析出來的資料全部傳入EventStore 元件,将對 EventStore 帶來不必要的性能消耗;
- 另外本例中使用了分庫分表,需要将多個庫的資料同步到單一源,可能需要涉及到合并、歸并等政策。
- 以上等等等需求就是 EventSink 需要解決的問題域。
▌EventStore 元件:
- 用來存儲經 canal 轉換的資料,被 Canal Client 進行消費的資料,
- 目前 Canal 隻提供了基于記憶體的存儲實作。
▌CanalMetaManager 元件
- 中繼資料存儲管理器。
- 在 Canal 中最基本的中繼資料至少應該包含 EventParse 元件解析的位點與消費端的消費位點。
- Canal Server 重新開機後要能從上一次未同步位置開始同步,否則會丢失資料。
▌角色4:什麼是 canal cluster叢集?
多個 cannel server,可以在建立的時候,歸屬到一個 叢集cluster下邊。
一個 叢集cluster下邊,同時隻有一個 cannel server running,其他的standby,實作高可用。
這裡文字有點說不清楚,具體請參見視訊: 第26章 100qps 三級緩存元件 實操
▌角色5:什麼是 canal admin?
主要的作用
- 通過圖形化界面管理配置參數。
- 動态啟停 Server 和 Instance
- 檢視日志資訊
這裡文字有點說不清楚,具體請參見視訊: 第26章 100qps 三級緩存元件 實操
▌前期準備:
▌前期準備1:安裝zookeeper
可以參考我這篇”瘋狂創客圈總目錄 一鍵打造地表最強環境” 文章或者百度教程安裝。。叢集部署如下:
服務名稱 | IP/域名 | 端口 |
zookeeper(slave) | cdh1 | 2181 |
zookeeper(master) | cdh1 | 2182 |
zookeeper(slave) | cdh1 | 2183 |
▌前期準備2:安裝mysql
可以參考我這篇”瘋狂創客圈總目錄 一鍵打造地表最強環境” 文章或者百度教程安裝。
MySQL我隻部署單台:
服務名稱 | IP/域名 | 端口 |
mysql | cdh1 | 3306 |
使用者名:root,密碼:123456
▌前期準備3:MySQL開啟binlog
▌檢視 mysql 的配置檔案路徑:
[root@cdh1 canal-ha]# mysql --help|grep 'my.cnf'
order of preference, my.cnf, $MYSQL_TCP_PORT,
/etc/my.cnf /etc/mysql/my.cnf /usr/etc/my.cnf ~/.my.cnf
檢視原始的配置
[root@cdh1 canal-ha]# cat /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
skip-name-resolve
character_set_server=utf8
init_connect='SET NAMES utf8'
lower_case_table_names=1
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
▌MySQL的my.cnf中配置如下
[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重複
完整的master配置檔案my.cnf
[mysqld]
# [必須]伺服器唯一ID,預設是1,一般取IP最後一段
server-id=1
# [必須]啟用二進制日志
log-bin=mysql-bin
# 複制過濾:也就是指定哪個資料庫不用同步(mysql庫一般不同步)
binlog-ignore-db=mysql
# 設定需要同步的資料庫 binlog_do_db = 資料庫名;
# 如果是多個同步庫,就以此格式另寫幾行即可。
# 如果不指明對某個具體庫同步,表示同步所有庫。除了binlog-ignore-db設定的忽略的庫
# binlog_do_db = test #需要同步test資料庫。
# 確定binlog日志寫入後與硬碟同步
sync_binlog = 1
# 跳過所有的錯誤,繼續執行複制操作
slave-skip-errors = all
溫馨提示:在主伺服器上最重要的二進制日志設定是sync_binlog,這使得mysql在每次送出事務的時候把二進制日志的内容同步到磁盤上,即使伺服器崩潰也會把事件寫入日志中。
sync_binlog這個參數是對于MySQL系統來說是至關重要的,他不僅影響到Binlog對MySQL所帶來的性能損耗,而且還影響到MySQL中資料的完整性。對于``"sync_binlog"``參數的各種設定的說明如下:
sync_binlog=0,當事務送出之後,MySQL不做fsync之類的磁盤同步指令重新整理binlog_cache中的資訊到磁盤,而讓Filesystem自行決定什麼時候來做同步,或者cache滿了之後才同步到磁盤。
sync_binlog=n,當每進行n次事務送出之後,MySQL将進行一次fsync之類的磁盤同步指令來将binlog_cache中的資料強制寫入磁盤。
在MySQL中系統預設的設定是sync_binlog=0,也就是不做任何強制性的磁盤重新整理指令,這時候的性能是最好的,但是風險也是最大的。因為一旦系統Crash,在binlog_cache中的所有binlog資訊都會被丢失。而當設定為“1”的時候,是最安全但是性能損耗最大的設定。因為當設定為1的時候,即使系統Crash,也最多丢失binlog_cache中未完成的一個事務,對實際資料沒有任何實質性影響。
從以往經驗和相關測試來看,對于高并發事務的系統來說,“sync_binlog”設定為0和設定為1的系統寫入性能差距可能高達5倍甚至更多。
「注意」:如果訂閱的是mysql的從庫,需要增加配置讓從庫日志也寫到binlog裡面
log_slave_updates=1
▌修改之後的配置
[root@cdh1 canal-ha]# cat /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
skip-name-resolve
character_set_server=utf8
init_connect='SET NAMES utf8'
lower_case_table_names=1
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重複
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
▌判斷配置是否生效
重新開機myql
[root@cdh1 canal-ha]# service mysqld restart
Redirecting to /bin/systemctl restart mysqld.service
可以通過在 mysql 終端中執行以下指令判斷配置是否生效:
mysql -uroot -p123456
show variables like 'log_bin';
show variables like 'binlog_format';
▌授權賬号權限
授權 canal 連結 MySQL 賬号具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant:
set global validate_password_policy=0;
set global validate_password_length=1;
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
▌前期準備4: 啟動RocketMQ
rmqnamesrv:
image: apacherocketmq/rocketmq:4.6.0
container_name: rmqnamesrv
restart: always
ports:
- 9876:9876
environment:
JAVA_OPT_EXT: "-server -Xms256m -Xmx1g"
volumes:
- ./rocketmq-namesrv/logs:/root/logs
command: sh mqnamesrv
networks:
mysql-canal-network:
aliases:
- rmqnamesrv
rmqbroker:
image: apacherocketmq/rocketmq:4.6.0
container_name: rmqbroker
restart: always
depends_on:
- rmqnamesrv
ports:
- 10909:10909
- 10911:10911
volumes:
- ./rocketmq-broker/logs:/root/logs
- ./rocketmq-broker/store:/root/store
- ./rocketmq-broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
command: sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
environment:
NAMESRV_ADDR: "rmqnamesrv:9876"
JAVA_OPT_EXT: "-server -Xms256m -Xmx1g"
networks:
mysql-canal-network:
aliases:
- rmqbroker
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rocketmq-console
restart: always
ports:
- 19001:9001
depends_on:
- rmqnamesrv
volumes:
- /etc/localtime:/etc/localtime:ro
- /home/rocketmq/console/logs:/root/logs
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false -Dserver.port=9001"
networks:
mysql-canal-network:
aliases:
- rmqconsole
▌配置和啟動canal-admin:
通過 尼恩的一鍵啟動腳本,可以完成 高可用 cannel 叢集的一鍵啟動
▌canal-admin 作用
- 通過圖形化界面管理配置參數。
- 動态啟停 Server 和 Instance
- 檢視日志資訊
▌給canal-admin建表
執行資料庫腳本
執行 conf 目錄下載下傳的 canal_manager.sql 腳步,初始化所需的庫表。
初始化SQL腳本裡會預設建立canal_manager的資料庫,建議使用root等有超級權限的賬号進行初始化
mysql -uroot -p123456
source /vagrant/3G-middleware/canal-ha/canal-admin/canal_manager.sql
▌canal-admin配置修改
執行 vim conf/application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 192.168.56.121:3306
database: canal_manager
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: 123456
修改 address、database、username、password 四個參數
▌通路canal-admin
在浏覽器上面輸入 hostip:9089 即可進入到管理頁面,如果使用的預設的配置資訊,使用者名入”admin”,密碼輸入”123456”即可通路首頁。
通路canal admin 并且配置執行個體 / Instance
在浏覽器上面輸入 hostip:18089 即可進入到管理頁面,
如果使用的預設的配置資訊,使用者名入”admin”,密碼輸入”123456”即可通路首頁。
canal amin 123456
http://cdh1:18089
rocketmq
http://cdh1:19001
通路canal-admin,可以看到自動出現了一個Server,可在此頁面進行Server的配置、修改、啟動、檢視log等操作
▌叢集管理:
▌建立叢集
配置 「叢集名稱」 與 「ZK位址」
L3-cache-canal
192.168.56.121:2181
配置 「主配置」,該配置為叢集内的所有Server執行個體共享的
▌導入叢集模闆
▌改 instance 名稱
其中有一段代碼如下,配置 canal.destinations ,這是 這裡定義了 canal server 啟動的時候要添加的 instance 名稱, 預設是 為空 執行個體
#################################################
######### destinations #############
#################################################
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
如果要一個 example 執行個體,執行個體的配置檔案為
/conf/example/instance.properties 檔案
那麼,要這麼配置
#################################################
######### destinations #############
#################################################
# 這裡定義了 canal server 啟動的時候要添加的 instance 名稱,預設是 example
canal.destinations = example
# 這裡定義了 canal server 查找 instance 配置檔案的根路徑。
# 舉個例子,假如前面配置了 example instance, 那麼 canal server 會查找 ../conf/example/instance.properties 檔案
canal.conf.dir = ../conf
# 這裡控制着 canal server 是否在運作過程中自動掃描 canal.conf.dir 目錄以動态添加或删除 instance,預設打開,掃描時間間隔 5s
canal.auto.scan = true
canal.auto.scan.interval = 5
注:
canal是允許配置多個執行個體(instance),
假設每個canal.server服務都有相同的兩個執行個體(在conf目錄下分别建兩個執行個體檔案夾:example1和example2,
同時把預設執行個體example檔案夾裡的instance.properties檔案拷貝一份過去),
修改兩個執行個體canal.properties配置就能使其生效,
在“destinations”标題下找到canal.destinations選項修改如下:
canal.destinations = example1, example2
編輯vi conf/example/instance.properties檔案(如果是多執行個體,則每個執行個體目錄下該檔案都要修改配置,現在以canal.server01服務為例)
# mysql叢集配置中的serverId概念,需要保證和目前mysql叢集中id唯一
canal.instance.mysql.slaveId=129
# mysql資料庫連接配接位址和端口
canal.instance.master.address=8.135.110.120:3306
# mysql資料庫使用者名和密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=qwer1234
# mq配置(如果是沒用到MQ,則修改為執行個體名稱即可)
canal.mq.topic= example1
Or
# mq配置(如果是用到MQ,則修改為mq路由key)
canal.mq.topic=canal.routingkey.test
▌修改 serverMode
然後把canal.serverMode選項修改為rocketMQ類型:
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ
同時canal需要MQ進行同步資料,是以在“rocketMQ” 标題下找到rocketMQ配置進行修改:
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = canal_producer
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = rmqnamesrv:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
▌配置zookeeper
主要修改以下配置:
- 「canal.zkServers」 配置zookeeper叢集位址
- 「canal.instance.global.spring.xml」 改為classpath:spring/default-instance.xml
部署完canal.server兩個服務後,叢集想要生效,還需要同時修改兩台服務的配置重新啟動才可以,
注意:同時修改兩台服務的配置重新啟動
具體操作如下:
編輯vi conf/ canal.properties檔案
因為canal.server叢集需要zookeeper,
是以在“common argument”标題下找到canal.zkServers選項修改為zookeeper叢集位址;
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
canal.zkServers = 192.168.56.121:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
zk配置多個節點
canal.zkServers = 192.168.142.129:2181,192.168.142.130:2181,192.168.142.131:2181
HA模式是依賴于instance name進行管理,必須都選擇default-instance.xml配置。
在“destinations”标題下找到canal.instance.global.spring.xml選項進行啟用(其他兩個選項注釋):
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
▌完整的server 配置參考
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
canal.zkServers = 192.168.56.121:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:6667
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = canal_producer
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = rmqnamesrv:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
▌建立Server,關聯叢集
這裡是server 清單,能看到 自動的注冊到 canal.admin 的Server
如果 server 配置了 canal.admin 的 管理端口,會自動的注冊到 canal.admin
environment: # 設定環境變量,相當于docker run指令中的-e
TZ: Asia/Shanghai
LANG: en_US.UTF-8
canal.admin.manager: canal-admin:8089
canal.admin.port: 11110
canal.admin.user: admin
canal.admin.passwd: 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# canal.admin.register.cluster: online
如果沒有自動注冊過來,可以手動添加
▌手動添加server,關聯叢集
配置項:
- 所屬叢集,可以選擇為單機 或者 叢集。
- 一般單機Server的模式主要用于一次性的任務或者測試任務
- Server名稱,唯一即可,友善自己記憶
- Server Ip,機器ip
- admin端口,canal 1.1.4版本新增的能力,會在canal-server上提供遠端管理操作,預設值11110
- tcp端口,canal提供netty資料訂閱服務的端口
- metric端口, promethues的exporter監控資料端口
多台Server關聯同一個叢集即可形成主備HA架構
▌配置 canal server
可在此頁面進行Server的配置、修改、啟動、檢視log等操作
▌配置 canal server 的入口
可以通過 後邊的 配置操作,進行server的配置
看到的 canal.properties 檔案是 canal server 的基礎配置檔案,
配置實際上就是修改這個檔案
▌改 instance 名稱
其中有一段代碼如下,配置 canal.destinations ,這是 這裡定義了 canal server 啟動的時候要添加的 instance 名稱, 預設是 為空 執行個體
#################################################
######### destinations #############
#################################################
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
如果要一個 example 執行個體,執行個體的配置檔案為
/conf/example/instance.properties 檔案
那麼,要這麼配置
#################################################
######### destinations #############
#################################################
# 這裡定義了 canal server 啟動的時候要添加的 instance 名稱,預設是 example
canal.destinations = example
# 這裡定義了 canal server 查找 instance 配置檔案的根路徑。
# 舉個例子,假如前面配置了 example instance, 那麼 canal server 會查找 ../conf/example/instance.properties 檔案
canal.conf.dir = ../conf
# 這裡控制着 canal server 是否在運作過程中自動掃描 canal.conf.dir 目錄以動态添加或删除 instance,預設打開,掃描時間間隔 5s
canal.auto.scan = true
canal.auto.scan.interval = 5
▌修改 serverMode
然後把canal.serverMode選項修改為rocketMQ類型:
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ
同時canal需要MQ進行同步資料,是以在“rocketMQ” 标題下找到rocketMQ配置進行修改:
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = canal_producer
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = rmqnamesrv:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
▌配置zookeeper
部署完canal.server兩個服務後,叢集想要生效,還需要同時修改兩台服務的配置重新啟動才可以,
注意:同時修改兩台服務的配置重新啟動
具體操作如下:
編輯vi conf/ canal.properties檔案
因為canal.server叢集需要zookeeper,
是以在“common argument”标題下找到canal.zkServers選項修改為zookeeper叢集位址;
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
canal.zkServers = 192.168.56.121:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
zk配置多個節點
canal.zkServers = 192.168.142.129:2181,192.168.142.130:2181,192.168.142.131:2181
HA模式是依賴于instance name進行管理,必須都選擇default-instance.xml配置。
在“destinations”标題下找到canal.instance.global.spring.xml選項進行啟用(其他兩個選項注釋):
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
注:
canal是允許配置多個執行個體(instance),
假設每個canal.server服務都有相同的兩個執行個體(在conf目錄下分别建兩個執行個體檔案夾:example1和example2,
同時把預設執行個體example檔案夾裡的instance.properties檔案拷貝一份過去),
修改兩個執行個體canal.properties配置就能使其生效,
在“destinations”标題下找到canal.destinations選項修改如下:
canal.destinations = example1, example2
編輯vi conf/example/instance.properties檔案(如果是多執行個體,則每個執行個體目錄下該檔案都要修改配置,現在以canal.server01服務為例)
# mysql叢集配置中的serverId概念,需要保證和目前mysql叢集中id唯一
canal.instance.mysql.slaveId=129
# mysql資料庫連接配接位址和端口
canal.instance.master.address=8.135.110.120:3306
# mysql資料庫使用者名和密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=qwer1234
# mq配置(如果是沒用到MQ,則修改為執行個體名稱即可)
canal.mq.topic= example1
Or
# mq配置(如果是用到MQ,則修改為mq路由key)
canal.mq.topic=canal.routingkey.test
▌完整的server 配置參考
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
canal.zkServers = 192.168.56.121:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:6667
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = canal_producer
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = rmqnamesrv:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
▌建立Instance
每個 Instance 「執行個體」 關聯一個同步的資料源,
如果有多個資料源需要同步則需要建立多個 Instance 「執行個體」
- 先填寫執行個體名
- 選擇剛剛建立的叢集
- 載入模闆配置
▌Instance配置
主要修改以下配置:
- 「canal.instance.master.address」 配置要同步的資料庫位址
- 「canal.instance.dbUsername」 資料庫使用者名(需同步權限)
- 「canal.instance.dbPassword」 資料庫密碼
- 「canal.instance.filter.regex」 mysql 資料解析關注的表,Perl正規表達式.
多個正則之間以逗号(,)分隔,轉義符需要雙斜杠(\)
canal.instance.filter.regex常見例子:
- 所有表:.* or ...
- canal schema下所有表:canal..*
- canal下的以canal打頭的表:canal.canal.*
- canal schema下的一張表:canal.test1
- 多個規則組合使用:canal..*,mysql.test1,mysql.test2 (逗号分隔)
- 注意:此過濾條件隻針對row模式的資料有效
- 為啥呢?
- mixed/statement因為不解析sql,是以無法準确提取tableName進行過濾
▌建立Instance:關聯叢集,并配置源庫資訊
canal.instance.master.address=192.168.56.121:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
▌instance模闆
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=192.168.56.121:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://192.168.56.121:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=canal_log
# dynamic topic route by schema or table regex
# canal.mq.dynamicTopic=test.user,student\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=test.users:uid,.*\\..*
##################################################
######### MQ #############
##################################################
canal.mq.servers = 192.168.56.122:9876
#canal.mq.servers = rmqnamesrv:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = canal_producer
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =
去Instance清單新增Instance,可選擇載入模版進行修改,可參考上文中的canal相關配置檔案修改
點選側邊欄的Instance管理,選擇建立 Instance,選擇那個唯一的主機,再點選載入模闆,修改下面的一些參數:
執行個體名稱随便填一個就行。
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
# canal.serverMode = tcp
canal.serverMode = RocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
rocketmq.producer.group = canal_producers
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
#rocketmq.namesrv.addr = 127.0.0.1:9876
#rocketmq.namesrv.addr = 192.168.56.122:9876
rocketmq.namesrv.addr = rmqnamesrv:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
▌啟動 新執行個體
建立好的新執行個體預設是停止狀态,将其啟動。
▌驗證:看看canal 叢集安裝成功
▌從master 檢視從節點
mysql -uroot -p123456
select * from information_schema.processlist as p where p.command = 'Binlog Dump';
▌資料CRUD操作
登入mysql增删改一條資料
▌rocketmq 管理背景
在rocketmq 管理背景隊列上會看到這兩條語句待消費消息:
rocketmq
http://cdh1:19001
▌技術自由的實作路徑 PDF領取:
▌實作你的架構自由:
- 《吃透8圖1模闆,人人可以做架構》PDF
- 《10Wqps評論中台,如何架構?B站是這麼做的!!!》PDF
- 《阿裡二面:千萬級、億級資料,如何性能優化? 教科書級 答案來了》PDF
- 《峰值21WQps、億級DAU,小遊戲《羊了個羊》是怎麼架構的?》PDF
- 《100億級訂單怎麼排程,來一個大廠的極品方案》PDF
- 《2個大廠 100億級 超大流量 紅包 架構方案》PDF
… 更多架構文章,正在添加中
▌實作你的 響應式 自由:
- 《響應式聖經:10W字,實作Spring響應式程式設計自由》PDF
- 這是老版本 《Flux、Mono、Reactor 實戰(史上最全)》PDF
▌實作你的 spring cloud 自由:
- 《Spring cloud Alibaba 學習聖經》 PDF
- 《分庫分表 Sharding-JDBC 底層原理、核心實戰(史上最全)》PDF
- 《一文搞定:SpringBoot、SLF4j、Log4j、Logback、Netty之間混亂關系(史上最全)》PDF
▌實作你的 linux 自由:
- 《Linux指令大全:2W多字,一次實作Linux自由》PDF
▌實作你的 網絡 自由:
- 《TCP協定詳解 (史上最全)》PDF
- 《網絡三張表:ARP表, MAC表, 路由表,實作你的網絡自由!!》PDF
▌實作你的 分布式鎖 自由:
- 《Redis分布式鎖(圖解 - 秒懂 - 史上最全)》PDF
- 《Zookeeper 分布式鎖 - 圖解 - 秒懂》PDF
▌實作你的 王者元件 自由:
- 《隊列之王: Disruptor 原理、架構、源碼 一文穿透》PDF
- 《緩存之王:Caffeine 源碼、架構、原理(史上最全,10W字 超級長文)》PDF
- 《緩存之王:Caffeine 的使用(史上最全)》PDF
- 《Java Agent 探針、位元組碼增強 ByteBuddy(史上最全)》PDF
▌實作你的 面試題 自由:
4000頁《尼恩Java面試寶典》PDF 40個專題
....
注:以上尼恩 架構筆記、面試題 的PDF檔案,請到《技術自由圈》公衆号領取
還需要啥自由,可以告訴尼恩。 尼恩幫你實作.......