天天看點

自建Binlog訂閱服務 —— Maxwell

1. 介紹

Maxwell 是java語言編寫的能夠讀取、解析MySQL binlog,将行更新以json格式發送到 Kafka、RabbitMQ、AWS Kinesis、Google Cloud Pub/Sub、檔案,有了增量的資料流,可以想象的應用場景實在太多了,如ETL、維護緩存、收集表級别的dml名額、增量到搜尋引擎、資料分區遷移、切庫binlog復原方案,等等。

它還提供其它功能:

  • 支援

    SELECT * FROM table

     的方式做全量資料初始化
  • 支援主庫發生failover後,自動恢複binlog位置(GTID)
  • 靈活的對資料進行分區,解決資料傾斜的問題。kafka支援 database, table, column等級别的資料分區
  • 它的實作方式是僞裝成MySQL Server的從庫,接收binlog events,然後根據schemas資訊拼裝,支援ddl,xid,rows等各種event.

maxwell由 zendesk 開源:https://github.com/zendesk/maxwell ,而且維護者相當活躍。

網上已有人對 Alibaba Cannal, Zendesk Maxwell, Yelp mysql_streamer進行對比,見文後參考 實時抓取MySQL的更新資料到Hadoop。

類似功能的還有:http://debezium.io/docs/connectors/mysql/

安裝

使用 maxwell 非常簡單,隻需要jdk環境

yum install -y java-1.8.0-openjdk-1.8.0.121-1.b13.el6.x86_64

curl -sLo - https://github.com/zendesk/maxwell/releases/download/v1.12.0/maxwell-1.12.0.tar.gz \
       | tar zxvf -
cd maxwell-1.12.0

# 預設尋找目前目錄下的 config.properties 配置檔案
           

要求 mysql server binlog格式是 

ROW

, row_image 是 

FULL

。感受一下輸出結果

mysql> update test.e set m = 5.444, c = now(3) where id = 1;
{
   "database":"test",
   "table":"e",
   "type":"update",
   "ts":1477053234,
   "commit": true,
   ...
   "data":{
      "id":1,
      "m":5.444,
      "c":"2016-10-21 05:33:54.631000",
      "comment":"I am a creature of light."
   },
   "old":{
      "m":4.2341,
      "c":"2016-10-21 05:33:37.523000"
   }
}

mysql> create table test.e ( ... )
{
   "type":"table-create",
   "database":"test",
   "table":"e",
   "def":{
      "database":"test",
      "charset":"utf8mb4",
      "table":"e",
      "columns":[
         {"type":"int", "name":"id", "signed":true},
         {"type":"double", "name":"m"},
         {"type":"timestamp", "name":"c", "column-length":6},
         {"type":"varchar", "name":"comment", "charset":"latin1"}
      ],
      "primary-key":[
         "id"
      ]
   },
   "ts":1477053126000,
   "sql":"create table test.e ( id int(10) not null primary key auto_increment, m double, c timestamp(6), comment varchar(255) charset 'latin1' )",
   "position":"master.000006:800050"
}
           

data

是 After image, 

