天天看點

實戰!Spring Boot 整合 阿裡開源中間件 Canal 實作資料增量同步

作者:财高八鬥Java

資料同步一直是一個令人頭疼的問題。在業務量小,場景不多,資料量不大的情況下我們可能會選擇在項目中直接寫一些定時任務手動處理資料,例如從多個表将資料查出來,再彙總處理,再插入到相應的地方。

但是随着業務量增大,資料量變多以及各種複雜場景下的分庫分表的實作,使資料同步變得越來越困難。

今天這篇文章使用阿裡開源的中間件Canal解決資料增量同步的痛點。

文章目錄如下:

實戰!Spring Boot 整合 阿裡開源中間件 Canal 實作資料增量同步

Canal是什麼?

canal譯意為水道/管道/溝渠,主要用途是基于 MySQL 資料庫增量日志解析,提供增量資料訂閱和消費。

從這句話了解到了什麼?

基于MySQL,并且通過MySQL日志進行的增量解析,這也就意味着對原有的業務代碼完全是無侵入性的。

工作原理:解析MySQL的binlog日志,提供增量資料。

基于日志增量訂閱和消費的業務包括

  • 資料庫鏡像
  • 資料庫實時備份
  • 索引建構和實時維護(拆分異構索引、反向索引等)
  • 業務 cache 重新整理
  • 帶業務邏輯的增量資料處理

目前的 canal 支援源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

官方文檔:github.com/alibaba/can…

Canal資料如何傳輸?

先來一張官方圖:

實戰!Spring Boot 整合 阿裡開源中間件 Canal 實作資料增量同步

Canal分為服務端和用戶端,這也是阿裡常用的套路,比如前面講到的注冊中心Nacos:

  • 服務端:負責解析MySQL的binlog日志,傳遞增量資料給用戶端或者消息中間件
  • 用戶端:負責解析服務端傳過來的資料,然後定制自己的業務處理。

目前為止支援的消息中間件很全面了,比如Kafka、RocketMQ,RabbitMQ。

資料同步還有其他中間件嗎?

有,當然有,還有一些開源的中間件也是相當不錯的,比如Bifrost。

常見的幾款中間件的差別如下:

實戰!Spring Boot 整合 阿裡開源中間件 Canal 實作資料增量同步

當然要我選擇的話,首選阿裡的中間件Canal。

Canal服務端安裝

服務端需要下載下傳壓縮包,下載下傳位址:github.com/alibaba/can…

目前最新的是v1.1.5,點選下載下傳:

實戰!Spring Boot 整合 阿裡開源中間件 Canal 實作資料增量同步

下載下傳完成解壓,目錄如下:

實戰!Spring Boot 整合 阿裡開源中間件 Canal 實作資料增量同步

本文使用Canal+RabbitMQ進行資料的同步,是以下面步驟完全按照這個base進行。

1、打開MySQL的binlog日志

修改MySQL的日志檔案,my.cnf 配置如下:

