天天看點

Canal Kafka RocketMQ QuickStart

Canal Kafka RocketMQ QuickStart

基本說明

canal 1.1.1版本之後, 預設支援将canal server接收到的binlog資料直接投遞到MQ, 目前預設支援的MQ系統有:

kafka: https://github.com/apache/kafka

RocketMQ : https://github.com/apache/rocketmq

一、 安裝zookeeper

參考:Zookeeper QuickStart

二、安裝MQ

Kafka安裝參考:Kafka QuickStart

RocketMQ安裝參考:

[RocketMQ QuickStart](https://rocketmq.apache.org/docs/quick-start)

三、 安裝canal.server

3.1 下載下傳壓縮包

到官網位址(release)下載下傳最新壓縮包,請下載下傳 canal.deployer-latest.tar.gz

3.2 将canal.deployer 複制到固定目錄并解壓

mkdir -p /usr/local/canal

cp canal.deployer-1.1.1.tar.gz /usr/local/canal

tar -zxvf canal.deployer-1.1.1.tar.gz

3.3 配置修改參數

a. 修改instance 配置檔案 vi conf/example/instance.properties

#  按需修改成自己的資料庫資訊
	#################################################
	...
	canal.instance.master.address=192.168.1.20:3306
	# username/password,資料庫的使用者名和密碼
	...
	canal.instance.dbUsername = canal
	canal.instance.dbPassword = canal
	...
	# mq config
	canal.mq.topic=example
	# 針對庫名或者表名發送動态topic
	#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
	canal.mq.partition=0
	# hash partition config
	#canal.mq.partitionsNum=3
	#庫名.表名: 唯一主鍵,多個表之間用逗号分隔
	#canal.mq.partitionHash=mytest.person:id,mytest.role:id
	#################################################
           

對應ip 位址的MySQL 資料庫需進行相關初始化與設定, 可參考 Canal QuickStart

b. 修改canal 配置檔案vi /usr/local/canal/conf/canal.properties

# ...
# 可選項: tcp(預設), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 叢集配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
# flagMessage模式下可以調大該值, 但不要超過MQ消息體大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下請将該值改大, 建議50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 預設50K, 由于kafka最大消息體限制請勿超過1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get資料的逾時時間, 機關: 毫秒, 空為不限逾時
canal.mq.canalGetTimeout = 100
# 是否為flat json格式對象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投遞是否使用事務
canal.mq.transaction = false
           

mq相關參數說明

Canal Kafka RocketMQ QuickStart
Canal Kafka RocketMQ QuickStart

canal.mq.dynamicTopic 表達式說明

canal 1.1.3版本之後, 支援配置格式:schema 或 schema.table,多個配置之間使用逗号或分号分隔

例子1:test\\.test 指定比對的單表,發送到以test_test為名字的topic上
例子2:.*\\..* 比對所有表,則每個表都會發送到各自表名的topic上
例子3:test 指定比對對應的庫,一個庫的所有表都會發送到庫名的topic上
例子4:test\\..* 指定比對的表達式,針對比對的表會發送到各自表名的topic上
例子5:test,test1\\.test1,指定多個表達式,會将test庫的表都發送到test的topic上,test1\\.test1的表發送到對應的test1_test1 topic上,其餘的表發送到預設的canal.mq.topic值
為滿足更大的靈活性,允許對比對條件的規則指定發送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

例子1: test:test\\.test 指定比對的單表,發送到以test為名字的topic上
例子2: test:.*\\..* 比對所有表,因為有指定topic,則每個表都會發送到test的topic下
例子3: test:test 指定比對對應的庫,一個庫的所有表都會發送到test的topic下
例子4:testA:test\\..* 指定比對的表達式,針對比對的表會發送到testA的topic下
例子5:test0:test,test1:test1\\.test1,指定多個表達式,會将test庫的表都發送到test0的topic下,test1\\.test1的表發送到對應的test1的topic下,其餘的表發送到預設的canal.mq.topic值
大家可以結合自己的業務需求,設定比對規則,建議MQ開啟自動建立topic的能力
           

canal.mq.partitionHash 表達式說明

canal 1.1.3版本之後, 支援配置格式:schema.table:pk1^pk2,多個配置之間使用逗号分隔

例子1:test\\.test:pk1^pk2 指定比對的單表,對應的hash字段為pk1 + pk2
例子2:.*\\..*:id 正則比對,指定所有正則比對的表對應的hash字段為id
例子3:.*\\..*:$pk$ 正則比對,指定所有正則比對的表對應的hash字段為表主鍵(自動查找)
例子4: 比對規則啥都不寫,則預設發到0這個partition上
例子5:.*\\..* ,不指定pk資訊的正則比對,将所有正則比對的表,對應的hash字段為表名
按表hash: 一張表的所有資料可以發到同一個分區,不同表之間會做散列 (會有熱點表分區過大問題)
例子6: test\\.test:id,.\\..* , 針對test的表按照id散列,其餘的表按照table散列
注意:大家可以結合自己的業務需求,設定比對規則,多條比對規則之間是按照順序進行比對(命中一條規則就傳回)

其他詳細參數可參考Canal AdminGuide
           

mq順序性問題

binlog本身是有序的,寫入到mq之後如何保障順序是很多人會比較關注,在issue裡也有非常多人咨詢了類似的問題,這裡做一個統一的解答

canal目前選擇支援的kafka/rocketmq,本質上都是基于本地檔案的方式來支援了分區級的順序消息的能力,也就是binlog寫入mq是可以有一些順序性保障,這個取決于使用者的一些參數選擇
canal支援MQ資料的幾種路由方式:單topic單分區,單topic多分區、多topic單分區、多topic多分區
canal.mq.dynamicTopic,主要控制是否是單topic還是多topic,針對命中條件的表可以發到表名對應的topic、庫名對應的topic、預設topic name
canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分區以及分區的partition的路由計算,針對命中條件的可以做到按表級做分區、pk級做分區等
canal的消費順序性,主要取決于描述2中的路由選擇,舉例說明:
單topic單分區,可以嚴格保證和binlog一樣的順序性,缺點就是性能比較慢,單分區的性能寫入大概在2~3k的TPS
多topic單分區,可以保證表級别的順序性,一張表或者一個庫的所有資料都寫入到一個topic的單分區中,可以保證有序性,針對熱點表也存在寫入分區的性能問題
單topic、多topic的多分區,如果使用者選擇的是指定table的方式,那和第二部分一樣,保障的是表級别的順序性(存在熱點表寫入分區的性能問題),如果使用者選擇的是指定pk hash的方式,那隻能保障的是一個pk的多次binlog順序性 ** pk hash的方式需要業務權衡,這裡性能會最好,但如果業務上有pk變更或者對多pk資料有順序性依賴,就會産生業務處理錯亂的情況. 如果有pk變更,pk變更前和變更後的值會落在不同的分區裡,業務消費就會有先後順序的問題,需要注意

           

MQ發送性能資料

1.1.5版本可以在5k~50k左右,具體可參考:Canal-MQ-Performance

阿裡雲RocketMQ對接參數

# 配置ak/sk
	canal.aliyun.accessKey = XXX
	canal.aliyun.secretKey = XXX
	# 配置topic
	canal.mq.accessChannel = cloud
	canal.mq.servers = 内網接入點
	canal.mq.producerGroup = GID_**group(在背景建立)
	canal.mq.namespace = rocketmq執行個體id
	canal.mq.topic=(在背景建立)
           

3.4 啟動

cd /usr/local/canal/

sh bin/startup.sh

3.5 檢視日志

a.檢視 logs/canal/canal.log

vi logs/canal/canal.log

b. 檢視instance的日志:

vi logs/example/example.log

3.6 關閉

cd /usr/local/canal/

sh bin/stop.sh

繼續閱讀