
Non-invasive MySQL binlog capture: using Maxwell to ship binlog events into Kafka (success!)

I previously evaluated Flume and currently use DataX, but both are invasive: even incremental pulls put load on the source server (see my earlier articles for details). After looking at Flume, Canal, Otter, and Maxwell, I wanted something non-invasive and real-time.

The key differences:

1. Maxwell has no built-in HA, but it supports checkpoint recovery: after a failure is fixed, it restarts and resumes reading from the last recorded binlog position.

2. Canal is only a server: when data arrives it cannot write it out by itself; a client (syncClient) has to fetch the data. Maxwell is both the server and the client.

3. Maxwell supports bootstrapping, i.e. replaying a full snapshot of existing rows, which Canal does not (a sketch follows this list).

4. Maxwell only outputs JSON, while Canal's output format is unconstrained.
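
To make point 3 concrete: Maxwell ships a maxwell-bootstrap utility that replays every existing row of a table as bootstrap events before normal streaming continues. A minimal sketch against the test table set up later in this post:

# Replay the existing rows of test_maxwell.user_info into the stream
bin/maxwell-bootstrap \
  --database test_maxwell \
  --table user_info \
  --host localhost \
  --user maxwell_sync \
  --password maxwell_sync_1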

I personally chose Maxwell because:

a. The server and client are a single process.

b. Maxwell is lightweight and low-risk; in my experience Canal fails far more often.

c. Even deployed on a single node, checkpoint recovery makes failures straightforward to resolve.

d. Maxwell's code quality is excellent and its community is more active.

First, download, installation, and configuration.

Download:

https://github.com/zendesk/maxwell/releases/download/v1.22.1/maxwell-1.22.1.tar.gz 

Source:

https://github.com/zendesk/maxwell 

Now the MySQL side.

# Enable binlog
# Edit my.cnf and add the following

[[email protected] /root]# vim /etc/my.cnf

[mysqld]
# Binlog directory and file-name prefix
# Directory: /var/lib/mysql/
# Prefix: mysql-binlog
# MySQL appends a numeric suffix to create the files in sequence, e.g. mysql-binlog.000006, mysql-binlog.000007
log-bin=/var/lib/mysql/mysql-binlog

# Row-based logging
binlog-format=ROW

# Server id
# Every binlog event carries server_id, identifying which server produced it
server_id=1

Don't copy this verbatim; adapt the paths and values to your own environment.
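
After editing my.cnf, restart MySQL for the settings to take effect, then verify. A quick sketch (the service name may differ on your system):

# Restart MySQL (service name varies by distribution)
systemctl restart mysqld

# Then, from a mysql client:
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
SHOW MASTER STATUS;                   -- current binlog file and position

With binlog enabled, create a dedicated user plus a metadata database for Maxwell: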

CREATE USER 'maxwell_sync'@'%' IDENTIFIED BY 'Ezhiyang2019!';
CREATE DATABASE maxwell DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
GRANT ALL ON maxwell.* TO 'maxwell_sync'@'%';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'maxwell_sync'@'%';
FLUSH PRIVILEGES;
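
REPLICATION SLAVE and REPLICATION CLIENT are what let Maxwell read the binlog stream, SELECT is needed for bootstrapping, and ALL on maxwell.* lets it store its own metadata (schema snapshots, positions) there. A quick check that the grants took:

SHOW GRANTS FOR 'maxwell_sync'@'%';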

Create a test table:

create database test_maxwell;
use test_maxwell;
create table if not exists `user_info`(
   `userid` int,
   `name` varchar(100),
   `age` int
)engine=innodb default charset=utf8;

Download and extract Maxwell:

[[email protected] /data/software]# wget https://github.com/zendesk/maxwell/releases/download/v1.17.1/maxwell-1.17.1.tar.gz

[[email protected] /data/software]# tar -zxvf maxwell-1.17.1.tar.gz
           

Start Maxwell

# Input: MySQL binlog
# Output: Kafka
# Configuration notes:
# 1) kafka_topic
#    Can be templated, e.g. namespace_%{database}_%{table}; %{database} and %{table} are replaced with the real values.
# 2) kafka_version
#    Must match the Kafka broker version.
# 3) Extra producer options
#    kafka.acks, kafka.compression.type, kafka.retries
# 4) filter
#    Can exclude databases, tables, or individual rows, and can run a snippet of JS to transform data.
#    E.g. exclude: test_maxwell.user_info.userid = 1 drops rows of test_maxwell.user_info whose userid is 1.
# 5) monitor
#    Metrics can be exposed via jmx, http, etc.
#    http_bind_address: IP the monitoring server binds to
#    http_port: its port
#    http_path_prefix: HTTP path prefix

[[email protected] /data/software/maxwell-1.17.1]# bin/maxwell \
--host='localhost' \
--port=3306 \
--user='maxwell_sync' \
--password='maxwell_sync_1' \
--filter='exclude: *.*,include:test_maxwell.user_info,exclude: test_maxwell.user_info.userid = 1' \
--producer=kafka \
--kafka_version='0.11.0.1' \
--kafka.bootstrap.servers='node1:6667,node2:6667,node3:6667' \
--kafka_topic=qaTopic \
--metrics_type=http \
--metrics_jvm=true \
--http_bind_address=node2 \
--http_port=8090 \
--http_path_prefix=db_test_maxwell
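
The same settings can also live in a config.properties file in Maxwell's directory instead of on the command line; property names mirror the CLI flags. A minimal sketch equivalent to the invocation above:

# config.properties, same settings as the CLI flags
host=localhost
port=3306
user=maxwell_sync
password=maxwell_sync_1
producer=kafka
kafka_version=0.11.0.1
kafka.bootstrap.servers=node1:6667,node2:6667,node3:6667
kafka_topic=qaTopic
filter=exclude: *.*, include: test_maxwell.user_info, exclude: test_maxwell.user_info.userid = 1
metrics_type=http
metrics_jvm=true
http_bind_address=node2
http_port=8090
http_path_prefix=db_test_maxwell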

# To print to the console instead of Kafka, use:

[[email protected] /data/software/maxwell-1.17.1]# bin/maxwell \
--host='localhost' \
--port=3306 \
--user='maxwell_sync' \
--password='maxwell_sync_1' \
--producer=stdout
           

OK, let's run it for real.

Create the topic:

kafka-topics  --create --zookeeper pro-app-174:2181,pro-app-175:2181,pro-app-176:2181 --replication-factor 3 --partitions 1  --topic  housekeeper_realtime

kafka-topics --list --zookeeper pro-app-174:2181
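
To double-check the partition and replica layout of the new topic:

kafka-topics --describe --zookeeper pro-app-174:2181 --topic housekeeper_realtime

Now start Maxwell against the new topic: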

bin/maxwell  --host='pro-app-174'  --port=3306  --user='root'  --password='Ezhiyang2019!'  --filter='exclude: *.*,include:maxwell.user_info,exclude:maxwell.user_info.userid = 1'  --producer=kafka  --kafka_version='4.0.0'  --kafka.bootstrap.servers='pro-app-174:9092,pro-app-175:9092,pro-app-176:9092'  --kafka_topic=housekeeper_realtime  --metrics_type=http  --metrics_jvm=true  --http_bind_address=pro-app-174  --http_port=8090  --http_path_prefix=db_maxwell

It errors out: the Kafka version is too new for Maxwell.

The console output:

Error: No matches for kafka version: 4.0.0
Supported versions:
 - 0.10.0.1
 - 0.10.2.1
 - 0.11.0.1
 - 0.8.2.2
 - 0.9.0.1
 - 1.0.0

Time for a fix: the only option is to raise the Maxwell version. The releases page:

https://github.com/zendesk/maxwell/releases

The newest at the time was 1.22; no idea yet whether it would work.

Download and extract the newer Maxwell:

tar -zxvf maxwell-1.22.1.tar.gz

Still the same error, which suggests my Kafka isn't really a "4.0.0" at all. Indeed, the consumer log further down prints "Kafka version : 2.1.0-kafka-4.0.0": the broker is Kafka 2.1.0 shipped as Cloudera's 4.0.0 parcel, and Kafka brokers are backward compatible with older client protocols anyway. So I simply set kafka_version to 1.0.0, the highest either Maxwell build supports, and sure enough it worked. The log:

[[email protected] maxwell-1.22.1]# bin/maxwell --host='pro-app-174' --port=3306 --user='root' --password='Ezhiyang2019!' --filter='exclude: *.*,include:maxwell.user_info,exclude:maxwell.user_info.userid = 1' --producer=kafka --kafka_version='1.0.0' --kafka.bootstrap.servers='pro-app-174:9092,pro-app-175:9092,pro-app-176:9092' --kafka_topic=housekeeper_realtime --metrics_type=http --metrics_jvm=true --http_bind_address=pro-app-174 --http_port=8090 --http_path_prefix=db_maxwell
Using kafka version: 1.0.0
17:44:32,032 INFO  SchemaStoreSchema - Creating maxwell database
17:44:32,297 INFO  ProducerConfig - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [pro-app-174:9092, pro-app-175:9092, pro-app-176:9092]
	buffer.memory = 33554432
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

17:44:32,354 INFO  AppInfoParser - Kafka version : 1.0.0
17:44:32,354 INFO  AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
17:44:32,431 INFO  Maxwell - Maxwell v1.22.1 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[mysql-binlog.000001:358319], lastHeartbeat=0]
17:44:32,588 INFO  AbstractSchemaStore - Maxwell is capturing initial schema
17:44:33,225 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-binlog.000001:358319
17:44:33,229 INFO  MaxwellHTTPServer - Maxwell http server starting
17:44:33,232 INFO  MaxwellHTTPServer - Maxwell http server started on port 8090
17:44:33,288 INFO  log - Logging initialized @2078ms
17:44:33,345 INFO  Server - jetty-9.3.27.v20190418, build timestamp: 2019-04-19T02:11:38+08:00, git hash: d3e249f86955d04bc646bb620905b7c1bc596a8d
17:44:33,368 INFO  BinaryLogClient - Connected to pro-app-174:3306 at mysql-binlog.000001/358319 (sid:6379, cid:1271)
17:44:33,368 INFO  BinlogConnectorLifecycleListener - Binlog connected.
17:44:33,385 INFO  ContextHandler - Started [email protected]{/db_maxwell,null,AVAILABLE}
17:44:33,398 INFO  AbstractConnector - Started [email protected]{HTTP/1.1,[http/1.1]}{pro-app-174:8090}
17:44:33,398 INFO  Server - Started @2189ms
           

Now watch the topic with a Kafka console consumer:

kafka-console-consumer --bootstrap-server pro-app-174:2181,pro-app-175:2181,pro-app-176:2181 --from-beginning --topic housekeeper_realtime

(Spot the mistake: --bootstrap-server should point at the Kafka brokers on port 9092, not at ZooKeeper on 2181. This turns out to matter later.)

The log:

[[email protected] maxwell-1.17.1]$ kafka-console-consumer --bootstrap-server pro-app-174:2181,pro-app-175:2181,pro-app-176:2181 --from-beginning --topic housekeeper_realtime
19/06/05 17:49:09 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
19/06/05 17:49:09 INFO consumer.ConsumerConfig: ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [pro-app-174:2181, pro-app-175:2181, pro-app-176:2181]
	check.crcs = true
	client.dns.lookup = default
	client.id = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = console-consumer-82209
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

19/06/05 17:49:09 INFO utils.AppInfoParser: Kafka version : 2.1.0-kafka-4.0.0
19/06/05 17:49:09 INFO utils.AppInfoParser: Kafka commitId : unknown
           

OK, nothing remarkable in that log either. Keep going: insert some data and see whether events arrive.

Nothing. Consuming the topic shows nothing either. The statements I ran:

delete FROM maxwell.user_info;
insert into maxwell.user_info(userid,name,age) values (1,'name1',10),(2,'name2',20),(3,'name3',30);
update maxwell.user_info set name='name3',age=23 where userid=3;
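
For reference, with the pipeline working, these statements should produce Maxwell events like the sketch below for the second inserted row (hypothetical ts/xid values; note the userid = 1 row would be dropped by our filter):

{"database":"maxwell","table":"user_info","type":"insert","ts":...,"xid":...,"commit":true,"data":{"userid":2,"name":"name2","age":20}}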
           

Then writing data itself started failing. Running "UPDATE user_course SET userid = 200 WHERE id = 28;" returns:

[Err] 1055 - Expression #1 of ORDER BY clause is not in GROUP BY clause and contains nonaggregated column 'information_schema.PROFILING.SEQ' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by

The error isn't really about the UPDATE itself: it's the GUI client's profiling query against information_schema.PROFILING colliding with sql_mode=only_full_group_by.

The workaround is to drop only_full_group_by from sql_mode:

SHOW VARIABLES LIKE '%sql_mode%';
set sql_mode = '';
set sql_mode = 'NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES';
SHOW VARIABLES LIKE '%sql_mode%';
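
Note that a bare SET sql_mode only changes the current session. To make it stick longer (privileges permitting; editing my.cnf is the permanent route):

-- Instance-wide until the next restart:
SET GLOBAL sql_mode = 'NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES';

-- Permanently, add under [mysqld] in my.cnf:
--   sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES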
           

And after all that: still no events.


Check the metrics:

http://pro-app-174:8090/db_maxwell/metrics

I can't read much out of these yet; something to study later. The other endpoints, for reference, before we get back to why Maxwell isn't delivering to Kafka:

Maxwell health check:

http://node2:8090/db_test_maxwell/healthcheck

Ping:

http://node2:8090/db_test_maxwell/ping
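
These can all be polled from a shell; per Maxwell's docs, the ping endpoint should answer with a plain pong:

curl http://pro-app-174:8090/db_maxwell/metrics
curl http://node2:8090/db_test_maxwell/healthcheck
curl http://node2:8090/db_test_maxwell/ping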

Maxwell pros and cons

Pros

(1) Compared with Canal, configuration is simple; it works out of the box.

(2) The destination is customizable (extend a Java class and implement its methods), and data handling is flexible (JS).

(3) Several kinds of monitoring come built in.

Cons

(1) It needs a schema database (named maxwell by default) on the business MySQL instance being synced, to store metadata such as the binlog consumption position. Though by Maxwell's own reasoning this isn't a drawback.

(2) No HA support, whereas Canal can use ZooKeeper to give canal server and canal client HA and failover.

OK, back to why nothing flows end to end. First, sanity-check the Kafka side by opening a console producer.

Producer:

kafka-console-producer --broker-list pro-app-174:9092 --topic housekeeper_realtime

19/06/05 18:19:42 INFO utils.AppInfoParser: Kafka version : 2.1.0-kafka-4.0.0
19/06/05 18:19:42 INFO utils.AppInfoParser: Kafka commitId : unknown
>1
19/06/05 18:19:44 INFO clients.Metadata: Cluster ID: Nxyzks2RRUmrO0cRTrStwg
>2
>df
>wer
>asdf
>asdf
>sad
>fasd
f>sa
>dfsa
>df
>sadf
>sdf
>sd
>fsd
>fs
>df
>sadf
>sadf
>sf
>asdf
>sadf
>asdf
>asdf
>asdf
>sadf
>sadf
>sad
>fsad
>fsa
>dfsa
>f
>
           

Consumer (note that this time --bootstrap-server correctly points at the brokers on port 9092 rather than ZooKeeper):

kafka-console-consumer --bootstrap-server pro-app-174:9092 --topic housekeeper_realtime

19/06/05 18:22:22 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=console-consumer-75791] Successfully joined group with generation 1
19/06/05 18:22:22 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=console-consumer-75791] Setting newly assigned partitions [housekeeper_realtime-0]
19/06/05 18:22:22 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=console-consumer-75791] Resetting offset for partition housekeeper_realtime-0 to offset 20.
asdf
sadf
asdf
asdf
asdf
sadf
sadf
sad
fsad
fsa
dfsa
f
           

So Kafka itself is fine; the problem must be on the Maxwell side (or, given that the earlier consumer pointed at 2181, we may never have been looking at the data in the first place).

Keep digging: switch Maxwell's producer to stdout, so events print straight to the console instead of going to Kafka.

bin/maxwell --user='root' --password='44ang2019!' --host='pro-app-174' --producer=stdout

Now the console does print events:

Using kafka version: 1.0.0
10:30:01,482 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
10:30:01,767 INFO  Maxwell - Maxwell v1.22.1 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-binlog.000003:4713815], lastHeartbeat=1559788182450]
10:30:01,910 INFO  MysqlSavedSchema - Restoring schema id 3 (last modified at Position[BinlogPosition[mysql-binlog.000001:544743], lastHeartbeat=1559728989336])
10:30:02,061 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-binlog.000001:358319], lastHeartbeat=0])
10:30:02,110 INFO  MysqlSavedSchema - beginning to play deltas...
10:30:02,112 INFO  MysqlSavedSchema - played 2 deltas in 2ms
10:30:02,138 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-binlog.000003:4713815
10:30:02,237 INFO  BinaryLogClient - Connected to pro-app-174:3306 at mysql-binlog.000003/4713815 (sid:6379, cid:1362)
10:30:02,237 INFO  BinlogConnectorLifecycleListener - Binlog connected.
10:35:50,403 INFO  AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-binlog.000003:4741091], lastHeartbeat=15597885...] after applying "... MODIFY COLUMN `userid` int(12) NULL DEFAULT NULL FIRST" to test, new schema id is 4
10:36:26,530 INFO  AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-binlog.000003:4744913], lastHeartbeat=15597885...] after applying "... MODIFY COLUMN `userid` int(13) NULL DEFAULT NULL FIRST" to test, new schema id is 5
{"database":"ralph","table":"org","type":"insert","ts":1559788797,"xid":108943,"commit":true,"data":{"id":1,"uc_org_id":1,"org_name":"1","org_type":1,"create_date":"2019-06-06 10:39:54","create_by":11,"update_date":"2019-06-06 10:39:50","update_by":1,"is_deleted":1}}
           

That event is exactly the row I had just inserted.


Good: Maxwell itself works. Note, too, that this stdout run has no --filter at all, so every database streams; the earlier Kafka run's filter only included maxwell.user_info and would have silently dropped these org-table events.

Now switch back to the Kafka producer, adding --output_ddl=true so schema-change events are emitted as well:

bin/maxwell --user='root' --password='sdfang2019!' --host='pro-app-174' --producer=kafka --kafka.bootstrap.servers=pro-app-174:9092,pro-app-175:9092,pro-app-176:9092  --output_ddl=true --kafka_topic=housekeeper_realtime
           

And here they come!

19/06/05 18:22:22 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=console-consumer-75791] Resetting offset for partition housekeeper_realtime-0 to offset 20.
asdf
sadf
asdf
asdf
asdf
sadf
sadf
sad
fsad
fsa
dfsa
f

sdf
asdf
asdf
safd
{"database":"rawrh","table":"org","type":"insert","ts":1559789081,"xid":109663,"commit":true,"data":{"id":3,"uc_org_id":2,"org_name":
"2","org_type":2,"create_date":"2019-06-06 10:43:04","create_by":2,"update_date":"2019-06-06 10:43:08","update_by":2,"is_deleted":0}}{"database":"rasdfph","table":"org","type":"delete","ts":1559789100,"xid":109699,"commit":true,"data":{"id":1,"uc_org_id":1,"org_name":
"1","org_type":1,"create_date":"2019-06-06 10:39:54","create_by":11,"update_date":"2019-06-06 10:39:50","update_by":1,"is_deleted":1}}
           

As you can see, the data is arriving.


Excellent!

With that, the pipeline from Maxwell through Kafka to our consumer basically works.

Next up: ordering. Update and delete events applied in the wrong order give completely different end states, so that needs investigation: Kafka only guarantees ordering within a partition, and Maxwell's producer_partition_by option controls how events map to partitions. The other task is actually analyzing this real-time stream of MySQL changes; since every event is plain JSON, parsing it is straightforward.
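
As a starting point for that analysis, a minimal consumer sketch in Python, assuming the third-party kafka-python package; the topic and brokers are the ones used above:

import json
from kafka import KafkaConsumer  # pip install kafka-python

# Read Maxwell's JSON change events from the topic.
consumer = KafkaConsumer(
    'housekeeper_realtime',
    bootstrap_servers=['pro-app-174:9092', 'pro-app-175:9092', 'pro-app-176:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
)

for message in consumer:
    event = message.value
    # Each event names its database, table, operation type, and row image.
    if event.get('type') in ('insert', 'update', 'delete'):
        print(event['database'], event['table'], event['type'], event.get('data'))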