[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重複           

2、設定MySQL的配置

需要設定服務端配置檔案中的MySQL配置,這樣Canal才能知道需要監聽哪個庫、哪個表的日志檔案。

一個 Server 可以配置多個執行個體監聽 ,Canal 功能預設自帶的有個 example 執行個體,本篇就用 example 執行個體 。如果增加執行個體,複制 example 檔案夾内容到同級目錄下,然後在 canal.properties 指定添加執行個體的名稱。

修改canal.deployer-1.1.5\conf\example\instance.properties配置檔案

# url
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# 監聽的資料庫
canal.instance.defaultDatabaseName=test

# 監聽的表,可以指定,多個用逗号分割,這裡正則是監聽所有
canal.instance.filter.regex=.*\\..*           

3、設定RabbitMQ的配置

服務端預設的傳輸方式是tcp,需要在配置檔案中設定MQ的相關資訊。

這裡需要修改兩處配置檔案,如下;

1、canal.deployer-1.1.5\conf\canal.properties

這個配置檔案主要是設定MQ相關的配置,比如URL,使用者名、密碼...

# 傳輸方式:tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host =/
# exchange
rabbitmq.exchange =canal.exchange
# 使用者名、密碼
rabbitmq.username =guest
rabbitmq.password =guest
## 是否持久化
rabbitmq.deliveryMode = 2           

2、canal.deployer-1.1.5\conf\example\instance.properties

這個檔案設定MQ的路由KEY,這樣才能路由到指定的隊列中,如下:

canal.mq.topic=canal.routing.key           

4、RabbitMQ建立exchange和Queue

在RabbitMQ中需要建立一個canal.exchange(必須和配置中的相同)的exchange和一個名稱為 canal.queue(名稱随意)的隊列。

其中綁定的路由KEY為:canal.routing.key(必須和配置中的相同),如下圖:

實戰!Spring Boot 整合 阿裡開源中間件 Canal 實作資料增量同步

5、啟動服務端

點選bin目錄下的腳本,windows直接輕按兩下startup.bat,啟動成功如下:

實戰!Spring Boot 整合 阿裡開源中間件 Canal 實作資料增量同步

6、測試

在本地資料庫test中的oauth_client_details插入一條資料,如下:

INSERT INTO `oauth_client_details` VALUES ('myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false');           

此時檢視MQ中的canal.queue已經有了資料,如下:

實戰!Spring Boot 整合 阿裡開源中間件 Canal 實作資料增量同步

其實就是一串JSON資料,這個JSON如下:

{
	"data": [{
		"client_id": "myjszl",
		"resource_ids": "res1",
		"client_secret": "$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W",
		"scope": "all",
		"authorized_grant_types": "password,refresh_token,authorization_code,client_credentials,implicit",
		"web_server_redirect_uri": "http://www.baidu.com",
		"authorities": null,
		"access_token_validity": "1000",
		"refresh_token_validity": "1000",
		"additional_information": null,
		"autoapprove": "false"
	}],
	"database": "test",
	"es": 1640337532000,
	"id": 7,
	"isDdl": false,
	"mysqlType": {
		"client_id": "varchar(48)",
		"resource_ids": "varchar(256)",
		"client_secret": "varchar(256)",
		"scope": "varchar(256)",
		"authorized_grant_types": "varchar(256)",
		"web_server_redirect_uri": "varchar(256)",
		"authorities": "varchar(256)",
		"access_token_validity": "int(11)",
		"refresh_token_validity": "int(11)",
		"additional_information": "varchar(4096)",
		"autoapprove": "varchar(256)"
	},
	"old": null,
	"pkNames": ["client_id"],
	"sql": "",
	"sqlType": {
		"client_id": 12,
		"resource_ids": 12,
		"client_secret": 12,
		"scope": 12,
		"authorized_grant_types": 12,
		"web_server_redirect_uri": 12,
		"authorities": 12,
		"access_token_validity": 4,
		"refresh_token_validity": 4,
		"additional_information": 12,
		"autoapprove": 12
	},
	"table": "oauth_client_details",
	"ts": 1640337532520,
	"type": "INSERT"
}           

每個字段的意思已經很清楚了,有表名稱、方法、參數、參數類型、參數值.....

用戶端要做的就是監聽MQ擷取JSON資料,然後将其解析出來,處理自己的業務邏輯。

Canal用戶端搭建

用戶端很簡單實作,要做的就是消費Canal服務端傳遞過來的消息,監聽canal.queue這個隊列。

1、建立消息實體類

MQ傳遞過來的是JSON資料,當然要建立個實體類接收資料,如下:

/**
 * @author 公号 碼猿技術專欄
 * Canal消息接收實體類
 */
@NoArgsConstructor
@Data
public class CanalMessage<T> {
    @JsonProperty("type")
    private String type;

    @JsonProperty("table")
    private String table;

    @JsonProperty("data")
    private List<T> data;

    @JsonProperty("database")
    private String database;

    @JsonProperty("es")
    private Long es;

    @JsonProperty("id")
    private Integer id;

    @JsonProperty("isDdl")
    private Boolean isDdl;

    @JsonProperty("old")
    private List<T> old;

    @JsonProperty("pkNames")
    private List<String> pkNames;

    @JsonProperty("sql")
    private String sql;

    @JsonProperty("ts")
    private Long ts;
}           

2、MQ消息監聽業務

接下來就是監聽隊列,一旦有Canal服務端有資料推送能夠及時的消費。

代碼很簡單,隻是給出個接收的案例,具體的業務邏輯可以根據業務實作,如下:

import cn.hutool.json.JSONUtil;
import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 監聽MQ擷取Canal增量的資料消息
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class CanalRabbitMQListener {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "canal.queue", durable = "true"),
                    exchange = @Exchange(value = "canal.exchange"),
                    key = "canal.routing.key"
            )
    })
    public void handleDataChange(String message) {
        //将message轉換為CanalMessage
        CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
        String tableName = canalMessage.getTable();
        log.info("Canal 監聽 {} 發生變化;明細:{}", tableName, message);
        //TODO 業務邏輯自己完善...............
    }
}           

3、測試

下面向表中插入資料,看下接收的消息是什麼樣的,SQL如下:

INSERT INTO `oauth_client_details`
VALUES
	( 'myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false' );           

用戶端轉換後的消息如下圖:

實戰!Spring Boot 整合 阿裡開源中間件 Canal 實作資料增量同步

上圖可以看出所有的資料都已經成功接收到,隻需要根據資料完善自己的業務邏輯即可。

總結

資料增量同步的開源工具并不隻有Canal一種,根據自己的業務需要選擇合适的元件。

繼續閱讀