天天看点

Maxwell部署

Maxwell介绍:

Maxwell是MySQL到Kafka的消息中间件,消息格式采用Json。支持断点还原以及批量抽取(bootstrap)

官网:http://maxwells-daemon.io/

github:https://github.com/zendesk/maxwell

1、配置mysql

(1) 修改mysql配置

vi /etc/my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
           

(2) 创建Maxwell的db和用户(用户名是maxwell,密码是ruozedata)

mysql> create database maxwell;
Query OK, 1 row affected (0.03 sec)

mysql> GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'ruozedata';
Query OK, 0 rows affected (0.00 sec)

mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
           

2、安装部署(可参考官网http://maxwells-daemon.io/quickstart/)

(1)  下载安装包:https://github.com/zendesk/maxwell/releases/download/v1.14.4/maxwell-1.14.4.tar.gz

(2)  解压: tar -zxf maxwell-1.14.4.tar.gz 

(3) 切换到解压的目录 :cd maxwell-1.14.4/

(4) 运行maxwell(bda2是我的mysql主机名)

bin/maxwell --user='maxwell' --password='ruozedata' --host='bda2' --producer=stdout

(--producer=stdout 常用于测试,是在console上打印出结果)

(5) 测试

在mysql中,选择数据库和表,并执行增删改命令

mysql> insert into yyb(yybcode,count) values('mm',11);

在maxwell窗口见到如下类似打印,则说明部署成功

{"database":"test","table":"yyb","type":"insert","ts":1530586183,"xid":522,"commit":true,"data":{"yybcode":"mm","count":11}}

3、Maxwell对接Kafka

[[email protected] maxwell-1.14.4]# bin/maxwell --help    查看maxwell命令帮助可知kafka对应配置选项,红色的需要特别关注

--producer        producer type: stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis  

--producer_partition_by   database|table|primary_key|column, kafka/kinesis producers will partition by this value   

--producer_partition_columns    with producer_partition_by=column, partition by the value of these columns.  comma separated.     

--producer_partition_by_fallback    database|table|primary_key, fallback to this value when using 'column' partitioning and the       

                                      columns are not present in the row                                                              

--kafka_version                     kafka client library version: 0.8.2.2|0.9.0.1|0.10.0.1|0.10.2.1|0.11.0.1                          

--kafka.bootstrap.servers           at least one kafka server, formatted as HOST:PORT[,HOST:PORT]                                     

--kafka_partition_hash              default|murmur3, hash function for partitioning                                                   

--kafka_topic                       optionally provide a topic name to push to. default: maxwell                                      

--kafka_key_format                  how to format the kafka key; array|hash         

maxwell对接kafka执行参考命令如下:

bin/maxwell --user='maxwell' \         mysql数据库名

--password='ruozedata' \                 mysql密码

--host='bda2' \                                  mysql主机名

--producer=kafka \                           指定生产者类型为kafka

--kafka_topic=sparktest \                  指定kafka topic

--kafka.bootstrap.servers=bda4:9092 \    指定kafka broker,至少指定一个

--kafka_version=0.10.0.1                          指定kafka client版本