old

 是 Before image。 insert 隻有後鏡像,delete隻有前鏡像(

data

type

是語句類型:

insert

update

delete

database-create

database-alter

database-drop

table-create

table-alter

table-drop

 。

基本配置

config.properties 配置檔案裡面的所有選項,都可以在啟動 maxweill 

./bin/maxwell

 是指定,覆寫配置檔案的内容。這裡隻講一些常用的。

mysql options

  • host

    指定從哪個位址的mysql擷取binlog

  • replication_host

    如果指定了 

    replication_host

    ,那麼它是真正的binlog來源的mysql server位址,而那麼上面的

    host

    用于存放maxwell表結構和binlog位置的位址。

    将兩者分開,可以避免 replication_user 往生産庫裡寫資料。

  • schema_host

    從哪個host擷取表結構。binlog裡面沒有字段資訊,是以maxwell需要從資料庫查出schema,存起來。

    schema_host一般用不到,但在binlog-proxy場景下就很實用。比如要将已經離線的binlog通過maxwell生成json流,于是自建一個mysql server裡面沒有結構,隻用于發送binlog,此時表機構就可以制動從 

    schema_host

     擷取。
  • gtid_mode

    如果 mysql server 啟用了GTID,maxwell也可以基于gtid取event。如果mysql server發生failover,maxwell不需要手動指定newfile:postion

正常情況下,replication_host 和 schema_host都不需要指定,隻有一個 

--host

  • schema_database

    使用這個db來存放 maxwell 需要的表,比如要複制的databases, tables, columns, postions, heartbeats.

filtering

  • include_dbs

    隻發送binlog裡面這些databases的變更,以

    ,

    号分隔,中間不要包含空格。

    也支援java風格的正則,如 

    include_tables=db1,/db\\d+/

    ,表示 db1, db2, db3…這樣的。(下面的filter都支援這種regex)

    提示:這裡的dbs指定的是真實db。比如binlog裡面可能 

    use db1

     但 

    update db2.ttt

    ,那麼maxwell生成的json 

    database

     内容是db2。
  • exclude_dbs

    排除指定的這些 databbases

  • include_tables

    隻發送這些表的資料變更。不隻需要指定 database.

  • exclude_tables

    排除指定的這些表

  • exclude_columns

    不輸出這些字段。如果字段名在row中不存在,則忽略這個filter。

  • include_column_values

    1.12.0新引入的過濾項。隻輸出滿足 column=values 的行,比如 

    include_column_values=bar=x,foo=y

    ,如果有

    bar

    字段,那麼隻輸出值為

    x

    的行,如果有

    foo

    字段,那麼隻輸出值為

    y

    的行。

    如果沒有對應字段,如隻有

    bar=x

    沒有

    foo

    字段,那麼也成立。(即不是 或,也不是 與)
  • blacklist_dbs

    一般不用。

    blacklist_dbs

    字面上難以與

    exclude_dbs

     分開,官網的說明也是模棱兩可。

    從代碼裡面看出的意思是,屏蔽指定的這些dbs,tables的結構變更,與行變更過濾,沒有關系。它應對的場景是,某個表上頻繁的有ddl,比如truncate。

因為往往我們隻需要觀察部分表的變更,是以要注意這些 include 與 exclude 的關系,記住三點:

  1. 隻要 include 有值,那麼不在include裡面的都排除
  2. 隻要在 exclude 裡面的,都排除
  3. 其它都正常輸出
  4. exclude的過濾級别優先于include

舉個比較極端的例子:

# database: db1,db2,db3,mydb
① include_dbs=db1,/db\\d+/
② exclude_dbs=db2
③ inlcude_tables=t1,t2,t3
④ exclude_tables=t3
           

配置了 include_dbs,那麼mydb不在裡面,是以排除;

配置了 exclude_dbs,那麼db2排除。剩下db1,db3

同樣對 tables,剩下t1,t2

是以db1.t1, db1.t2, db3.t1, db3.t2是篩選後剩下可輸出的。如果沒有指定include_dbs,那麼mydb.t1也可以輸出。

formatting

  • output_ddl

    是否在輸出的json流中,包含ddl語句。預設 false

  • output_binlog_position

    是否在輸出的json流中,包含binlog filename:postion。預設 false

  • output_commit_info

    是否在輸出的json流裡面,包含 commit 和 xid 資訊。預設 true

    比如一個事物裡,包含多個表的變更,或一個表上多條資料的變更,那麼他們都具有相同的 xid,最後一個row event輸出 commit:true 字段。這有利于消費者實作 事務回放,而不僅僅是行級别的回放。

  • output_thread_id

    同樣,binlog裡面也包含了 thread_id ,可以包含在輸出中。預設 false

    消費者可以用它來實作更粗粒度的事務回放。還有一個場景是使用者審計,使用者每次登陸之後将登陸ip、登陸時間、使用者名、thread_id記錄到一個表中,可輕松根據thread_id關聯到binlog裡面這條記錄是哪個使用者修改的。

monitoring

如果是長時間運作的maxwell,添加monitor配置,maxwell提供了http api傳回監控資料。

其它

  • init_position

    手動指定maxwell要從哪個binlog,哪個位置開始。指定的格式

    FILE:POSITION:HEARTBEAT

    。隻支援在啟動maxwell的指令指定,比如 

    --init_postion=mysql-bin.0000456:4:0

    maxwell 預設從連接配接上mysql server的目前位置開始解析,如果指定 init_postion,要確定檔案确實存在,如果binlog已經被purge掉了,可能需要想其它辦法。見 Binlog可視化搜尋:實作類似阿裡RDS資料追蹤功能

2. 選擇合适的生産者

Maxwell是将binlog解析成json這種比較通用的格式,那麼要去用它可以選擇輸出到哪裡,比如Kafka, rabbitmq, file等,總之送到消息隊列裡去。每種 Producer 有自己對應的選項。

2.1 file

producer=file
output_file=/tmp/mysql_binlog_data.log
           

比較簡單,直接指定輸出到哪個檔案

output_file

。有什麼日志收集系統,可以直接從這裡拿。

2.2 rabbitmq

rabbitmq 是非常流行的一個AMQP協定的消息隊列服務,相關介紹請參考 rabbitmq入門

producer=rabbitmq

rabbitmq_host=10.81.xx.xxx
rabbitmq_user=admin
rabbitmq_pass=admin
rabbitmq_virtual_host=/some0
rabbitmq_exchange=maxwell.some
rabbitmq_exchange_type=topic
rabbitmq_exchange_durable=true
rabbitmq_exchange_autodelete=false
rabbitmq_routing_key_template=%db%.%table%
           

上面的參數都很容易了解,1.12.0版本新加入

rabbitmq_message_persistent

控制釋出消息持久化的參數。

rabbitmq_routing_key_template

是按照 db.tbl 的格式指定 routing_key,在建立隊列時,可以根據不同的表進入不同的隊列,提高并行消費而不亂序的能力。

因為rabbitmq搭建起來非常簡單,是以我習慣用這個。

2.3 kafka

kafka是maxwell支援最完善的一個producer,并且内置了 多個版本的 kafka client(0.8.2.2, 0.9.0.1, 0.10.0.1, 0.10.2.1 or 0.11.0.1),預設 

kafka_version=0.11.0.1

producer=kafka

# 指定kafka brokers 位址
kafka.bootstrap.servers=hosta:9092,hostb:9092

# kafka主題可以是固定的,可以是 `maxwell_%{database}_%{table}` 這種按表去自動建立的動态topic
kafka_topic=maxwell

# ddl單獨使用的topic
ddl_kafka_topic=maxwell_ddl

# kafka和kenesis都支援分區,可以選擇根據 database, table, primary_key, 或者column的值去做partition
# maxwell預設使用database,在啟動的時候會去檢查是否topic是否有足夠多數量的partitions,是以要提前建立好
#  bin/kafka-topics.sh --zookeeper ZK_HOST:2181 --create \
#                      --topic maxwell --partitions 20 --replication-factor 2
producer_partition_by=database

# 如果指定了 producer_partition_by=column, 就需要指定下面兩個參數
# 根據user_id,create_date兩列的值去分區,partition_key形如 1178532016-10-10 18:29:04
producer_partition_columns=user_id,create_date
# 如果不存在user_id或create_date,則按照database分區:
producer_partition_by_fallback=database
           

maxwell會讀取

kafka.

開頭的參數,設定到連接配接參數裡,比如

kafka.acks=1

,

kafka.retries=3

2.4 redis

redis也有簡單的釋出訂閱(

pub/sub

)功能

producer=redis

redis_host=10.47.xx.xxx
redis_port=6379
# redis_auth=redis_auth
redis_database=0
redis_pub_channel=maxwell
           

但是試用一番之後,發現如果訂閱沒有連上去的話,所有pub的消息是會丢失的。是以最好使用

push/pop

去實作。

3. 注意事項

下面的是在使用過程中遇到的一些小問題,做下總結。

timestamp column

maxwell對時間類型(datetime, timestamp, date)都是當做字元串處理的,這也是為了保證資料一緻(比如

0000-00-00 00:00:00

這樣的時間在timestamp裡是非法的,但mysql卻認,解析成java或者python類型就是null/None)。

如果MySQL表上的字段是 timestamp 類型,是有時區的概念,binlog解析出來的是标準UTC時間,但使用者看到的是本地時間。比如 

f_create_time timestamp

 建立時間是中原標準時間

2018-01-05 21:01:01

,那麼mysql實際存儲的是

2018-01-05 13:01:01

,binlog裡面也是這個時間字元串。如果不做消費者不做時區轉換,會少8個小時。被這個狠狠坑了一把。

與其每個用戶端都要考慮這個問題,我覺得更合理的做法是提供時區參數,然後maxwell自動處理時區問題,否則要麼用戶端先需要知道哪些列是timestamp類型,或者連接配接上原庫緩存上這些類型。

binary column

maxwell可以處理binary類型的列,如

blob

varbinary

,它的做法就是對二進制列使用 base64_encode,當做字元串輸出到json。消費者拿到這個列資料後,不能直接拼裝,需要 base64_decode。

表結構不同步

如果是拿比較老的binlog,放到新的mysql server上去用maxwell拉去,有可能表結構已經發生了變化,比如binlog裡面字段比 schema_host 裡面的字段多一個。目前這種情況沒有發現異常,比如阿裡RDS預設會為 無主鍵無唯一索引的表,增加一個

__##alibaba_rds_rowid##__

,在 show create table 和 schema裡面都看不到這個隐藏主鍵,但binlog裡面會有,同步到從庫。

另外我們有通過git去管理結構版本,如果真有這種場景,也可以應對。

大事務binlog

當一個事物産生的binlog量非常大的時候,比如遷移日表資料,maxwell為了控制記憶體使用,會自動将處理不過來的binlog放到檔案系統

Using kafka version: 0.11.0.1
21:16:07,109 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
21:16:07,380 INFO  SchemaStoreSchema - Creating maxwell database
21:16:07,540 INFO  Maxwell - Maxwell v?? is booting (RabbitmqProducer), starting at Position[BinlogPosition[mysql-bin.006235:24980714],
lastHeartbeat=0]
21:16:07,649 INFO  AbstractSchemaStore - Maxwell is capturing initial schema
21:16:08,267 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.006235:24980714
21:16:08,324 INFO  BinaryLogClient - Connected to rm-xxxxxxxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006235/24980714 (sid:637
9, cid:9182598)
21:16:08,325 INFO  BinlogConnectorLifecycleListener - Binlog connected.
03:15:36,104 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell7935334910787514257events
03:17:14,880 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell3143086481692829045events
           

但是遇到另外一個問題,overflow随後就出現異常

EventDataDeserializationException: Failed to deserialize data of EventHeaderV4

,當我另起一個maxwell指點之前的binlog postion開始解析,卻有沒有抛異常。事後的資料也表明并沒有資料丢失。

問題産生的原因還不明,Caused by: java.net.SocketException: Connection reset,感覺像讀取 binlog 流的時候還沒讀取到完整的event,異常關閉了連接配接。這個問題比較頑固,github上面類似問題都沒有達到明确的解決。(這也從側面告訴我們,大表資料遷移,也要批量進行,不要一個insert into .. select 搞定)

03:18:20,586 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell5229190074667071141events
03:19:31,289 WARN  BinlogConnectorLifecycleListener - Communication failure.
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{time
stamp=1514920657000, eventType=WRITE_ROWS, serverId=2115082720, headerLength=19, dataLength=8155, nextPosition=520539918, flags=0}
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:216) ~[mys
ql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:184) ~[mysql-binlog-c
onnector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:890) [mysql-binlog-connector-java-0
.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559) [mysql-binlog-connector-java-0.13.0.jar:0.13
.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793) [mysql-binlog-connector-java-0.13.0.jar:0.13.0
]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[?:1.8.0_121]
        at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_121]
        at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:51) ~[mysql-binlog-connector-
java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:202) ~[mysql-binlo
g-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:184) ~[mysql-binlog-connector-java-0.13
.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46) ~[mysql-binlog-connector-jav
a-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeLong(AbstractRowsEventDataD
eserializer.java:212) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataD
eserializer.java:150) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:132) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserializeRows(WriteRowsEventDataDeserializer.java:64) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:56) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:32) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:210) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        ... 5 more
03:19:31,514 INFO  BinlogConnectorLifecycleListener - Binlog disconnected.
03:19:31,590 WARN  BinlogConnectorReplicator - replicator stopped at position: mysql-bin.006236:520531744 -- restarting
03:19:31,595 INFO  BinaryLogClient - Connected to rm-xxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006236/520531744 (sid:6379, cid:9220521)
           

