天天看點

Canal CDC

文章目錄

  • ​​1.Canal介紹​​
  • ​​1.1 Mysql 的binlog介紹​​
  • ​​1.2 Canal 的運作原理​​
  • ​​1.3 Canal使用場景​​
  • ​​2.Mysql 的配置準備​​
  • ​​3.Canal 的準備​​
  • ​​4.Canal 資料結構分析​​
  • ​​5.Java 代碼​​
  • ​​6.Kafka 測試​​

1.Canal介紹

阿裡巴巴 B2B 公司,因為業務的特性,賣家主要集中在國内,買家主要集中在國外,是以衍生出了同步杭州和美國異地機房的需求,從 2010 年開始,阿裡系公司開始逐漸的嘗試基于資料庫的日志解析,擷取增量變更進行同步,由此衍生出了增量訂閱&消費的業務。

Canal 是用 Java 開發的基于資料庫增量日志解析,提供增量資料訂閱&消費的中間件。目前。​​

​Canal 主要支援了 MySQL 的 Binlog 解析,解析完成後才利用 Canal Client 來處理獲得的相關資料​

​。(資料庫同步需要阿裡的 Otter 中間件,基于 Canal)。

1.1 Mysql 的binlog介紹

MySQL 的二進制日志可以說 MySQL 最重要的日志了,它記錄了所有的 DDL 和 DML(除了資料查詢語句)語句,以事件形式記錄,還包含語句所執行的消耗的時間,MySQL 的二進制日志是事務安全型的。

binlog可分為STATEMENT, MIXED, ROW

  • statement:語句級,binlog 會記錄每次一執行寫操作的語句。相對 row 模式節省空間,但是可能産生不一緻性,比如“update tt set create_date=now()”,如果用 binlog 日志進行恢複,由于執行時間不同可能産生的資料就不同。
  • row:行級, binlog 會記錄每次操作後每行記錄的變化。
  • mixed:statement 的更新版,一定程度上解決了,因為一些情況而造成的 statement模式不一緻問題,預設還是 statement,在某些情況下譬如:當函數中包含 UUID() 時;包含AUTO_INCREMENT 字段的表被更新時;執行 INSERT DELAYED 語句時;用 UDF 時;會按照ROW 的方式進行處理

Canal 想做監控分析,選擇 row 格式比較合适。

1.2 Canal 的運作原理

Canal CDC

Mysql的主從複制:

  1. Master改變資料, 寫入到二進制檔案中
  2. slave 從master 發送dump協定, 讀取二進制檔案到自己的relay log
  3. slave讀取relay log到自己的資料庫

canal就是将自己僞裝為slave

1.3 Canal使用場景

1> 進行異地資料庫之間的同步架構

Canal CDC

2> 更新緩存, 實作緩存和資料庫的一緻性

Canal CDC

3> 抓取業務表的新增變化資料,用于制作實時統計(我們就是這種場景)

2.Mysql 的配置準備

CREATE TABLE user_info(
`id` VARCHAR(255),
`name` VARCHAR(255),
`sex` VARCHAR(255)
);      
vim /etc/my.cnf      
Canal CDC
systemctl restart mysqld      

3.Canal 的準備

Canal CDC

修改配置

Canal CDC
Canal CDC
Canal CDC
Canal CDC

防火牆關閉

Canal CDC

4.Canal 資料結構分析

Canal CDC

發送的是Message, 由很多Entry組成, 一個Entry對應一個Sql指令

Entry: TableName, EntryTyple, StoreValue, RowChange

RowChange為反序列化後的資料, 如果要使用的話必須通過StoreValue反序列化為RowChange後才可以使用

5.Java 代碼

maven

<dependency>
            <groupId>com.alibaba.ottergroupId>
            <artifactId>canal.clientartifactId>
            <version>1.0.25version>
        dependency>      
public class Test {
    public static void main(String[] args)throws Exception {
        // Canal中的資料結構: Message - Entry(對應一個Sql) - TableName, EntryType, StoreValue-RowChange
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.66.66", 11111), "example", "", "");

        while (true){
            // 連接配接
            connector.connect();
            // 訂閱
            connector.subscribe("cdc_test.*");
            // 擷取資料
            Message message = connector.get(100);

            // 擷取Entry 集合
            List<CanalEntry.Entry> entries = message.getEntries();

            if (entries.size() <= 0){
                System.out.println("稍等一會.........");
                Thread.sleep(1000);
            }else {
                for (CanalEntry.Entry entry : entries) {
                    // 1.擷取表名
                    String tableName = entry.getHeader().getTableName();
                    // 2.擷取類型
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    // 3.擷取序列化後的資料
                    ByteString storeValue = entry.getStoreValue();
                    // 判斷目前的類型是否為Row
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
                        // 5.反序列化資料
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        // 6.擷取EventType
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 7.擷取資料集
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            JSONObject beforeJson = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeJson.put(column.getName(), column.getValue());
                            }

                            JSONObject afterJson = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterJson.put(column.getName(), column.getValue());
                            }
                            System.out.println("Table:" + tableName + "eventType: " + eventType + "before: " + beforeJson + "after: " + afterJson);


                        }
                    }else {
                        System.out.println("目前資料類型為:" + entryType);
                    }

                }
            }
        }
    }
}      
Canal CDC

6.Kafka 測試

修改 canal.properties 中 canal 的輸出 model,預設 tcp,改為輸出到 kafka

Canal CDC

修改 Kafka 叢集的位址

canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092      

修改 instance.properties 輸出到 Kafka 的主題以及分區數

# mq config
canal.mq.topic=canal_test
canal.mq.partitionsNum=1
# hash partition config
#canal.mq.partition=0
#canal.mq.partitionHash=mytest.person:id,mytest.role:id      
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic canal_test      
上一篇: CDC繪圖總結