1. 背景
由于公司業務線的不斷拓展,建立了很多MySQL執行個體,為了安全起見每個執行個體之間不能直接互相通路,但是業務部門又需要整合各個業務線的資料進行分析、制定風控政策等。是以需要将不同業務線資料進行歸集。
當然一下方案不是最優的,MySQL執行個體之間資料互通,有很多成熟且穩定的方式,是以我覺得我們選擇了一種不是非常理想的方式!
2. 處理流程
MySQL ——> 産生binlog日志 ——> Maxwell解析成json格式 ——> 發送到Kafka ——> 通過Spark消費 ——> 寫入MySQL
3. 處理思路
由于需要同步的表非常多,是以通過JavaBean反射的方式去解析Maxwell發送到kafka的資料工作量較繁重。并且對于不斷變更表結構的業務來講,也不是一個非常合理的方式。
Maxwell針對不同的MySQL操作方式會産生不同結構的Json資料:
mysql> insert into `test`.`maxwell` set id = 1, daemon = 'Stanislaw Lem';
maxwell: {
"database": "test",
"table": "maxwell",
"type": "insert",
"ts": 1449786310,
"xid": 940752,
"commit": true,
"data": { "id":1, "daemon": "Stanislaw Lem" }
}
mysql> update test.maxwell set daemon = 'firebus! firebus!' where id = 1;
maxwell: {
"database": "test",
"table": "maxwell",
"type": "update",
"ts": 1449786341,
"xid": 940786,
"commit": true,
"data": {"id":1, "daemon": "Firebus! Firebus!"},
"old": {"daemon": "Stanislaw Lem"}
}
經過分析我們可以解析出相應的SQL語句,然後通過JDBC的方式操作MySQL,實作跨庫的資料同步需求。
4、遇到問題
如果一條資料中存在為NULL值的字段,Maxwell不會将該字段通過Json的方式發送過來,是以Json中不完全包含一條完整的資料,該字段在操作MySQL時隻能将其插入對應字段的預設值,但不影響資料的準确性。
當一條資料中某個字段由NULL變更為非NULL值時,我們試圖通過解析old串中被更新的字段時是徒勞的,因為此時old串為空,此時會報:java.lang.ArrayIndexOutOfBoundsException: -1(數組越界異常)。
當一條資料中某個字段由非NULL變更為NULL值時,在data串中你也是無法擷取到該字段的,試圖解析會出現:java.lang.NullPointerException(空指針異常)。
解決上述兩種問題的方式是對update語句分情況解析。如果被變更狀态的字段都出現在了data串中,那麼可以将data串作為最新的資料更新到MySQL中。否則通過比較data和old串的不同,還原相應字段的最新值,進而更新MySQL,最終問題得到解決。