
canal + Kafka: A Complete Record of Deployment and Testing

canal repository: https://github.com/alibaba/canal

This post covers deployment only. For an overview of how canal works, see the project's own introduction.

1. Introduction

Since version 1.1.1, canal can deliver the binlog data received by canal server directly to an MQ. The MQ systems supported out of the box are:

  • kafka:https://github.com/apache/kafka
  • RocketMQ:https://github.com/apache/rocketmq

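This deployment uses a single-node Kafka; a cluster setup will follow in a later update. Somewhat clumsily, MySQL, Kafka, and canal are all deployed on the same machine, but that makes no difference to the test.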

2. Environment

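  • Operating system: Red Hat family Linux (this doesn't matter; any Linux works). Note that the "Red Hat 4.8.5-28" string printed by cat /proc/version is the GCC version the kernel was built with, not the OS release; cat /etc/redhat-release shows the release.
  • Java version: JDK 1.8
  • canal version: preferably download the latest package; this test used v1.1.4, canal.deployer-1.1.4.tar.gz
  • MySQL version: 5.7.22
  • Kafka version: kafka_2.12-2.1.0.tgz (the Scala 2.12 build of Kafka 2.1.0)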

3. Deploy Kafka

Reference: "Integrating Kafka with Spring Boot in IDEA"

Download Kafka: get kafka_2.12-2.1.0.tgz from the official website and extract it on the virtual machine (server):

tar -zxvf kafka_2.12-2.1.0.tgz

Change into the extracted Kafka directory and start ZooKeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties &

Open another terminal and start Kafka:

bin/kafka-server-start.sh config/server.properties

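Open another terminal and start a console consumer on the topic example (the topic configured in the canal instance). Do this step after canal server has been deployed: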

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example --from-beginning
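Because ZooKeeper runs in the background and Kafka in another terminal, it can help to confirm both are actually listening before moving on. A minimal Python sketch, assuming the default ports on localhost (clientPort=2181 from config/zookeeper.properties and Kafka's default broker port 9092, which matches --bootstrap-server localhost:9092 above); is_port_open is a hypothetical helper, not part of Kafka.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection raises OSError (refused, unreachable, timed out) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Assumed defaults: ZooKeeper clientPort=2181, Kafka broker port 9092.
    for name, port in [("zookeeper", 2181), ("kafka", 9092)]:
        state = "listening" if is_port_open("localhost", port) else "NOT reachable"
        print(f"{name} on localhost:{port}: {state}")
```

An open port only shows that something is listening; the Kafka server log remains the place to check that the broker started cleanly.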

4. Install and Configure MySQL

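  • If MySQL is not installed yet, install it first; see: "The Most Detailed MySQL Installation Steps"
  • If MySQL is already installed, enable binlog writing and set the binlog format to ROW, as follows:
vi /etc/my.cnf

Add the following:
[mysqld]
log-bin=mysql-bin # adding this line is enough to enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId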
           
  • Restart MySQL and check that the my.cnf settings took effect:
Restart MySQL:
service mysqld restart

Check the binlog format:
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+

Check whether binary logging is enabled:
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
           
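  • Create a user named canal and grant it MySQL slave privileges, so that MySQL treats canal as a slave (replica):
-- Create user canal with password canal
CREATE USER canal IDENTIFIED BY 'canal';
-- Grant read and replication privileges on all databases and tables
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- Grant all privileges. I ran this too, since otherwise the canal user cannot create databases;
-- canal itself only needs the three privileges above (the official QuickStart leaves this line commented out)
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
-- Reload the grant tables
FLUSH PRIVILEGES;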
           

That completes the MySQL part. If you run into MySQL problems, refer to the same link above.
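The my.cnf edits above can also be checked offline before restarting MySQL. A rough Python sketch using the standard configparser module; check_binlog_settings is a hypothetical helper, and it assumes a plain my.cnf without !include or !includedir directives (configparser cannot parse those). The SHOW VARIABLES queries above remain the authoritative check.

```python
import configparser


def check_binlog_settings(cnf_text: str) -> list:
    """Return the problems found in the [mysqld] section of a my.cnf (empty list = OK)."""
    parser = configparser.ConfigParser(
        allow_no_value=True,              # my.cnf allows bare flags such as "log-bin"
        inline_comment_prefixes=("#",),   # drop trailing "# ..." comments
        strict=False,                     # tolerate repeated options
        interpolation=None,               # treat "%" literally
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL treats "-" and "_" in option names alike, so normalize to "_".
    opts = {k.replace("-", "_"): v for k, v in parser["mysqld"].items()}
    problems = []
    if "log_bin" not in opts:
        problems.append("log-bin is not set, so binlog is disabled")
    if (opts.get("binlog_format") or "").upper() != "ROW":
        problems.append("binlog-format must be ROW for canal")
    if opts.get("server_id") in (None, "", "0"):
        problems.append("server_id is not set or is 0")
    return problems
```

For example, check_binlog_settings(open("/etc/my.cnf").read()) should return an empty list once the three lines above are in place.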

5. Deploy canal server

  • Download canal

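    Download the latest package from the official release page (https://github.com/alibaba/canal/releases): canal.deployer-latest.tar.gz, where latest stands for the newest version number.

    The latest package at the time of writing was canal.deployer-1.1.4.tar.gz.

    Official guide: https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart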

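  • Upload canal.deployer-1.1.4.tar.gz to a directory of your choice and extract it there (a matter of habit; I usually put it under /data). The archive has no top-level directory, so extract it into the target directory with -C:
mkdir -p /data/canal
tar -zxvf canal.deployer-1.1.4.tar.gz -C /data/canal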
           
  • Edit the instance configuration file: vi conf/example/instance.properties
#  Change these to match your own database as needed
#################################################
...
canal.instance.master.address=127.0.0.1:3306
# username/password: the database user name and password
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
# This is the Kafka topic that messages are sent to
canal.mq.topic=example
# Dynamic topics based on database name or table name
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#databaseName.tableName:unique primary key; separate multiple tables with commas
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
           
  • Edit the canal configuration file: vi conf/canal.properties (relative to /data/canal, like the instance file above)
# ...
# Options: tcp (default), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq cluster example: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9092
canal.mq.retries = 0
# In flatMessage mode this value can be increased, but do not exceed the MQ's maximum message size
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# In flatMessage mode increase this value; 50-200 is recommended
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# canal's batch size, default 50K; because of Kafka's maximum message size, do not exceed 1M (keep it under 900K)
canal.mq.canalBatchSize = 50
# Timeout for canal to get data, in milliseconds; empty means no timeout
canal.mq.canalGetTimeout = 100
# Whether to send flat JSON objects. This transmits text (JSON); otherwise Kafka receives binary data, which still works but is hard to read
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# Whether Kafka message delivery uses transactions
canal.mq.transaction = false
           
  • Start canal
cd /data/canal
sh bin/startup.sh
           
  • Check the logs
canal log:
tail -f logs/canal/canal.log
instance log:
tail -f logs/example/example.log
           
  • Stop canal
cd /data/canal
sh bin/stop.sh
           
  • My complete configuration files are attached at the end of this post.
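The Kafka-related canal.properties settings above can be sanity-checked with a short script before starting canal. This is a sketch under stated assumptions: parse_properties is a simplified reader (no line continuations, escapes, or ":" separators), check_canal_kafka is a hypothetical helper, and the batch-size arithmetic reads canal.mq.canalBatchSize as a count of memunit-sized units (50 × 1024 bytes = 50K), which matches the "default 50K" comment but should be confirmed against the canal documentation.

```python
def parse_properties(text: str) -> dict:
    """Simplified .properties reader: 'key = value' lines, '#'/'!' comments.
    Ignores line continuations, escapes and ':' separators (a sketch only)."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_canal_kafka(props: dict) -> list:
    """Return problems with the Kafka-related canal.properties settings (empty list = OK)."""
    problems = []
    if props.get("canal.serverMode") != "kafka":
        problems.append("canal.serverMode should be kafka")
    if not props.get("canal.mq.servers"):
        problems.append("canal.mq.servers is empty")
    # Assumption: canalBatchSize counts memunit-sized units (MEMSIZE mode),
    # so 50 * 1024 bytes = 50K, matching the "default 50K" comment.
    unit = int(props.get("canal.instance.memory.buffer.memunit", "1024"))
    batch_bytes = int(props.get("canal.mq.canalBatchSize", "50")) * unit
    if batch_bytes >= 900 * 1024:
        problems.append(f"canal batch is about {batch_bytes // 1024}K; keep it under 900K")
    # The ring buffer size must be a power of 2 (per the comment in canal.properties).
    size = int(props.get("canal.instance.memory.buffer.size", "16384"))
    if size <= 0 or size & (size - 1):
        problems.append("canal.instance.memory.buffer.size must be a power of 2")
    return problems
```

Running check_canal_kafka(parse_properties(open("conf/canal.properties").read())) from /data/canal would list anything that looks off; an empty list only means these few checks passed.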

6. Testing

  • Create a test database and table
  • Insert, update, and delete rows
CREATE DATABASE IF NOT EXISTS test;
USE test;
CREATE TABLE canal_test
    (
        id INT(10),
        name VARCHAR(100),
        create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ENGINE=InnoDB DEFAULT CHARSET=utf8 DEFAULT COLLATE=utf8_general_ci COMMENT='canal测试表';

alter table  canal_test modify column id int unsigned not Null auto_increment primary key;
insert into canal_test(name) values ('jjjjjjjjjjjjj');
update canal_test set name='kkkkkkkkk' where id=10;
delete from canal_test where id=10;
           
  • Kafka consumer output from bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example --from-beginning:
{"data":null,"database":"test","es":1589362351000,"id":4,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"alter table  canal_test modify column id int unsigned not Null auto_increment primary key","sqlType":null,"table":"canal_test","ts":1589362465210,"type":"ALTER"}
{"data":[{"id":"10","name":"kkkkkkkkkkkkk","create_time":"2020-05-13 17:35:45","update_time":"2020-05-13 17:35:45"}],"database":"test","es":1589362545000,"id":10,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(100)","create_time":"timestamp","update_time":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"create_time":93,"update_time":93},"table":"canal_test","ts":1589362545945,"type":"INSERT"}
{"data":[{"id":"10","name":"kkkkkkkkk","create_time":"2020-05-13 17:35:45","update_time":"2020-05-13 17:35:45"}],"database":"test","es":1589362599000,"id":15,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(100)","create_time":"timestamp","update_time":"timestamp"},"old":[{"name":"kkkkkkkkkkkkk"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"create_time":93,"update_time":93},"table":"canal_test","ts":1589362599943,"type":"UPDATE"}
{"data":[{"id":"10","name":"kkkkkkkkk","create_time":"2020-05-13 17:35:45","update_time":"2020-05-13 17:35:45"}],"database":"test","es":1589362639000,"id":18,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(100)","create_time":"timestamp","update_time":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"create_time":93,"update_time":93},"table":"canal_test","ts":1589362639115,"type":"DELETE"}
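Each line printed by the console consumer is one flatMessage JSON object. A hedged Python sketch of turning such lines into readable change descriptions, based only on the fields visible in the output above (database, table, isDdl, type, data, old, pkNames); describe_flat_message is a hypothetical helper. As the UPDATE line shows, old carries only the changed columns with their previous values.

```python
import json


def describe_flat_message(line: str) -> list:
    """Turn one canal flatMessage JSON line into readable change descriptions."""
    msg = json.loads(line)
    table = f"{msg['database']}.{msg['table']}"
    if msg["isDdl"]:
        # DDL messages carry the statement in "sql" and have data = null
        return [f"DDL {msg['type']} on {table}"]
    pk_names = msg.get("pkNames") or []
    old_rows = msg.get("old") or []
    out = []
    for i, row in enumerate(msg.get("data") or []):
        key = ",".join(f"{k}={row[k]}" for k in pk_names)
        if msg["type"] == "UPDATE" and i < len(old_rows):
            # "old" holds only the changed columns, with their values before the update
            changed = ", ".join(
                f"{col}: {before!r} -> {row[col]!r}" for col, before in old_rows[i].items()
            )
            out.append(f"UPDATE {table} [{key}] {changed}")
        else:
            out.append(f"{msg['type']} {table} [{key}]")
    return out
```

To use it, pipe the consumer output into a loop that calls describe_flat_message on each non-empty line.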
           

7. Problems

  • I don't know the MySQL root password. How can I create the user?
vi /etc/my.cnf

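Add the following line under [mysqld]:
# Skip authentication at login
skip-grant-tables=1

Restart MySQL, log in again (no password is needed now), and update the root password. When done, remove the line above and restart again. Note that you cannot create users while authentication is skipped.
mysql> update mysql.user set authentication_string = password('123456'), password_expired = 'N', password_last_changed = now() where user = 'root';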
           

8. Appendix

  • instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
#canal.instance.filter.regex=.*\\..*
#Capturing every database is too noisy to follow, so only the canal_test table in the test database is configured, which makes testing easier
canal.instance.filter.regex=test\\.canal_test
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
           
  • canal.properties
#################################################
#########               common argument         #############
#################################################
# tcp bind ip
canal.ip = 127.0.0.1
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = false
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
#########               destinations            #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########                    MQ                      #############
##################################################
canal.mq.servers = 127.0.0.1:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
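The table filter in instance.properties (canal.instance.filter.regex=test\\.canal_test) is a comma-separated list of regular expressions matched against schema.table. The doubled backslash is properties-file escaping: once loaded, the value is test\.canal_test, so the dot matches only a literal dot. A Python sketch to preview which tables a filter selects, assuming each pattern must match the whole schema.table name; canal's own matcher (for example its case handling) may differ, and both helper names are hypothetical.

```python
import re


def unescape_properties_value(value: str) -> str:
    """Approximate java.util.Properties escapes: backslash-x becomes x, plus t/n/r/f.
    Unicode escapes are not handled (sketch only)."""
    specials = {"t": "\t", "n": "\n", "r": "\r", "f": "\f"}
    out, i = [], 0
    while i < len(value):
        ch = value[i]
        if ch == "\\" and i + 1 < len(value):
            nxt = value[i + 1]
            out.append(specials.get(nxt, nxt))
            i += 2
        else:
            out.append(ch)
            i += 1
    return "".join(out)


def table_matches(filter_value: str, schema: str, table: str) -> bool:
    """True if 'schema.table' fully matches any comma-separated pattern in the filter."""
    name = f"{schema}.{table}"
    # Assumption: patterns are split on commas, so regex quantifiers like {1,3} are not supported.
    patterns = [p.strip() for p in unescape_properties_value(filter_value).split(",") if p.strip()]
    return any(re.fullmatch(p, name) for p in patterns)
```

Pass the value exactly as written in the file, for example table_matches(r"test\\.canal_test", "test", "canal_test").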
           

Reference: https://www.cnblogs.com/seliote/p/11721884.html