天天看點

談談對Canal(增量資料訂閱與消費)的了解

canal是阿裡巴巴旗下的一款開源項目,純Java開發。基于資料庫增量日志解析,提供增量資料訂閱&消費,目前主要支援了mysql(也支援mariaDB)。

起源:早期,阿裡巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基于trigger的方式擷取增量變更,不過從2010年開始,阿裡系公司開始逐漸的嘗試基于資料庫的日志解析,擷取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。

基于日志增量訂閱&消費支援的業務:

資料庫鏡像

資料庫實時備份

多級索引 (賣家和買家各自分庫索引)

search build

業務cache重新整理

價格變化等重要業務消息

mysql主備複制實作:

談談對Canal(增量資料訂閱與消費)的了解

從上層來看,複制分成三步:

master将改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行檢視);

slave将master的binary log events拷貝到它的中繼日志(relay log);

slave重做中繼日志中的事件,将改變反映它自己的資料。

canal的工作原理

談談對Canal(增量資料訂閱與消費)的了解

原理相對比較簡單:

canal模拟mysql slave的互動協定,僞裝自己為mysql slave,向mysql master發送dump協定

mysql master收到dump請求,開始推送binary log給slave(也就是canal)

canal解析binary log對象(原始為byte流)

個人了解,資料增量訂閱與消費應當有如下幾個點:

增量訂閱和消費子產品應當包括binlog日志抓取,binlog日志解析,事件分發過濾(EventSink),存儲(EventStore)等主要子產品。

如果需要確定HA可以采用Zookeeper儲存各個子子產品的狀态,讓整個增量訂閱和消費子產品實作無狀态化,當然作為consumer(用戶端)的狀态也可以儲存在zk之中。

整體上通過一個Manager System進行集中管理,配置設定資源。

可以參考下圖:

談談對Canal(增量資料訂閱與消費)的了解
談談對Canal(增量資料訂閱與消費)的了解

說明:

server代表一個canal運作執行個體,對應于一個jvm

instance對應于一個資料隊列 (1個server對應1..n個instance)

instance子產品:

eventParser (資料源接入,模拟slave協定和master進行互動,協定解析)

eventSink (Parser和Store連結器,進行資料過濾,加工,分發的工作)

eventStore (資料存儲)

metaManager (增量訂閱&消費資訊管理器)

談談對Canal(增量資料訂閱與消費)的了解

整個parser過程大緻可分為幾部:

Connection擷取上一次解析成功的位置(如果第一次啟動,則擷取初始制定的位置或者是目前資料庫的binlog位點)

Connection建立連接配接,發生BINLOG_DUMP指令

Mysql開始推送Binary Log

接收到的Binary Log通過Binlog parser進行協定解析,補充一些特定資訊

傳遞給EventSink子產品進行資料存儲,是一個阻塞操作,直到存儲成功

存儲成功後,定時記錄Binary Log位置

談談對Canal(增量資料訂閱與消費)的了解

資料過濾:支援通配符的過濾模式,表名,字段内容等

資料路由/分發:解決1:n (1個parser對應多個store的模式)

資料歸并:解決n:1 (多個parser對應1個store)

資料加工:在進入store之前進行額外的處理,比如join

1 資料1:n業務 :

為了合理的利用資料庫資源, 一般常見的業務都是按照schema進行隔離,然後在mysql上層或者dao這一層面上,進行一個資料源路由,屏蔽資料庫實體位置對開發的影響,阿裡系主要是通過cobar/tddl來解決資料源路由問題。 是以,一般一個資料庫執行個體上,會部署多個schema,每個schema會有由1個或者多個業務方關注。

2 資料n:1業務:

同樣,當一個業務的資料規模達到一定的量級後,必然會涉及到水準拆分和垂直拆分的問題,針對這些拆分的資料需要處理時,就需要連結多個store進行處理,消費的位點就會變成多份,而且資料消費的進度無法得到盡可能有序的保證。 是以,在一定業務場景下,需要将拆分後的增量資料進行歸并處理,比如按照時間戳/全局id進行排序歸并.

目前實作了Memory記憶體、本地file存儲以及持久化到zookeeper以保障資料叢集共享。

Memory記憶體的RingBuffer設計:

談談對Canal(增量資料訂閱與消費)的了解

定義了3個cursor

Put : Sink子產品進行資料存儲的最後一次寫入位置

Get : 資料訂閱擷取的最後一次提取位置

