天天看點

使用DTS同步MySQL增量資料到Tablestore

摘要

從MySQL到Tablestore的全量資料導出可以參考同系列文章《

資料同步-從MySQL到Tablestore

》,本文主要介紹将MySQL的增量資料同步到Tablestore的一種方式——使用阿裡集團的資料傳輸服務DTS的資料訂閱功能做增量資料的讀取以及改寫。

注意:DTS資料訂閱服務支援多種

資料庫環境

,老版現不支援MySQL8.0,使用sdk進行消費;新版新增了分組消費概念,需要使用Kafka用戶端消費訂閱資料。本文以RDS(MySQL 5.7)訂閱為例,使用sdk完成增量資料訂閱與改寫。

原理介紹

使用DTS同步MySQL增量資料到Tablestore

導出步驟

1.源、目的資料庫資源

源資料庫:

RDS(

建立執行個體

)/執行個體[pingsheng]/資料庫[pingstest]/表[to_tablestore]

資料表結構如圖

使用DTS同步MySQL增量資料到Tablestore

目的資料庫:

Tablestore(

)/執行個體[pingsheng]/表[from_rds]

使用DTS同步MySQL增量資料到Tablestore

2.雲賬号資源

準備具有源、目的資料庫讀寫權限的一組雲賬号AK

3.DTS資料訂閱

建立訂閱通道

參考

,選擇上述源資料庫執行個體為資料源配置訂閱資訊

使用DTS同步MySQL增量資料到Tablestore

選擇需要訂閱的資料表

使用DTS同步MySQL增量資料到Tablestore

通過資料源預檢查後,資料訂閱配置完成,進入初始化階段大約需要等待十分鐘。初始化完成後,資料訂閱狀态變為“正常”即可以開始消費增量資料。增量資料的消費點從界面可以看到,支援動态調整

參考文檔
使用DTS同步MySQL增量資料到Tablestore

從控制台的“訂閱資料”可以看到已經拉取到的部分展示資料

使用DTS同步MySQL增量資料到Tablestore

從DTS拉取到的增量資料是經過解析和再封裝的,增添了一些解釋參數,訂閱資料的各字段含義

4.訂閱資料的解析與改寫

從DTS讀取MySQL增量資料

下載下傳DTS的SDK,在本地(ECS)進行編譯,

使用DTS同步MySQL增量資料到Tablestore

在資料訂閱“更多”中下載下傳示例代碼,替換掉AK資訊、訂閱ID,編譯啟動程式嘗試擷取增量資料,測試rds資料表中若無增量,會每隔1s收到一條“heartbeat”心跳記錄

使用DTS同步MySQL增量資料到Tablestore

嘗試在源資料表insert、update資料,會列印出以Opt:begin開頭,包含Opt:insert、update,以Opt:commit結尾的多行資料。修改代碼僅保留改寫資料需要的操作類型“Opt”和行資訊的前後鏡像“FieldList”

public void notify(List<ClusterMessage> messages) throws Exception {
  for (ClusterMessage message : messages) {
    // debug
    System.out.println(message.getRecord().getOpt());
    System.out.println(message.getRecord().getFieldList());
    //you must call ackAsConsumed when you consume the data
    message.ackAsConsumed();
  }
}

//BEGIN
//[]
//UPDATE
//[Field name: pk1  //依次輸出各列的前、後鏡像
//Field type: 3
//Field length: 2
//Field value: 83
//,Field name: pk1
//Field type: 3
//Field length: 2
//Field value: 80
//, Field name: pk2
//Field type: 3
//Field length: 1
//Field value: 3
//, Field name: pk2
//Field type: 3
//Field length: 1
//Field value: 3
//, Field name: v1
//Field type: 3
//Field length: 2
//Field value: 47
//, Field name: v1
//Field type: 3
//Field length: 2
//Field value: 50
//]
//COMMIT
//[]           

将增量資料寫入Tablestore

下載下傳Tablestore的SDK ,本地(ECS)進行編譯

調用單行資料操作,将增、删、改的行寫入Tablestore

//PutRow
private static void putRow(SyncClient client, String pkValue, MyColumnValue columnvalue) {
    // 構造主鍵
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
    PrimaryKey primaryKey = primaryKeyBuilder.build();
    RowPutChange rowPutChange = new RowPutChange(TABLE_NAME, primaryKey);
    //加入屬性列
    rowPutChange.addColumn(new Column("v1", columnvalue.getv1()));
    rowPutChange.addColumn(new Column("v2", columnvalue.getv2()));
    client.putRow(new PutRowRequest(rowPutChange));
}
//DeleteRow
private static void deleteRow(SyncClient client, String pkValue) {
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
    PrimaryKey primaryKey = primaryKeyBuilder.build();
    RowDeleteChange rowDeleteChange = new RowDeleteChange(TABLE_NAME, primaryKey);
    client.deleteRow(new DeleteRowRequest(rowDeleteChange));
}           

注意:涉及主鍵的Update,需要查分成Delete+Put兩步操作

使用DTS同步MySQL增量資料到Tablestore

->

使用DTS同步MySQL增量資料到Tablestore

源碼參考

下載下傳

繼續閱讀