canal是阿裡巴巴旗下的一款開源項目,純Java開發。基于資料庫增量日志解析,提供增量資料訂閱&消費,目前主要支援了mysql(也支援mariaDB)。
起源:早期,阿裡巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基于trigger的方式擷取增量變更,不過從2010年開始,阿裡系公司開始逐漸的嘗試基于資料庫的日志解析,擷取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。
基于日志增量訂閱&消費支援的業務:
資料庫鏡像
資料庫實時備份
多級索引 (賣家和買家各自分庫索引)
search build
業務cache重新整理
價格變化等重要業務消息
mysql主備複制實作:

從上層來看,複制分成三步:
master将改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行檢視);
slave将master的binary log events拷貝到它的中繼日志(relay log);
slave重做中繼日志中的事件,将改變反映它自己的資料。
canal的工作原理
原理相對比較簡單:
canal模拟mysql slave的互動協定,僞裝自己為mysql slave,向mysql master發送dump協定
mysql master收到dump請求,開始推送binary log給slave(也就是canal)
canal解析binary log對象(原始為byte流)
個人了解,資料增量訂閱與消費應當有如下幾個點:
增量訂閱和消費子產品應當包括binlog日志抓取,binlog日志解析,事件分發過濾(EventSink),存儲(EventStore)等主要子產品。
如果需要確定HA可以采用Zookeeper儲存各個子子產品的狀态,讓整個增量訂閱和消費子產品實作無狀态化,當然作為consumer(用戶端)的狀态也可以儲存在zk之中。
整體上通過一個Manager System進行集中管理,配置設定資源。
可以參考下圖:
說明:
server代表一個canal運作執行個體,對應于一個jvm
instance對應于一個資料隊列 (1個server對應1..n個instance)
instance子產品:
eventParser (資料源接入,模拟slave協定和master進行互動,協定解析)
eventSink (Parser和Store連結器,進行資料過濾,加工,分發的工作)
eventStore (資料存儲)
metaManager (增量訂閱&消費資訊管理器)
整個parser過程大緻可分為幾部:
Connection擷取上一次解析成功的位置(如果第一次啟動,則擷取初始制定的位置或者是目前資料庫的binlog位點)
Connection建立連接配接,發生BINLOG_DUMP指令
Mysql開始推送Binary Log
接收到的Binary Log通過Binlog parser進行協定解析,補充一些特定資訊
傳遞給EventSink子產品進行資料存儲,是一個阻塞操作,直到存儲成功
存儲成功後,定時記錄Binary Log位置
資料過濾:支援通配符的過濾模式,表名,字段内容等
資料路由/分發:解決1:n (1個parser對應多個store的模式)
資料歸并:解決n:1 (多個parser對應1個store)
資料加工:在進入store之前進行額外的處理,比如join
1 資料1:n業務 :
為了合理的利用資料庫資源, 一般常見的業務都是按照schema進行隔離,然後在mysql上層或者dao這一層面上,進行一個資料源路由,屏蔽資料庫實體位置對開發的影響,阿裡系主要是通過cobar/tddl來解決資料源路由問題。 是以,一般一個資料庫執行個體上,會部署多個schema,每個schema會有由1個或者多個業務方關注。
2 資料n:1業務:
同樣,當一個業務的資料規模達到一定的量級後,必然會涉及到水準拆分和垂直拆分的問題,針對這些拆分的資料需要處理時,就需要連結多個store進行處理,消費的位點就會變成多份,而且資料消費的進度無法得到盡可能有序的保證。 是以,在一定業務場景下,需要将拆分後的增量資料進行歸并處理,比如按照時間戳/全局id進行排序歸并.
目前實作了Memory記憶體、本地file存儲以及持久化到zookeeper以保障資料叢集共享。
Memory記憶體的RingBuffer設計:
定義了3個cursor
Put : Sink子產品進行資料存儲的最後一次寫入位置
Get : 資料訂閱擷取的最後一次提取位置
Ack : 資料消費成功的最後一次消費位置
借鑒Disruptor的RingBuffer的實作,将RingBuffer拉直來看:
實作說明:
Put/Get/Ack cursor用于遞增,采用long型存儲
buffer的get操作,通過取餘或者與操作。(與操作: cusor & (size - 1) , size需要為2的指數,效率比較高)
instance代表了一個實際運作的資料隊列,包括了EventPaser,EventSink,EventStore等元件。
抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:
manager方式: 和你自己的内部web console/manager系統進行對接。(alibaba内部使用方式)
spring方式:基于spring xml + properties進行定義,建構spring配置.
spring/memory-instance.xml 所有的元件(parser , sink , store)都選擇了記憶體版模式,記錄位點的都選擇了memory模式,重新開機後又會回到初始位點進行解析。特點:速度最快,依賴最少
spring/file-instance.xml 所有的元件(parser , sink , store)都選擇了基于file持久化模式,注意,不支援HA機制.支援單機持久化
spring/default-instance.xml 所有的元件(parser , sink , store)都選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證資料叢集共享. 支援HA
spring/group-instance.xml 主要針對需要進行多庫合并時,可以将多個實體instance合并為一個邏輯instance,提供用戶端通路。場景:分庫業務。 比如産品資料拆分了4個庫,每個庫會有一個instance,如果不用group,業務上要消費資料時,需要啟動4個用戶端,分别連結4個instance執行個體。使用group後,可以在canal server上合并為一個邏輯instance,隻需要啟動1個用戶端,連結這個邏輯instance即可.
server代表了一個canal的運作執行個體,為了友善元件化使用,特意抽象了Embeded(嵌入式) / Netty(網絡通路)的兩種實作:
Embeded : 對latency和可用性都有比較高的要求,自己又能hold住分布式的相關技術(比如failover)
Netty : 基于netty封裝了一層網絡協定,由canal server保證其可用性,采用的pull模型,當然latency會稍微打點折扣,不過這個也視情況而定。
具體的協定格式,可參見:CanalProtocol.proto
get/ack/rollback協定介紹:
Message getWithoutAck(int batchSize),允許指定batchSize,一次可以擷取多條,每次傳回的對象為Message,包含的内容為:
a. batch id 唯一辨別
b. entries 具體的資料對象,對應的資料對象格式:EntryProtocol.proto
void rollback(long batchId),顧命思議,復原上次的get請求,重新擷取資料。基于get擷取的batchId進行送出,避免誤操作
void ack(long batchId),顧命思議,确認已經消費成功,通知server删除資料。基于get擷取的batchId進行送出,避免誤操作
canal的get/ack/rollback協定和正常的jms協定有所不同,允許get/ack異步處理,比如可以連續調用get多次,後續異步按順序送出ack/rollback,項目中稱之為流式api.
流式api設計的好處:
get/ack異步化,減少因ack帶來的網絡延遲和操作成本 (99%的狀态都是處于正常狀态,異常的rollback屬于個别情況,沒必要為個别的case犧牲整個性能)
get擷取資料後,業務消費存在瓶頸或者需要多程序/多線程消費時,可以不停的輪詢get資料,不停的往後發送任務,提高并行化. (作者在實際業務中的一個case:業務資料消費需要跨中美網絡,是以一次操作基本在200ms以上,為了減少延遲,是以需要實施并行化)
流式api設計:
每次get操作都會在meta中産生一個mark,mark标記會遞增,保證運作過程中mark的唯一性
每次的get操作,都會在上一次的mark操作記錄的cursor繼續往後取,如果mark不存在,則在last ack cursor繼續往後取
進行ack時,需要按照mark的順序進行數序ack,不能跳躍ack. ack會删除目前的mark标記,并将對應的mark位置更新為last ack cusor
一旦出現異常情況,用戶端可發起rollback情況,重新置位:删除所有的mark, 清理get請求位置,下次請求會從last ack cursor繼續往後取
canal采用protobuff:
canal-message example:
比如資料庫中的表:
更新一條資料(update person set age=15 where id=4):
canal的HA分為兩部分,canal server和canal client分别有對應的ha實作:
canal server: 為了減少對mysql dump的請求,不同server上的instance要求同一時間隻能有一個處于running,其他的處于standby狀态.
canal client: 為了保證有序性,一份instance同一時間隻能由一個canal client進行get/ack/rollback操作,否則用戶端接收無法保證有序。
整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節點(和session生命周期綁定),可以看下我之前zookeeper的相關文章。
Canal Server:
大緻步驟:
canal server要啟動某個canal instance時都先向zookeeper進行一次嘗試啟動判斷 (實作:建立EPHEMERAL節點,誰建立成功就允許誰啟動)
建立zookeeper節點成功後,對應的canal server就啟動對應的canal instance,沒有建立成功的canal instance就會處于standby狀态
一旦zookeeper發現canal server A建立的節點消失後,立即通知其他的canal server再次進行步驟1的操作,重新選出一個canal server啟動instance.
canal client每次進行connect時,會首先向zookeeper詢問目前是誰啟動了canal instance,然後和其建立連結,一旦連結不可用,會重新嘗試connect.
Canal Client的方式和canal server方式類似,也是利用zokeeper的搶占EPHEMERAL節點的方式進行控制.
HA配置架構圖(舉例)如下所示:
canal還有幾種連接配接方式:
1. 單連
2. 兩個client+兩個instance+1個mysql
當mysql變動時,兩個client都能擷取到變動
3. 一個server+兩個instance+兩個mysql+兩個client
4. instance的standby配置
從整體架構上來說canal是這種架構的(canal中沒有包含一個運維的console web來對接,但要運用于分布式環境中肯定需要一個Manager來管理):
一個總體的manager system對應于n個Canal Server(實體上來說是一台伺服器), 那麼一個Canal Server對應于n個Canal Instance(destinations). 大體上是三層結構,第二層也需要Manager統籌運維管理。
那麼随着Docker技術的興起,是否可以試一下下面的架構呢?
一個docker中跑一個instance服務,相當于略去server這一層的概念。
Manager System中配置一個instance,直接調取一個docker釋出這個instance,其中包括向這個instance發送配置資訊,啟動instance服務.
instance在運作過程中,定時重新整理binlog filename+ binlog position的資訊至zk。
如果一個instance出現故障,instance本身報錯或者zk感覺此node消失,則根據相應的資訊,比如上一步儲存的binlog filename+binlog position重新開啟一個docker服務,當然這裡可以适當的加一些重試機制。
當要更新時,類似AB test, 先關閉一個docker,然後開啟新的已更新的替換,循序漸進的進行。
當涉及到分表分庫時,多個實體表對應于一個邏輯表,可以将結果存于一個公共的子產品(比如MQ),或者單獨存取也可以,具體情況具體分析
存儲可以參考canal的多樣化:記憶體,檔案,zk,或者加入至MQ中
docker由此之外的工具管理,比如kubernetes
也可以進一步添加HA的功能,兩個docker對應一個mysql,互為主備,類似Canal的HA架構。如果時效性不是貼别強的場景,考慮到成本,此功能可以不采用。
這裡總結了一下Canal的一些點,僅供參考:
原理:模拟mysql slave的互動協定,僞裝自己為mysql slave,向mysql master發送dump協定;mysql master收到dump請求,開始推送binary log給slave(也就是canal);解析binary log對象(原始為byte流)
重複消費問題:在消費端解決。
采用開源的open-replicator來解析binlog
canal需要維護EventStore,可以存取在Memory, File, zk
canal需要維護用戶端的狀态,同一時刻一個instance隻能有一個消費端消費
資料傳輸格式:protobuff
支援binlog format 類型:statement, row, mixed. 多次附加功能隻能在row下使用,比如otter
binlog position可以支援儲存在記憶體,檔案,zk中
instance啟動方式:rpc/http; 内嵌
有ACK機制
無告警,無監控,這兩個功能都需要對接外部系統
友善快速部署。
<a href="https://github.com/alibaba/canal">https://github.com/alibaba/canal</a>