Maxwell介紹:
Maxwell是MySQL到Kafka的消息中間件,消息格式采用Json。支援斷點還原以及批量抽取(bootstrap)
官網:http://maxwells-daemon.io/
github:https://github.com/zendesk/maxwell
1、配置mysql
(1) 修改mysql配置
vi /etc/my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
(2) 建立Maxwell的db和使用者(使用者名是maxwell,密碼是ruozedata)
mysql> create database maxwell;
Query OK, 1 row affected (0.03 sec)
mysql> GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'ruozedata';
Query OK, 0 rows affected (0.00 sec)
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
2、安裝部署(可參考官網http://maxwells-daemon.io/quickstart/)
(1) 下載下傳安裝包:https://github.com/zendesk/maxwell/releases/download/v1.14.4/maxwell-1.14.4.tar.gz
(2) 解壓: tar -zxf maxwell-1.14.4.tar.gz
(3) 切換到解壓的目錄 :cd maxwell-1.14.4/
(4) 運作maxwell(bda2是我的mysql主機名)
bin/maxwell --user='maxwell' --password='ruozedata' --host='bda2' --producer=stdout
(--producer=stdout 常用于測試,是在console上列印出結果)
(5) 測試
在mysql中,選擇資料庫和表,并執行增删改指令
mysql> insert into yyb(yybcode,count) values('mm',11);
在maxwell視窗見到如下類似列印,則說明部署成功
{"database":"test","table":"yyb","type":"insert","ts":1530586183,"xid":522,"commit":true,"data":{"yybcode":"mm","count":11}}
3、Maxwell對接Kafka
[[email protected] maxwell-1.14.4]# bin/maxwell --help 檢視maxwell指令幫助可知kafka對應配置選項,紅色的需要特别關注
--producer producer type: stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
--producer_partition_by database|table|primary_key|column, kafka/kinesis producers will partition by this value
--producer_partition_columns with producer_partition_by=column, partition by the value of these columns. comma separated.
--producer_partition_by_fallback database|table|primary_key, fallback to this value when using 'column' partitioning and the
columns are not present in the row
--kafka_version kafka client library version: 0.8.2.2|0.9.0.1|0.10.0.1|0.10.2.1|0.11.0.1
--kafka.bootstrap.servers at least one kafka server, formatted as HOST:PORT[,HOST:PORT]
--kafka_partition_hash default|murmur3, hash function for partitioning
--kafka_topic optionally provide a topic name to push to. default: maxwell
--kafka_key_format how to format the kafka key; array|hash
maxwell對接kafka執行參考指令如下:
bin/maxwell --user='maxwell' \ mysql資料庫名
--password='ruozedata' \ mysql密碼
--host='bda2' \ mysql主機名
--producer=kafka \ 指定生産者類型為kafka
--kafka_topic=sparktest \ 指定kafka topic
--kafka.bootstrap.servers=bda4:9092 \ 指定kafka broker,至少指定一個
--kafka_version=0.10.0.1 指定kafka client版本