天天看點

Maxwell部署

Maxwell介紹:

Maxwell是MySQL到Kafka的消息中間件,消息格式采用Json。支援斷點還原以及批量抽取(bootstrap)

官網:http://maxwells-daemon.io/

github:https://github.com/zendesk/maxwell

1、配置mysql

(1) 修改mysql配置

vi /etc/my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
           

(2) 建立Maxwell的db和使用者(使用者名是maxwell,密碼是ruozedata)

mysql> create database maxwell;
Query OK, 1 row affected (0.03 sec)

mysql> GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'ruozedata';
Query OK, 0 rows affected (0.00 sec)

mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
           

2、安裝部署(可參考官網http://maxwells-daemon.io/quickstart/)

(1)  下載下傳安裝包:https://github.com/zendesk/maxwell/releases/download/v1.14.4/maxwell-1.14.4.tar.gz

(2)  解壓: tar -zxf maxwell-1.14.4.tar.gz 

(3) 切換到解壓的目錄 :cd maxwell-1.14.4/

(4) 運作maxwell(bda2是我的mysql主機名)

bin/maxwell --user='maxwell' --password='ruozedata' --host='bda2' --producer=stdout

(--producer=stdout 常用于測試,是在console上列印出結果)

(5) 測試

在mysql中,選擇資料庫和表,并執行增删改指令

mysql> insert into yyb(yybcode,count) values('mm',11);

在maxwell視窗見到如下類似列印,則說明部署成功

{"database":"test","table":"yyb","type":"insert","ts":1530586183,"xid":522,"commit":true,"data":{"yybcode":"mm","count":11}}

3、Maxwell對接Kafka

[[email protected] maxwell-1.14.4]# bin/maxwell --help    檢視maxwell指令幫助可知kafka對應配置選項,紅色的需要特别關注

--producer        producer type: stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis  

--producer_partition_by   database|table|primary_key|column, kafka/kinesis producers will partition by this value   

--producer_partition_columns    with producer_partition_by=column, partition by the value of these columns.  comma separated.     

--producer_partition_by_fallback    database|table|primary_key, fallback to this value when using 'column' partitioning and the       

                                      columns are not present in the row                                                              

--kafka_version                     kafka client library version: 0.8.2.2|0.9.0.1|0.10.0.1|0.10.2.1|0.11.0.1                          

--kafka.bootstrap.servers           at least one kafka server, formatted as HOST:PORT[,HOST:PORT]                                     

--kafka_partition_hash              default|murmur3, hash function for partitioning                                                   

--kafka_topic                       optionally provide a topic name to push to. default: maxwell                                      

--kafka_key_format                  how to format the kafka key; array|hash         

maxwell對接kafka執行參考指令如下:

bin/maxwell --user='maxwell' \         mysql資料庫名

--password='ruozedata' \                 mysql密碼

--host='bda2' \                                  mysql主機名

--producer=kafka \                           指定生産者類型為kafka

--kafka_topic=sparktest \                  指定kafka topic

--kafka.bootstrap.servers=bda4:9092 \    指定kafka broker,至少指定一個

--kafka_version=0.10.0.1                          指定kafka client版本