天天看點

開源基于Canal的開源增量資料訂閱&消費中間件

CanalSync

canal 是阿裡巴巴開源的一款基于資料庫增量日志解析,提供增量資料訂閱&消費,目前主要支援了MySQL(也支援mariaDB)。

我開發的這個CanalSync項目 https://github.com/yuzd/CanalSync   ==>覺得不錯幫忙給個star謝謝

是基于canal-server之上的資料庫同步&消費中間件,

用于可快速搭建消費canal-server的項目。 目前我已實作并開源了如下:

  1. 資料消費傳輸到redis元件
  2. 資料消費傳輸到rabbitmq元件
  3. 資料消費傳輸到mysql資料庫元件

示意圖

開源基于Canal的開源增量資料訂閱&消費中間件

Nuget:

1. 接收canal-server的消息中間件:

Install-Package Canal.Server

2. 解析canal-server消息轉出可執行sql的中間件:

Install-Package Canal.SqlParse

如何使用

如果你需要寫一個資料消費傳輸到XXXMQ,用不到反解析成sql的話,隻需要引用 Canal.Server中間件。 如果你需要寫一個資料消費傳輸到XXXdb,得用到反解析sql中間件,需要同時引用Canal.Server 和 Canal.SqlParse 這2個中間件。

Canal.Server 如何使用

  1. 引用 Canal.Server 并appsettings.json 配置canal-server的參數.如下圖:
開源基于Canal的開源增量資料訂閱&消費中間件

參數說明: 

開源基于Canal的開源增量資料訂閱&消費中間件
  1. 建立一個 消費類 必須要 實作: INotificationHandler 接口,例如叫TestHandler
public class TestHandler:INotificationHandler<CanalBody>{
        public Task Handle(CanalBody notification)
        {
            //寫消費邏輯

            return Task.CompletedTask;;
        }
    }
    
           
  1. 在startUp 使用并注冊 該消費類
//注冊了之後 canal-server有新的消息就會進入到TestHandler的Handle方法
      services.AddCanalService(produce => produce.RegisterSingleton<TestHandler>());
           

Canal.SqlParse 如何使用

目前隻實作了解析mysql的邏輯,未來會加入sqlserver的解析邏輯!!

//注冊使用 connectionString是mysql的資料庫連接配接字元串
      services.AddMysqlParseService(connectionString);
      
      // 計劃中還未實作
      //services.AddSqlserverParseService(connectionString);
           

在類的構造方法可注入:

開源基于Canal的開源增量資料訂閱&amp;消費中間件

如上圖,代表将canal-server的資料直接在另外的mysql庫裡面執行,等于2個mysql資料進行互相同步。

歐洲與中國的2個mysql庫 使用上述方法進行同步的測試

結果: 同步速度在100~200qps 有點低,

下一步的優化方案是參考otter。消費端改用用aria2來下載下傳

開源基于Canal的開源增量資料訂閱&amp;消費中間件

如果您覺得閱讀本文對您有幫助,請點一下“推薦”按鈕,您的“推薦”将是我最大的寫作動力!歡迎各位轉載,轉載文章之後須在文章頁面明顯位置給出作者和原文連接配接,謝謝。

繼續閱讀