天天看点

Canal+Kafka:增量订阅消费MySQL binlog

作者:明少三年

在目前的大数据生态里面,生产数据的增量订阅/消费组件基本上都是Canal。基于Canal实现的主流的组件架构:

Canal+Kafka:增量订阅消费MySQL binlog

数据流图

当前Canal最新的版本是1.1.6,如下的内容也是基于该版本。

前提条件:MySQL相关配置

## 开启 Binlog 写入功能,编辑 my.cnf
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id= # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复,这个在 v1.0.26+版本后,能自动生成,可以不配置。

## 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;           

Canal安装

Canal的安装很简单,对应的binary包要下载正确,我这里使用的是“canal.deployer-1.1.6.tar.gz”。解压后有如下的目录:

  • bin目录放置一些可执行文件。
  • conf目录放置配置文件。
  • lib目录放置canal.admin需要的jar。
  • logs目录放置日志文件。
  • plugin目录放置插件,例如各类mq、kafka等。

canal配置分为两部分:

“/canal/conf/canal.propertie”配置文件和“/canal/conf/example/instance.properties”配置文件。前者定义instance的公共参数,后者定义instance级别的配置,每个instance一份。

现在先修改 conf/example/instance.properties配置文件的如下内容:

## mysql serverId
canal.instance.mysql.slaveId = 1234 # 数字需要唯一,最起码在相同的MySQL主从集群内,v1.0.26+ 之后自动生成。

# position info
canal.instance.master.address = MySQLServerIp:Port # 需要收集的mysql地址。

# username/password
canal.instance.dbUsername = 用户名  
canal.instance.dbPassword = 密码

# table regex
canal.instance.filter.regex=需要收集的库名\\..*,需要收集的库名\\..*
#canal.instance.filter.regex=.*\\..*

  # table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*,要屏蔽掉的库名\\..*
           

配置接入kafka

我现在再次编辑修改“conf/example/instance.properties”,配置Kafka相关信息。

# mq config
# 在消息队列Kafka版控制台创建的TopicName。
canal.mq.topic=事先在Kafka内生成的topicName

# 数据同步到消息队列Kafka版Topic的指定分区。
canal.mq.partition=0           

现在修改“conf/canal.properties”,继续配置KafkaServer相关信息。

# 这里的类型需设置为kafka。
canal.serverMode = kafka

# kafka配置。
kafka.bootstrap.servers = KafkaBrokerIp:Port

# 其他的先按配置文件默认设置就阔以了。           

启动Canal

cd /data/canal;/bin/startup.sh

# 查看 server 日志
tailf logs/canal/canal.log

# 查看 instance 的日志
tailf logs/example/example.log           

如图内的日志输出,代表Canal服务已经正常run了

Canal+Kafka:增量订阅消费MySQL binlog

最后从Kafka读取消息,验证整个流程是否达到目标:

Canal+Kafka:增量订阅消费MySQL binlog

说在最后

Canal很容易上手,不过需要考虑性能和消息顺序一致性的问题,我这里采用的是单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢。

继续阅读