I had looked into Flume and was using DataX, but both are intrusive: even incremental syncs put load on the source server (see my earlier articles for details). So I surveyed Flume, Canal, Otter, and Maxwell, looking for something non-intrusive and real-time.
Key differences:
1. Maxwell does not support HA directly, but it does support checkpoint recovery: after a failure is fixed, restarting it resumes reading from the last recorded binlog position.
2. Canal is only a server; it cannot write data out by itself and needs a client (syncClient) to fetch the data. Maxwell is both server and client.
3. Maxwell supports bootstrap, i.e. snapshotting the full existing data set; Canal does not.
4. Maxwell only emits JSON, while Canal's output format is free-form.
I chose Maxwell because:
a. the server and client are a single process;
b. Maxwell is lightweight and low-risk, whereas Canal, in my experience, fails often;
c. even as a single-node deployment, checkpoint recovery makes failures easy to deal with;
d. Maxwell's code quality is very good and its community is more active.
First, let's look at downloading, installing, and configuring it.
Download:
https://github.com/zendesk/maxwell/releases/download/v1.22.1/maxwell-1.22.1.tar.gz
Source:
https://github.com/zendesk/maxwell
Now the MySQL configuration.
# Enable the binlog
# Edit my.cnf and add the following
vim /etc/my.cnf
[mysqld]
# Binlog directory and file-name prefix
# directory: /var/lib/mysql/
# prefix: mysql-binlog
# MySQL appends a numeric suffix to the prefix to create binlog files in sequence, e.g. mysql-binlog.000006, mysql-binlog.000007
log-bin=/var/lib/mysql/mysql-binlog
# Row-based logging
binlog-format=ROW
# Server id
# Each binlog event carries server_id, identifying which server it came from
server_id=1
Don't copy the statements below verbatim; at the very least, change the user name and password for your own environment.
CREATE USER 'maxwell_sync'@'%' IDENTIFIED BY 'Ezhiyang2019!';
create database maxwell DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
GRANT ALL on maxwell.* to 'maxwell_sync'@'%';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'maxwell_sync'@'%';
FLUSH PRIVILEGES;
Create a test table in MySQL:
create database test_maxwell;
use test_maxwell;
create table if not exists `user_info`(
`userid` int,
`name` varchar(100),
`age` int
)engine=innodb default charset=utf8;
Download and extract Maxwell:
wget https://github.com/zendesk/maxwell/releases/download/v1.17.1/maxwell-1.17.1.tar.gz
tar -zxvf maxwell-1.17.1.tar.gz
Start Maxwell.
# Input: MySQL binlog
# Output: Kafka
# Configuration notes
# 1) kafka_topic
# can be templated, e.g. namespace_%{database}_%{table}; %{database} and %{table} are replaced with the actual values
# 2) kafka_version
# must match the Kafka broker version
# 3) extra producer options
# kafka.acks, kafka.compression.type, kafka.retries
# 4) filter
# can exclude databases, tables, or specific rows, and a snippet of JS can transform the data flexibly
# e.g. exclude: test_maxwell.user_info.userid = 1 drops rows of test_maxwell.user_info whose userid equals 1
# 5) monitor
# monitoring can be exposed via jmx, http, etc.
# http_bind_address: IP the monitoring endpoint binds to
# http_port: its port
# http_path_prefix: HTTP path prefix
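To sanity-check which rows a filter chain would let through, here is a toy Python model of the exclude/include rules. This is a simplified, last-match-wins reading of the syntax, not Maxwell's actual parser, so treat it as illustration only:

```python
# Toy model of Maxwell's filter syntax: rules are tried in order, and the
# last rule that matches a row decides whether the row is included.
# Illustration of my reading of the semantics, not Maxwell's real parser.

def parse_rules(filter_str):
    """Parse 'exclude: db.table.col = val' style rules into tuples."""
    rules = []
    for part in filter_str.split(","):
        action, _, spec = part.strip().partition(":")
        pieces = [p.strip() for p in spec.strip().split(".", 2)]
        db = pieces[0]
        table = pieces[1] if len(pieces) > 1 else "*"
        col = val = None
        if len(pieces) > 2:  # column condition, e.g. "userid = 1"
            col, _, val = pieces[2].partition("=")
            col, val = col.strip(), val.strip()
        rules.append((action.strip(), db, table, col, val))
    return rules

def included(rules, db, table, row):
    """Return True if the row survives the filter chain."""
    keep = True
    for action, rdb, rtable, col, val in rules:
        if rdb not in ("*", db) or rtable not in ("*", table):
            continue
        if col is not None and str(row.get(col)) != val:
            continue
        keep = (action == "include")
    return keep

rules = parse_rules(
    "exclude: *.*, include: test_maxwell.user_info, "
    "exclude: test_maxwell.user_info.userid = 1"
)
print(included(rules, "test_maxwell", "user_info", {"userid": 2}))  # True
print(included(rules, "test_maxwell", "user_info", {"userid": 1}))  # False
print(included(rules, "other_db", "t", {}))                         # False
```

Under this model, the blanket exclude drops everything, the include re-admits `test_maxwell.user_info`, and the final rule carves out the `userid = 1` rows again, which matches the behavior described above.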
bin/maxwell \
--host='localhost' \
--port=3306 \
--user='maxwell_sync' \
--password='maxwell_sync_1' \
--filter='exclude: *.*,include:test_maxwell.user_info,exclude: test_maxwell.user_info.userid = 1' \
--producer=kafka \
--kafka_version='0.11.0.1' \
--kafka.bootstrap.servers='node1:6667,node2:6667,node3:6667' \
--kafka_topic=qaTopic \
--metrics_type=http \
--metrics_jvm=true \
--http_bind_address=node2 \
--http_port=8090 \
--http_path_prefix=db_test_maxwell
# To print to the console instead, use the following
bin/maxwell \
--host='localhost' \
--port=3306 \
--user='maxwell_sync' \
--password='maxwell_sync_1' \
--producer=stdout
Now let's put it to work.
Create the topic:
kafka-topics --create --zookeeper pro-app-174:2181,pro-app-175:2181,pro-app-176:2181 --replication-factor 3 --partitions 1 --topic housekeeper_realtime
kafka-topics --list --zookeeper pro-app-174:2181
bin/maxwell --host='pro-app-174' --port=3306 --user='root' --password='Ezhiyang2019!' --filter='exclude: *.*,include:maxwell.user_info,exclude:maxwell.user_info.userid = 1' --producer=kafka --kafka_version='4.0.0' --kafka.bootstrap.servers='pro-app-174:9092,pro-app-175:9092,pro-app-176:9092' --kafka_topic=housekeeper_realtime --metrics_type=http --metrics_jvm=true --http_bind_address=pro-app-174 --http_port=8090 --http_path_prefix=db_maxwell
This fails: the kafka_version given is too high. Console output:
Error: No matches for kafka version: 4.0.0
Supported versions:
- 0.10.0.1
- 0.10.2.1
- 0.11.0.1
- 0.8.2.2
- 0.9.0.1
- 1.0.0
Let's look at a fix. The only option is to upgrade Maxwell.
https://github.com/zendesk/maxwell/releases
A newer release is available here, 1.22 by now; not sure whether it will work.
Download and extract Maxwell:
tar -zxvf maxwell-
Still no good. The other possibility is that my Kafka is not actually that new: the client reports its version as 2.1.0-kafka-4.0.0, where 4.0.0 is the Cloudera package version and 2.1.0 is the Apache Kafka version, so 4.0.0 was never a valid value for --kafka_version. So, being clever, I simply set kafka_version down to 1.0.0, and that did it. Here's the log.
bin/maxwell --host='pro-app-174' --port=3306 --user='root' --password='Ezhiyang2019!' --filter='exclude: *.*,include:maxwell.user_info,exclude:maxwell.user_info.userid = 1' --producer=kafka --kafka_version='1.0.0' --kafka.bootstrap.servers='pro-app-174:9092,pro-app-175:9092,pro-app-176:9092' --kafka_topic=housekeeper_realtime --metrics_type=http --metrics_jvm=true --http_bind_address=pro-app-174 --http_port=8090 --http_path_prefix=db_maxwell
Using kafka version: 1.0.0
17:44:32,032 INFO SchemaStoreSchema - Creating maxwell database
17:44:32,297 INFO ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [pro-app-174:9092, pro-app-175:9092, pro-app-176:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
17:44:32,354 INFO AppInfoParser - Kafka version : 1.0.0
17:44:32,354 INFO AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
17:44:32,431 INFO Maxwell - Maxwell v1.22.1 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[mysql-binlog.000001:358319], lastHeartbeat=0]
17:44:32,588 INFO AbstractSchemaStore - Maxwell is capturing initial schema
17:44:33,225 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-binlog.000001:358319
17:44:33,229 INFO MaxwellHTTPServer - Maxwell http server starting
17:44:33,232 INFO MaxwellHTTPServer - Maxwell http server started on port 8090
17:44:33,288 INFO log - Logging initialized @2078ms
17:44:33,345 INFO Server - jetty-9.3.27.v20190418, build timestamp: 2019-04-19T02:11:38+08:00, git hash: d3e249f86955d04bc646bb620905b7c1bc596a8d
17:44:33,368 INFO BinaryLogClient - Connected to pro-app-174:3306 at mysql-binlog.000001/358319 (sid:6379, cid:1271)
17:44:33,368 INFO BinlogConnectorLifecycleListener - Binlog connected.
17:44:33,385 INFO ContextHandler - Started [email protected]{/db_maxwell,null,AVAILABLE}
17:44:33,398 INFO AbstractConnector - Started [email protected]{HTTP/1.1,[http/1.1]}{pro-app-174:8090}
17:44:33,398 INFO Server - Started @2189ms
Now watch the topic with a Kafka console consumer. (Note: --bootstrap-server must point at the Kafka broker port, 9092, not the ZooKeeper port 2181; the command below mistakenly uses 2181, which is one reason nothing shows up when we try to consume later.)
kafka-console-consumer --bootstrap-server pro-app-174:2181,pro-app-175:2181,pro-app-176:2181 --from-beginning --topic housekeeper_realtime
The log:
kafka-console-consumer --bootstrap-server pro-app-174:2181,pro-app-175:2181,pro-app-176:2181 --from-beginning --topic housekeeper_realtime
19/06/05 17:49:09 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
19/06/05 17:49:09 INFO consumer.ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [pro-app-174:2181, pro-app-175:2181, pro-app-176:2181]
check.crcs = true
client.dns.lookup = default
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-82209
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
19/06/05 17:49:09 INFO utils.AppInfoParser: Kafka version : 2.1.0-kafka-4.0.0
19/06/05 17:49:09 INFO utils.AppInfoParser: Kafka commitId : unknown
OK, nothing much to see there; moving on, let's try inserting some data.
No reaction, and consuming shows nothing either. The statements were:
delete FROM maxwell.user_info;
insert into maxwell.user_info(userid,name,age) values (1,'name1',10),(2,'name2',20),(3,'name3',30);
update maxwell.user_info set name='name3',age=23 where userid=3;
While inserting data, running "UPDATE user_course SET userid = 200 WHERE id = 28;" raised [Err] 1055 - Expression #1 of ORDER BY clause is not in GROUP BY clause and contains nonaggregated column 'information_schema.PROFILING.SEQ' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by
The fix:
SHOW VARIABLES LIKE '%sql_mode%';
set sql_mode = '';
set sql_mode = 'NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES';
SHOW VARIABLES LIKE '%sql_mode%';
After that, still no reaction at all.
Check the monitoring endpoints.
http://pro-app-174:8090/db_maxwell/metrics
Hard to tell much from this; let's learn how these endpoints work, and afterwards come back to why Maxwell is not publishing to Kafka.
Maxwell health status:
http://node2:8090/db_test_maxwell/healthcheck
ping:
http://node2:8090/db_test_maxwell/ping
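All three URLs are derived from the http_bind_address, http_port, and http_path_prefix settings. Here is a small helper that builds them, plus a lookup into a metrics payload. I am assuming a Dropwizard-style JSON layout for /metrics (top-level "counters", "gauges", etc. sections), and the sample payload and metric name below are made up purely to exercise the code, so verify the real shape against your Maxwell version:

```python
# Build the monitoring URLs Maxwell exposes when metrics_type=http is set,
# and pull one value out of a metrics payload.
# Assumption: /metrics returns Dropwizard-style JSON with "counters",
# "gauges", etc. at the top level; check this against your Maxwell version.
import json

def maxwell_urls(host, port, path_prefix):
    """Return the metrics / healthcheck / ping endpoint URLs."""
    base = f"http://{host}:{port}/{path_prefix}"
    return {
        "metrics": f"{base}/metrics",
        "healthcheck": f"{base}/healthcheck",
        "ping": f"{base}/ping",
    }

def read_metric(metrics_json, section, name):
    """Extract a single metric value from the payload, or None if absent."""
    doc = json.loads(metrics_json)
    entry = doc.get(section, {}).get(name)
    return entry.get("value") if entry else None

urls = maxwell_urls("pro-app-174", 8090, "db_maxwell")
print(urls["healthcheck"])  # http://pro-app-174:8090/db_maxwell/healthcheck

# Hypothetical sample payload, only to demonstrate read_metric:
sample = '{"counters": {"maxwell.row.count": {"value": 42}}}'
print(read_metric(sample, "counters", "maxwell.row.count"))  # 42
```

In practice you would fetch urls["metrics"] with any HTTP client and feed the response body to read_metric.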
Maxwell pros and cons
Pros
(1) Compared with Canal, configuration is simple; it works out of the box.
(2) Custom destinations are possible (extend a Java class and implement its methods), and data handling is flexible (JS filters).
(3) It ships with several kinds of monitoring built in.
Cons
(1) It needs a schema_database (named maxwell by default) on the source MySQL instance to store metadata such as the binlog consumption offset. Though in Maxwell's view, this is not a drawback.
(2) No HA support, whereas Canal can use ZooKeeper for canal server and canal client HA, enabling failover.
OK, next let's figure out why the pipeline isn't connected. Start by testing the Kafka side: open a console producer.
The producer:
kafka-console-producer --broker-list pro-app-174:9092 --topic housekeeper_realtime
19/06/05 18:19:42 INFO utils.AppInfoParser: Kafka version : 2.1.0-kafka-4.0.0
19/06/05 18:19:42 INFO utils.AppInfoParser: Kafka commitId : unknown
>1
19/06/05 18:19:44 INFO clients.Metadata: Cluster ID: Nxyzks2RRUmrO0cRTrStwg
>2
>df
>wer
>asdf
>asdf
>sad
>fasd
f>sa
>dfsa
>df
>sadf
>sdf
>sd
>fsd
>fs
>df
>sadf
>sadf
>sf
>asdf
>sadf
>asdf
>asdf
>asdf
>sadf
>sadf
>sad
>fsad
>fsa
>dfsa
>f
>
The consumer:
kafka-console-consumer --bootstrap-server pro-app-174:9092 --topic housekeeper_realtime
19/06/05 18:22:22 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=console-consumer-75791] Successfully joined group with generation 1
19/06/05 18:22:22 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=console-consumer-75791] Setting newly assigned partitions [housekeeper_realtime-0]
19/06/05 18:22:22 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=console-consumer-75791] Resetting offset for partition housekeeper_realtime-0 to offset 20.
asdf
sadf
asdf
asdf
asdf
sadf
sadf
sad
fsad
fsa
dfsa
f
So the problem must be on the Maxwell side.
Let's keep digging. First run Maxwell with stdout as the producer, i.e. print to the console instead of Kafka:
bin/maxwell --user='root' --password='44ang2019!' --host='pro-app-174' --producer=stdout
Now the console does print:
Using kafka version: 1.0.0
10:30:01,482 WARN MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
10:30:01,767 INFO Maxwell - Maxwell v1.22.1 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-binlog.000003:4713815], lastHeartbeat=1559788182450]
10:30:01,910 INFO MysqlSavedSchema - Restoring schema id 3 (last modified at Position[BinlogPosition[mysql-binlog.000001:544743], lastHeartbeat=1559728989336])
10:30:02,061 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-binlog.000001:358319], lastHeartbeat=0])
10:30:02,110 INFO MysqlSavedSchema - beginning to play deltas...
10:30:02,112 INFO MysqlSavedSchema - played 2 deltas in 2ms
10:30:02,138 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-binlog.000003:4713815
10:30:02,237 INFO BinaryLogClient - Connected to pro-app-174:3306 at mysql-binlog.000003/4713815 (sid:6379, cid:1362)
10:30:02,237 INFO BinlogConnectorLifecycleListener - Binlog connected.
10:35:50,403 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-binlog.000003:4741091], lastHeartbeat=15597885… "MODIFY COLUMN `userid` int(12) NULL DEFAULT NULL FIRST" to test, new schema id is 4
10:36:26,530 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-binlog.000003:4744913], lastHeartbeat=15597885… "MODIFY COLUMN `userid` int(13) NULL DEFAULT NULL FIRST" to test, new schema id is 5
{"database":"ralph","table":"org","type":"insert","ts":1559788797,"xid":108943,"commit":true,"data":{"id":1,"uc_org_id":1,"org_name":"1","org_type":1,"create_date":"2019-06-06 10:39:54","create_by":11,"update_date":"2019-06-06 10:39:50","update_by":1,"is_deleted":1}}
That JSON line is the row I inserted.
Good; Maxwell itself is fine, so the earlier problem was probably just with that table.
Now switch the producer back to Kafka:
bin/maxwell --user='root' --password='sdfang2019!' --host='pro-app-174' --producer=kafka --kafka.bootstrap.servers=pro-app-174:9092,pro-app-175:9092,pro-app-176:9092 --output_ddl=true --kafka_topic=housekeeper_realtime
Here it comes!
19/06/05 18:22:22 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=console-consumer-75791] Resetting offset for partition housekeeper_realtime-0 to offset 20.
asdf
sadf
asdf
asdf
asdf
sadf
sadf
sad
fsad
fsa
dfsa
f
sdf
asdf
asdf
safd
{"database":"rawrh","table":"org","type":"insert","ts":1559789081,"xid":109663,"commit":true,"data":{"id":3,"uc_org_id":2,"org_name":"2","org_type":2,"create_date":"2019-06-06 10:43:04","create_by":2,"update_date":"2019-06-06 10:43:08","update_by":2,"is_deleted":0}}
{"database":"rasdfph","table":"org","type":"delete","ts":1559789100,"xid":109699,"commit":true,"data":{"id":1,"uc_org_id":1,"org_name":"1","org_type":1,"create_date":"2019-06-06 10:39:54","create_by":11,"update_date":"2019-06-06 10:39:50","update_by":1,"is_deleted":1}}
As you can see, the data is arriving too.
Excellent!
At this point the pipeline, Maxwell to Kafka to our consumer, basically works.
The next task is to handle the ordering of update and delete statements: applying them in a different order produces a completely different end state, so that needs more investigation. The other piece of work is analyzing these real-time MySQL change logs, which is just a matter of parsing the JSON. Great, this is coming along nicely.
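To see why the ordering matters, here is a sketch that replays Maxwell events into an in-memory table keyed by primary key; swapping an update and a delete for the same key leaves a different end state. The event envelope follows the JSON shown above, but the apply_events helper and its key argument are my own illustration, not anything Maxwell ships:

```python
import json

def apply_events(events, key="id"):
    """Replay Maxwell change events into a dict keyed by primary key.

    Events must be applied in binlog order: reordering an update and a
    delete for the same row produces a different final state.
    """
    table = {}
    for raw in events:
        ev = json.loads(raw)
        row = ev["data"]
        if ev["type"] in ("insert", "update", "bootstrap-insert"):
            table[row[key]] = row            # upsert the full row image
        elif ev["type"] == "delete":
            table.pop(row[key], None)        # drop the row if present
    return table

events = [
    '{"database":"d","table":"t","type":"insert","ts":1,"data":{"id":1,"name":"a"}}',
    '{"database":"d","table":"t","type":"update","ts":2,"data":{"id":1,"name":"b"}}',
    '{"database":"d","table":"t","type":"delete","ts":3,"data":{"id":1,"name":"b"}}',
]
print(apply_events(events))        # {} : binlog order ends with the delete
print(apply_events(events[::-1]))  # reversed order leaves the row alive
```

In binlog order the row is gone at the end; replayed backwards, the same three events leave the row present, which is exactly the ordering hazard the downstream consumer has to respect (e.g. by consuming each table's events from a single partition).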