Ack : 資料消費成功的最後一次消費位置

借鑒Disruptor的RingBuffer的實作,将RingBuffer拉直來看:

談談對Canal(增量資料訂閱與消費)的了解

實作說明:

Put/Get/Ack cursor用于遞增,采用long型存儲

buffer的get操作,通過取餘或者與操作。(與操作: cusor & (size - 1) , size需要為2的指數,效率比較高)

談談對Canal(增量資料訂閱與消費)的了解

instance代表了一個實際運作的資料隊列,包括了EventPaser,EventSink,EventStore等元件。

抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:

manager方式: 和你自己的内部web console/manager系統進行對接。(alibaba内部使用方式)

spring方式:基于spring xml + properties進行定義,建構spring配置.

spring/memory-instance.xml 所有的元件(parser , sink , store)都選擇了記憶體版模式,記錄位點的都選擇了memory模式,重新開機後又會回到初始位點進行解析。特點:速度最快,依賴最少

spring/file-instance.xml 所有的元件(parser , sink , store)都選擇了基于file持久化模式,注意,不支援HA機制.支援單機持久化

spring/default-instance.xml 所有的元件(parser , sink , store)都選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證資料叢集共享. 支援HA

spring/group-instance.xml 主要針對需要進行多庫合并時,可以将多個實體instance合并為一個邏輯instance,提供用戶端通路。場景:分庫業務。 比如産品資料拆分了4個庫,每個庫會有一個instance,如果不用group,業務上要消費資料時,需要啟動4個用戶端,分别連結4個instance執行個體。使用group後,可以在canal server上合并為一個邏輯instance,隻需要啟動1個用戶端,連結這個邏輯instance即可.

談談對Canal(增量資料訂閱與消費)的了解

server代表了一個canal的運作執行個體,為了友善元件化使用,特意抽象了Embeded(嵌入式) / Netty(網絡通路)的兩種實作:

Embeded : 對latency和可用性都有比較高的要求,自己又能hold住分布式的相關技術(比如failover)

Netty : 基于netty封裝了一層網絡協定,由canal server保證其可用性,采用的pull模型,當然latency會稍微打點折扣,不過這個也視情況而定。

談談對Canal(增量資料訂閱與消費)的了解

具體的協定格式,可參見:CanalProtocol.proto

get/ack/rollback協定介紹:

Message getWithoutAck(int batchSize),允許指定batchSize,一次可以擷取多條,每次傳回的對象為Message,包含的内容為:

a. batch id 唯一辨別

b. entries 具體的資料對象,對應的資料對象格式:EntryProtocol.proto

void rollback(long batchId),顧命思議,復原上次的get請求,重新擷取資料。基于get擷取的batchId進行送出,避免誤操作

void ack(long batchId),顧命思議,确認已經消費成功,通知server删除資料。基于get擷取的batchId進行送出,避免誤操作

canal的get/ack/rollback協定和正常的jms協定有所不同,允許get/ack異步處理,比如可以連續調用get多次,後續異步按順序送出ack/rollback,項目中稱之為流式api.

流式api設計的好處:

get/ack異步化,減少因ack帶來的網絡延遲和操作成本 (99%的狀态都是處于正常狀态,異常的rollback屬于個别情況,沒必要為個别的case犧牲整個性能)

get擷取資料後,業務消費存在瓶頸或者需要多程序/多線程消費時,可以不停的輪詢get資料,不停的往後發送任務,提高并行化. (作者在實際業務中的一個case:業務資料消費需要跨中美網絡,是以一次操作基本在200ms以上,為了減少延遲,是以需要實施并行化)

流式api設計:

談談對Canal(增量資料訂閱與消費)的了解

每次get操作都會在meta中産生一個mark,mark标記會遞增,保證運作過程中mark的唯一性

每次的get操作,都會在上一次的mark操作記錄的cursor繼續往後取,如果mark不存在,則在last ack cursor繼續往後取

進行ack時,需要按照mark的順序進行數序ack,不能跳躍ack. ack會删除目前的mark标記,并将對應的mark位置更新為last ack cusor

一旦出現異常情況,用戶端可發起rollback情況,重新置位:删除所有的mark, 清理get請求位置,下次請求會從last ack cursor繼續往後取

canal采用protobuff:

canal-message example:

比如資料庫中的表:

更新一條資料(update person set age=15 where id=4):

canal的HA分為兩部分,canal server和canal client分别有對應的ha實作:

