1. 介紹
Maxwell 是java語言編寫的能夠讀取、解析MySQL binlog,将行更新以json格式發送到 Kafka、RabbitMQ、AWS Kinesis、Google Cloud Pub/Sub、檔案,有了增量的資料流,可以想象的應用場景實在太多了,如ETL、維護緩存、收集表級别的dml名額、增量到搜尋引擎、資料分區遷移、切庫binlog復原方案,等等。
它還提供其它功能:
- 支援
的方式做全量資料初始化SELECT * FROM table
- 支援主庫發生failover後,自動恢複binlog位置(GTID)
- 靈活的對資料進行分區,解決資料傾斜的問題。kafka支援 database, table, column等級别的資料分區
- 它的實作方式是僞裝成MySQL Server的從庫,接收binlog events,然後根據schemas資訊拼裝,支援ddl,xid,rows等各種event.
maxwell由 zendesk 開源:https://github.com/zendesk/maxwell ,而且維護者相當活躍。
網上已有人對 Alibaba Cannal, Zendesk Maxwell, Yelp mysql_streamer進行對比,見文後參考 實時抓取MySQL的更新資料到Hadoop。
類似功能的還有:http://debezium.io/docs/connectors/mysql/
安裝
使用 maxwell 非常簡單,隻需要jdk環境
|
要求 mysql server binlog格式是
ROW
, row_image 是
FULL
。感受一下輸出結果
|
data
是 After image,
old
是 Before image。 insert 隻有後鏡像,delete隻有前鏡像(
data
)
type
是語句類型:
insert
,
update
,
delete
,
database-create
,
database-alter
,
database-drop
,
table-create
,
table-alter
,
table-drop
。
基本配置
config.properties 配置檔案裡面的所有選項,都可以在啟動 maxweill
./bin/maxwell
是指定,覆寫配置檔案的内容。這裡隻講一些常用的。
mysql options
-
host
指定從哪個位址的mysql擷取binlog
-
replication_host
如果指定了
,那麼它是真正的binlog來源的mysql server位址,而那麼上面的replication_host
host
用于存放maxwell表結構和binlog位置的位址。
将兩者分開,可以避免 replication_user 往生産庫裡寫資料。
-
schema_host
從哪個host擷取表結構。binlog裡面沒有字段資訊,是以maxwell需要從資料庫查出schema,存起來。
schema_host一般用不到,但在binlog-proxy場景下就很實用。比如要将已經離線的binlog通過maxwell生成json流,于是自建一個mysql server裡面沒有結構,隻用于發送binlog,此時表機構就可以制動從
擷取。schema_host
-
gtid_mode
如果 mysql server 啟用了GTID,maxwell也可以基于gtid取event。如果mysql server發生failover,maxwell不需要手動指定newfile:postion
正常情況下,replication_host 和 schema_host都不需要指定,隻有一個
--host
。
-
schema_database
使用這個db來存放 maxwell 需要的表,比如要複制的databases, tables, columns, postions, heartbeats.
filtering
-
include_dbs
隻發送binlog裡面這些databases的變更,以
,
号分隔,中間不要包含空格。
也支援java風格的正則,如
include_tables=db1,/db\\d+/
,表示 db1, db2, db3…這樣的。(下面的filter都支援這種regex)
提示:這裡的dbs指定的是真實db。比如binlog裡面可能
但use db1
,那麼maxwell生成的jsonupdate db2.ttt
内容是db2。database
-
exclude_dbs
排除指定的這些 databbases
-
include_tables
隻發送這些表的資料變更。不隻需要指定 database.
-
exclude_tables
排除指定的這些表
-
exclude_columns
不輸出這些字段。如果字段名在row中不存在,則忽略這個filter。
-
include_column_values
1.12.0新引入的過濾項。隻輸出滿足 column=values 的行,比如
,如果有include_column_values=bar=x,foo=y
字段,那麼隻輸出值為bar
的行,如果有x
字段,那麼隻輸出值為foo
y
的行。
如果沒有對應字段,如隻有
沒有bar=x
字段,那麼也成立。(即不是 或,也不是 與)foo
-
blacklist_dbs
一般不用。
字面上難以與blacklist_dbs
exclude_dbs
分開,官網的說明也是模棱兩可。
從代碼裡面看出的意思是,屏蔽指定的這些dbs,tables的結構變更,與行變更過濾,沒有關系。它應對的場景是,某個表上頻繁的有ddl,比如truncate。
因為往往我們隻需要觀察部分表的變更,是以要注意這些 include 與 exclude 的關系,記住三點:
- 隻要 include 有值,那麼不在include裡面的都排除
- 隻要在 exclude 裡面的,都排除
- 其它都正常輸出
- exclude的過濾級别優先于include
舉個比較極端的例子:
|
配置了 include_dbs,那麼mydb不在裡面,是以排除;
配置了 exclude_dbs,那麼db2排除。剩下db1,db3
同樣對 tables,剩下t1,t2
是以db1.t1, db1.t2, db3.t1, db3.t2是篩選後剩下可輸出的。如果沒有指定include_dbs,那麼mydb.t1也可以輸出。
formatting
-
output_ddl
是否在輸出的json流中,包含ddl語句。預設 false
-
output_binlog_position
是否在輸出的json流中,包含binlog filename:postion。預設 false
-
output_commit_info
是否在輸出的json流裡面,包含 commit 和 xid 資訊。預設 true
比如一個事物裡,包含多個表的變更,或一個表上多條資料的變更,那麼他們都具有相同的 xid,最後一個row event輸出 commit:true 字段。這有利于消費者實作 事務回放,而不僅僅是行級别的回放。
-
output_thread_id
同樣,binlog裡面也包含了 thread_id ,可以包含在輸出中。預設 false
消費者可以用它來實作更粗粒度的事務回放。還有一個場景是使用者審計,使用者每次登陸之後将登陸ip、登陸時間、使用者名、thread_id記錄到一個表中,可輕松根據thread_id關聯到binlog裡面這條記錄是哪個使用者修改的。
monitoring
如果是長時間運作的maxwell,添加monitor配置,maxwell提供了http api傳回監控資料。
其它
-
init_position
手動指定maxwell要從哪個binlog,哪個位置開始。指定的格式
。隻支援在啟動maxwell的指令指定,比如FILE:POSITION:HEARTBEAT
--init_postion=mysql-bin.0000456:4:0
。
maxwell 預設從連接配接上mysql server的目前位置開始解析,如果指定 init_postion,要確定檔案确實存在,如果binlog已經被purge掉了,可能需要想其它辦法。見 Binlog可視化搜尋:實作類似阿裡RDS資料追蹤功能
2. 選擇合适的生産者
Maxwell是将binlog解析成json這種比較通用的格式,那麼要去用它可以選擇輸出到哪裡,比如Kafka, rabbitmq, file等,總之送到消息隊列裡去。每種 Producer 有自己對應的選項。
2.1 file
|
比較簡單,直接指定輸出到哪個檔案
output_file
。有什麼日志收集系統,可以直接從這裡拿。
2.2 rabbitmq
rabbitmq 是非常流行的一個AMQP協定的消息隊列服務,相關介紹請參考 rabbitmq入門
|
上面的參數都很容易了解,1.12.0版本新加入
rabbitmq_message_persistent
控制釋出消息持久化的參數。
rabbitmq_routing_key_template
是按照 db.tbl 的格式指定 routing_key,在建立隊列時,可以根據不同的表進入不同的隊列,提高并行消費而不亂序的能力。
因為rabbitmq搭建起來非常簡單,是以我習慣用這個。
2.3 kafka
kafka是maxwell支援最完善的一個producer,并且内置了 多個版本的 kafka client(0.8.2.2, 0.9.0.1, 0.10.0.1, 0.10.2.1 or 0.11.0.1),預設
kafka_version=0.11.0.1
|
maxwell會讀取
kafka.
開頭的參數,設定到連接配接參數裡,比如
kafka.acks=1
,
kafka.retries=3
等
2.4 redis
redis也有簡單的釋出訂閱(
pub/sub
)功能
|
但是試用一番之後,發現如果訂閱沒有連上去的話,所有pub的消息是會丢失的。是以最好使用
push/pop
去實作。
3. 注意事項
下面的是在使用過程中遇到的一些小問題,做下總結。
timestamp column
maxwell對時間類型(datetime, timestamp, date)都是當做字元串處理的,這也是為了保證資料一緻(比如
0000-00-00 00:00:00
這樣的時間在timestamp裡是非法的,但mysql卻認,解析成java或者python類型就是null/None)。
如果MySQL表上的字段是 timestamp 類型,是有時區的概念,binlog解析出來的是标準UTC時間,但使用者看到的是本地時間。比如
f_create_time timestamp
建立時間是中原標準時間
2018-01-05 21:01:01
,那麼mysql實際存儲的是
2018-01-05 13:01:01
,binlog裡面也是這個時間字元串。如果不做消費者不做時區轉換,會少8個小時。被這個狠狠坑了一把。
與其每個用戶端都要考慮這個問題,我覺得更合理的做法是提供時區參數,然後maxwell自動處理時區問題,否則要麼用戶端先需要知道哪些列是timestamp類型,或者連接配接上原庫緩存上這些類型。
binary column
maxwell可以處理binary類型的列,如
blob
、
varbinary
,它的做法就是對二進制列使用 base64_encode,當做字元串輸出到json。消費者拿到這個列資料後,不能直接拼裝,需要 base64_decode。
表結構不同步
如果是拿比較老的binlog,放到新的mysql server上去用maxwell拉去,有可能表結構已經發生了變化,比如binlog裡面字段比 schema_host 裡面的字段多一個。目前這種情況沒有發現異常,比如阿裡RDS預設會為 無主鍵無唯一索引的表,增加一個
__##alibaba_rds_rowid##__
,在 show create table 和 schema裡面都看不到這個隐藏主鍵,但binlog裡面會有,同步到從庫。
另外我們有通過git去管理結構版本,如果真有這種場景,也可以應對。
大事務binlog
當一個事物産生的binlog量非常大的時候,比如遷移日表資料,maxwell為了控制記憶體使用,會自動将處理不過來的binlog放到檔案系統
|
但是遇到另外一個問題,overflow随後就出現異常
EventDataDeserializationException: Failed to deserialize data of EventHeaderV4
,當我另起一個maxwell指點之前的binlog postion開始解析,卻有沒有抛異常。事後的資料也表明并沒有資料丢失。
問題産生的原因還不明,Caused by: java.net.SocketException: Connection reset,感覺像讀取 binlog 流的時候還沒讀取到完整的event,異常關閉了連接配接。這個問題比較頑固,github上面類似問題都沒有達到明确的解決。(這也從側面告訴我們,大表資料遷移,也要批量進行,不要一個insert into .. select 搞定)
|
tableMapCache
前面講過,如果我隻想擷取某幾個表的binlog變更,需要用
include_tables
來過濾,但如果mysql server上現在删了一個表t1,但我的binlog是從昨天開始讀取,被删的那個表t1在maxwell啟動的時候是拉取不到表結構的。然後昨天的binlog裡面有 t1 的變更,因為找不到表結構給來組裝成json,會抛異常。
手動在 maxwell.tables/columns 裡面插入記錄是可行的。但這個問題的根本是,maxwell在binlog過濾的時候,隻在處理row_event的時候,而對 tableMapCache 要求binlog裡面的所有表都要有。
自己送出了一個commit,可以在做 tableMapCache 的時候也僅要求緩存 include_dbs/tables 這些表: https://github.com/seanlook/maxwell/commit/2618b70303078bf910a1981b69943cca75ee04fb
提高消費性能
再用rabbitmq時,routing_key 是
%db%.%table%
,但某些表産生的binlog增量非常大,就會導緻各隊列消息量很不平均,目前因為還沒做到事務xid或者thread_id級别的并發回放,是以最小隊列粒度也是表,盡量單獨放一個隊列,其它資料量小的合在一起。
參考
- http://maxwells-daemon.io/config/
- 實時抓取MySQL的更新資料到Hadoop
- MySQL CDC, Streaming Binary Logs and Asynchronous Triggers