天天看點

Spark解析binlog日志,寫入MySQL

1. 背景

  由于公司業務線的不斷拓展,建立了很多MySQL執行個體,為了安全起見每個執行個體之間不能直接互相通路,但是業務部門又需要整合各個業務線的資料進行分析、制定風控政策等。是以需要将不同業務線資料進行歸集。

  當然一下方案不是最優的,MySQL執行個體之間資料互通,有很多成熟且穩定的方式,是以我覺得我們選擇了一種不是非常理想的方式!

2.  處理流程

    MySQL ——> 産生binlog日志 ——> Maxwell解析成json格式 ——> 發送到Kafka ——> 通過Spark消費 ——> 寫入MySQL

3.  處理思路

  由于需要同步的表非常多,是以通過JavaBean反射的方式去解析Maxwell發送到kafka的資料工作量較繁重。并且對于不斷變更表結構的業務來講,也不是一個非常合理的方式。

  Maxwell針對不同的MySQL操作方式會産生不同結構的Json資料:

mysql> insert into `test`.`maxwell` set id = 1, daemon = 'Stanislaw Lem';
  maxwell: {
    "database": "test",
    "table": "maxwell",
    "type": "insert",
    "ts": 1449786310,
    "xid": 940752,
    "commit": true,
    "data": { "id":1, "daemon": "Stanislaw Lem" }
  }
           
mysql> update test.maxwell set daemon = 'firebus!  firebus!' where id = 1;
  maxwell: {
    "database": "test",
    "table": "maxwell",
    "type": "update",
    "ts": 1449786341,
    "xid": 940786,
    "commit": true,
    "data": {"id":1, "daemon": "Firebus!  Firebus!"},
    "old":  {"daemon": "Stanislaw Lem"}
  }
           

  經過分析我們可以解析出相應的SQL語句,然後通過JDBC的方式操作MySQL,實作跨庫的資料同步需求。

4、遇到問題

  如果一條資料中存在為NULL值的字段,Maxwell不會将該字段通過Json的方式發送過來,是以Json中不完全包含一條完整的資料,該字段在操作MySQL時隻能将其插入對應字段的預設值,但不影響資料的準确性。

  當一條資料中某個字段由NULL變更為非NULL值時,我們試圖通過解析old串中被更新的字段時是徒勞的,因為此時old串為空,此時會報:java.lang.ArrayIndexOutOfBoundsException: -1(數組越界異常)。

  當一條資料中某個字段由非NULL變更為NULL值時,在data串中你也是無法擷取到該字段的,試圖解析會出現:java.lang.NullPointerException(空指針異常)。

  解決上述兩種問題的方式是對update語句分情況解析。如果被變更狀态的字段都出現在了data串中,那麼可以将data串作為最新的資料更新到MySQL中。否則通過比較data和old串的不同,還原相應字段的最新值,進而更新MySQL,最終問題得到解決。

繼續閱讀