文章目錄
- 1.Canal介紹
- 1.1 Mysql 的binlog介紹
- 1.2 Canal 的運作原理
- 1.3 Canal使用場景
- 2.Mysql 的配置準備
- 3.Canal 的準備
- 4.Canal 資料結構分析
- 5.Java 代碼
- 6.Kafka 測試
1.Canal介紹
阿裡巴巴 B2B 公司,因為業務的特性,賣家主要集中在國内,買家主要集中在國外,是以衍生出了同步杭州和美國異地機房的需求,從 2010 年開始,阿裡系公司開始逐漸的嘗試基于資料庫的日志解析,擷取增量變更進行同步,由此衍生出了增量訂閱&消費的業務。
Canal 是用 Java 開發的基于資料庫增量日志解析,提供增量資料訂閱&消費的中間件。目前。
Canal 主要支援了 MySQL 的 Binlog 解析,解析完成後才利用 Canal Client 來處理獲得的相關資料
。(資料庫同步需要阿裡的 Otter 中間件,基于 Canal)。
1.1 Mysql 的binlog介紹
MySQL 的二進制日志可以說 MySQL 最重要的日志了,它記錄了所有的 DDL 和 DML(除了資料查詢語句)語句,以事件形式記錄,還包含語句所執行的消耗的時間,MySQL 的二進制日志是事務安全型的。
binlog可分為STATEMENT, MIXED, ROW
- statement:語句級,binlog 會記錄每次一執行寫操作的語句。相對 row 模式節省空間,但是可能産生不一緻性,比如“update tt set create_date=now()”,如果用 binlog 日志進行恢複,由于執行時間不同可能産生的資料就不同。
- row:行級, binlog 會記錄每次操作後每行記錄的變化。
- mixed:statement 的更新版,一定程度上解決了,因為一些情況而造成的 statement模式不一緻問題,預設還是 statement,在某些情況下譬如:當函數中包含 UUID() 時;包含AUTO_INCREMENT 字段的表被更新時;執行 INSERT DELAYED 語句時;用 UDF 時;會按照ROW 的方式進行處理
Canal 想做監控分析,選擇 row 格式比較合适。
1.2 Canal 的運作原理
Mysql的主從複制:
- Master改變資料, 寫入到二進制檔案中
- slave 從master 發送dump協定, 讀取二進制檔案到自己的relay log
- slave讀取relay log到自己的資料庫
canal就是将自己僞裝為slave
1.3 Canal使用場景
1> 進行異地資料庫之間的同步架構
2> 更新緩存, 實作緩存和資料庫的一緻性
3> 抓取業務表的新增變化資料,用于制作實時統計(我們就是這種場景)
2.Mysql 的配置準備
CREATE TABLE user_info(
`id` VARCHAR(255),
`name` VARCHAR(255),
`sex` VARCHAR(255)
);
vim /etc/my.cnf
systemctl restart mysqld
3.Canal 的準備
修改配置
防火牆關閉
4.Canal 資料結構分析
發送的是Message, 由很多Entry組成, 一個Entry對應一個Sql指令
Entry: TableName, EntryTyple, StoreValue, RowChange
RowChange為反序列化後的資料, 如果要使用的話必須通過StoreValue反序列化為RowChange後才可以使用
5.Java 代碼
maven
<dependency>
<groupId>com.alibaba.ottergroupId>
<artifactId>canal.clientartifactId>
<version>1.0.25version>
dependency>
public class Test {
public static void main(String[] args)throws Exception {
// Canal中的資料結構: Message - Entry(對應一個Sql) - TableName, EntryType, StoreValue-RowChange
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.66.66", 11111), "example", "", "");
while (true){
// 連接配接
connector.connect();
// 訂閱
connector.subscribe("cdc_test.*");
// 擷取資料
Message message = connector.get(100);
// 擷取Entry 集合
List<CanalEntry.Entry> entries = message.getEntries();
if (entries.size() <= 0){
System.out.println("稍等一會.........");
Thread.sleep(1000);
}else {
for (CanalEntry.Entry entry : entries) {
// 1.擷取表名
String tableName = entry.getHeader().getTableName();
// 2.擷取類型
CanalEntry.EntryType entryType = entry.getEntryType();
// 3.擷取序列化後的資料
ByteString storeValue = entry.getStoreValue();
// 判斷目前的類型是否為Row
if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
// 5.反序列化資料
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
// 6.擷取EventType
CanalEntry.EventType eventType = rowChange.getEventType();
// 7.擷取資料集
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
JSONObject beforeJson = new JSONObject();
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
beforeJson.put(column.getName(), column.getValue());
}
JSONObject afterJson = new JSONObject();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
afterJson.put(column.getName(), column.getValue());
}
System.out.println("Table:" + tableName + "eventType: " + eventType + "before: " + beforeJson + "after: " + afterJson);
}
}else {
System.out.println("目前資料類型為:" + entryType);
}
}
}
}
}
}
6.Kafka 測試
修改 canal.properties 中 canal 的輸出 model,預設 tcp,改為輸出到 kafka
修改 Kafka 叢集的位址
canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
修改 instance.properties 輸出到 Kafka 的主題以及分區數
# mq config
canal.mq.topic=canal_test
canal.mq.partitionsNum=1
# hash partition config
#canal.mq.partition=0
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic canal_test