tableMapCache

前面講過,如果我隻想擷取某幾個表的binlog變更,需要用 

include_tables

 來過濾,但如果mysql server上現在删了一個表t1,但我的binlog是從昨天開始讀取,被删的那個表t1在maxwell啟動的時候是拉取不到表結構的。然後昨天的binlog裡面有 t1 的變更,因為找不到表結構給來組裝成json,會抛異常。

手動在 maxwell.tables/columns 裡面插入記錄是可行的。但這個問題的根本是,maxwell在binlog過濾的時候,隻在處理row_event的時候,而對 tableMapCache 要求binlog裡面的所有表都要有。

自己送出了一個commit,可以在做 tableMapCache 的時候也僅要求緩存 include_dbs/tables 這些表: https://github.com/seanlook/maxwell/commit/2618b70303078bf910a1981b69943cca75ee04fb

提高消費性能

再用rabbitmq時,routing_key 是 

%db%.%table%

,但某些表産生的binlog增量非常大,就會導緻各隊列消息量很不平均,目前因為還沒做到事務xid或者thread_id級别的并發回放,是以最小隊列粒度也是表,盡量單獨放一個隊列,其它資料量小的合在一起。

參考

  • http://maxwells-daemon.io/config/
  • 實時抓取MySQL的更新資料到Hadoop
  • MySQL CDC, Streaming Binary Logs and Asynchronous Triggers

繼續閱讀