canal server: 為了減少對mysql dump的請求,不同server上的instance要求同一時間隻能有一個處于running,其他的處于standby狀态.

canal client: 為了保證有序性,一份instance同一時間隻能由一個canal client進行get/ack/rollback操作,否則用戶端接收無法保證有序。

整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節點(和session生命周期綁定),可以看下我之前zookeeper的相關文章。

Canal Server:

談談對Canal(增量資料訂閱與消費)的了解

大緻步驟:

canal server要啟動某個canal instance時都先向zookeeper進行一次嘗試啟動判斷 (實作:建立EPHEMERAL節點,誰建立成功就允許誰啟動)

建立zookeeper節點成功後,對應的canal server就啟動對應的canal instance,沒有建立成功的canal instance就會處于standby狀态

一旦zookeeper發現canal server A建立的節點消失後,立即通知其他的canal server再次進行步驟1的操作,重新選出一個canal server啟動instance.

canal client每次進行connect時,會首先向zookeeper詢問目前是誰啟動了canal instance,然後和其建立連結,一旦連結不可用,會重新嘗試connect.

Canal Client的方式和canal server方式類似,也是利用zokeeper的搶占EPHEMERAL節點的方式進行控制.

HA配置架構圖(舉例)如下所示:

談談對Canal(增量資料訂閱與消費)的了解

canal還有幾種連接配接方式:

1. 單連

談談對Canal(增量資料訂閱與消費)的了解

2. 兩個client+兩個instance+1個mysql

當mysql變動時,兩個client都能擷取到變動

談談對Canal(增量資料訂閱與消費)的了解

3. 一個server+兩個instance+兩個mysql+兩個client

談談對Canal(增量資料訂閱與消費)的了解

4. instance的standby配置

談談對Canal(增量資料訂閱與消費)的了解

從整體架構上來說canal是這種架構的(canal中沒有包含一個運維的console web來對接,但要運用于分布式環境中肯定需要一個Manager來管理):

談談對Canal(增量資料訂閱與消費)的了解

一個總體的manager system對應于n個Canal Server(實體上來說是一台伺服器), 那麼一個Canal Server對應于n個Canal Instance(destinations). 大體上是三層結構,第二層也需要Manager統籌運維管理。

那麼随着Docker技術的興起,是否可以試一下下面的架構呢?

談談對Canal(增量資料訂閱與消費)的了解

一個docker中跑一個instance服務,相當于略去server這一層的概念。

Manager System中配置一個instance,直接調取一個docker釋出這個instance,其中包括向這個instance發送配置資訊,啟動instance服務.

instance在運作過程中,定時重新整理binlog filename+ binlog position的資訊至zk。

如果一個instance出現故障,instance本身報錯或者zk感覺此node消失,則根據相應的資訊,比如上一步儲存的binlog filename+binlog position重新開啟一個docker服務,當然這裡可以适當的加一些重試機制。

當要更新時,類似AB test, 先關閉一個docker,然後開啟新的已更新的替換,循序漸進的進行。

當涉及到分表分庫時,多個實體表對應于一個邏輯表,可以将結果存于一個公共的子產品(比如MQ),或者單獨存取也可以,具體情況具體分析

存儲可以參考canal的多樣化:記憶體,檔案,zk,或者加入至MQ中

docker由此之外的工具管理,比如kubernetes

也可以進一步添加HA的功能,兩個docker對應一個mysql,互為主備,類似Canal的HA架構。如果時效性不是貼别強的場景,考慮到成本,此功能可以不采用。

這裡總結了一下Canal的一些點,僅供參考:

原理:模拟mysql slave的互動協定,僞裝自己為mysql slave,向mysql master發送dump協定;mysql master收到dump請求,開始推送binary log給slave(也就是canal);解析binary log對象(原始為byte流)

重複消費問題:在消費端解決。

采用開源的open-replicator來解析binlog

canal需要維護EventStore,可以存取在Memory, File, zk

canal需要維護用戶端的狀态,同一時刻一個instance隻能有一個消費端消費

資料傳輸格式:protobuff

支援binlog format 類型:statement, row, mixed. 多次附加功能隻能在row下使用,比如otter

binlog position可以支援儲存在記憶體,檔案,zk中

instance啟動方式:rpc/http; 内嵌

有ACK機制

無告警,無監控,這兩個功能都需要對接外部系統

友善快速部署。

<a href="https://github.com/alibaba/canal">https://github.com/alibaba/canal</a>