摘要
從MySQL到Tablestore的全量資料導出可以參考同系列文章《
資料同步-從MySQL到Tablestore》,本文主要介紹将MySQL的增量資料同步到Tablestore的一種方式——使用阿裡集團的資料傳輸服務DTS的資料訂閱功能做增量資料的讀取以及改寫。
注意:DTS資料訂閱服務支援多種
資料庫環境,老版現不支援MySQL8.0,使用sdk進行消費;新版新增了分組消費概念,需要使用Kafka用戶端消費訂閱資料。本文以RDS(MySQL 5.7)訂閱為例,使用sdk完成增量資料訂閱與改寫。
原理介紹

導出步驟
1.源、目的資料庫資源
源資料庫:
RDS(
建立執行個體)/執行個體[pingsheng]/資料庫[pingstest]/表[to_tablestore]
資料表結構如圖
目的資料庫:
Tablestore(
)/執行個體[pingsheng]/表[from_rds]
2.雲賬号資源
準備具有源、目的資料庫讀寫權限的一組雲賬号AK
3.DTS資料訂閱
建立訂閱通道
參考,選擇上述源資料庫執行個體為資料源配置訂閱資訊
選擇需要訂閱的資料表
通過資料源預檢查後,資料訂閱配置完成,進入初始化階段大約需要等待十分鐘。初始化完成後,資料訂閱狀态變為“正常”即可以開始消費增量資料。增量資料的消費點從界面可以看到,支援動态調整
參考文檔從控制台的“訂閱資料”可以看到已經拉取到的部分展示資料
從DTS拉取到的增量資料是經過解析和再封裝的,增添了一些解釋參數,訂閱資料的各字段含義
4.訂閱資料的解析與改寫
從DTS讀取MySQL增量資料
下載下傳DTS的SDK,在本地(ECS)進行編譯,
在資料訂閱“更多”中下載下傳示例代碼,替換掉AK資訊、訂閱ID,編譯啟動程式嘗試擷取增量資料,測試rds資料表中若無增量,會每隔1s收到一條“heartbeat”心跳記錄
嘗試在源資料表insert、update資料,會列印出以Opt:begin開頭,包含Opt:insert、update,以Opt:commit結尾的多行資料。修改代碼僅保留改寫資料需要的操作類型“Opt”和行資訊的前後鏡像“FieldList”
public void notify(List<ClusterMessage> messages) throws Exception {
for (ClusterMessage message : messages) {
// debug
System.out.println(message.getRecord().getOpt());
System.out.println(message.getRecord().getFieldList());
//you must call ackAsConsumed when you consume the data
message.ackAsConsumed();
}
}
//BEGIN
//[]
//UPDATE
//[Field name: pk1 //依次輸出各列的前、後鏡像
//Field type: 3
//Field length: 2
//Field value: 83
//,Field name: pk1
//Field type: 3
//Field length: 2
//Field value: 80
//, Field name: pk2
//Field type: 3
//Field length: 1
//Field value: 3
//, Field name: pk2
//Field type: 3
//Field length: 1
//Field value: 3
//, Field name: v1
//Field type: 3
//Field length: 2
//Field value: 47
//, Field name: v1
//Field type: 3
//Field length: 2
//Field value: 50
//]
//COMMIT
//[]
将增量資料寫入Tablestore
下載下傳Tablestore的SDK ,本地(ECS)進行編譯
調用單行資料操作,将增、删、改的行寫入Tablestore
//PutRow
private static void putRow(SyncClient client, String pkValue, MyColumnValue columnvalue) {
// 構造主鍵
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
RowPutChange rowPutChange = new RowPutChange(TABLE_NAME, primaryKey);
//加入屬性列
rowPutChange.addColumn(new Column("v1", columnvalue.getv1()));
rowPutChange.addColumn(new Column("v2", columnvalue.getv2()));
client.putRow(new PutRowRequest(rowPutChange));
}
//DeleteRow
private static void deleteRow(SyncClient client, String pkValue) {
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
RowDeleteChange rowDeleteChange = new RowDeleteChange(TABLE_NAME, primaryKey);
client.deleteRow(new DeleteRowRequest(rowDeleteChange));
}
注意:涉及主鍵的Update,需要查分成Delete+Put兩步操作
->