早期,阿裡巴巴b2b公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基于trigger的方式擷取增量變更,不過從2010年開始,阿裡系公司開始逐漸的嘗試基于資料庫的日志解析,擷取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。ps. 目前内部使用的同步,已經支援mysql5.x和oracle部分版本的日志解析
基于日志增量訂閱&消費支援的業務:
資料庫鏡像
資料庫實時備份
多級索引 (賣家和買家各自分庫索引)
search build
業務cache重新整理
價格變化等重要業務消息
名稱:canal [kə'næl]
譯意: 水道/管道/溝渠
語言: 純java開發
定位: 基于資料庫增量日志解析,提供增量資料訂閱&消費,目前主要支援了mysql

從上層來看,複制分成三步:
master将改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行檢視);
slave将master的binary log events拷貝到它的中繼日志(relay log);
slave重做中繼日志中的事件,将改變反映它自己的資料。
原理相對比較簡單:
canal模拟mysql slave的互動協定,僞裝自己為mysql slave,向mysql master發送dump協定
mysql master收到dump請求,開始推送binary log給slave(也就是canal)
canal解析binary log對象(原始為byte流)
說明:
server代表一個canal運作執行個體,對應于一個jvm
instance對應于一個資料隊列 (1個server對應1..n個instance)
instance子產品:
eventparser (資料源接入,模拟slave協定和master進行互動,協定解析)
eventsink (parser和store連結器,進行資料過濾,加工,分發的工作)
eventstore (資料存儲)
metamanager (增量訂閱&消費資訊管理器)
mysql的binlay log介紹
http://dev.mysql.com/doc/refman/5.5/en/binary-log.html
http://www.taobaodba.com/html/474_mysqls-binary-log_details.html
簡單點說:
mysql的binlog是多檔案存儲,定位一個logevent需要通過binlog filename + binlog position,進行定位
mysql的binlog資料格式,按照生成的方式,主要分為:statement-based、row-based、mixed。
java代碼
mysql> show variables like 'binlog_format';
+---------------+-------+
| variable_name | value |
| binlog_format | row |
1 row in set (0.00 sec)
目前canal隻能支援row模式的增量訂閱(statement隻有sql,沒有資料,是以無法擷取原始的變更日志)
大緻過程:
整個parser過程大緻可分為幾步:
connection擷取上一次解析成功的位置 (如果第一次啟動,則擷取初始指定的位置或者是目前資料庫的binlog位點)
connection建立連結,發送binlog_dump指令
// 0. write command number
// 1. write 4 bytes bin-log position to start at
// 2. write 2 bytes bin-log flags
// 3. write 4 bytes server id of the slave
// 4. write bin-log file name
mysql開始推送binaly log
接收到的binaly log的通過binlog parser進行協定解析,補充一些特定資訊
// 補充字段名字,字段類型,主鍵資訊,unsigned類型處理
傳遞給eventsink子產品進行資料存儲,是一個阻塞操作,直到存儲成功
存儲成功後,定時記錄binaly log位置
mysql的binlay log網絡協定:
圖中的協定4byte header,主要是描述整個binlog網絡包的length
binlog event structure,詳細資訊請參考: http://dev.mysql.com/doc/internals/en/binary-log.html
資料過濾:支援通配符的過濾模式,表名,字段内容等
資料路由/分發:解決1:n (1個parser對應多個store的模式)
資料歸并:解決n:1 (多個parser對應1個store)
資料加工:在進入store之前進行額外的處理,比如join
為了合理的利用資料庫資源, 一般常見的業務都是按照schema進行隔離,然後在mysql上層或者dao這一層面上,進行一個資料源路由,屏蔽資料庫實體位置對開發的影響,阿裡系主要是通過cobar/tddl來解決資料源路由問題。
是以,一般一個資料庫執行個體上,會部署多個schema,每個schema會有由1個或者多個業務方關注
同樣,當一個業務的資料規模達到一定的量級後,必然會涉及到水準拆分和垂直拆分的問題,針對這些拆分的資料需要處理時,就需要連結多個store進行處理,消費的位點就會變成多份,而且資料消費的進度無法得到盡可能有序的保證。
是以,在一定業務場景下,需要将拆分後的增量資料進行歸并處理,比如按照時間戳/全局id進行排序歸并.
1. 目前僅實作了memory記憶體模式,後續計劃增加本地file存儲,mixed混合模式
2. 借鑒了disruptor的ringbuffer的實作思路
ringbuffer設計:
定義了3個cursor
put : sink子產品進行資料存儲的最後一次寫入位置
get : 資料訂閱擷取的最後一次提取位置
ack : 資料消費成功的最後一次消費位置
借鑒disruptor的ringbuffer的實作,将ringbuffer拉直來看:
實作說明:
put/get/ack cursor用于遞增,采用long型存儲
buffer的get操作,通過取餘或者與操作。(與操作: cusor & (size - 1) , size需要為2的指數,效率比較高)
instance代表了一個實際運作的資料隊列,包括了eventpaser,eventsink,eventstore等元件。
抽象了canalinstancegenerator,主要是考慮配置的管理方式:
manager方式: 和你自己的内部web console/manager系統進行對接。(目前主要是公司内部使用)
spring方式:基于spring xml + properties進行定義,建構spring配置.
server代表了一個canal的運作執行個體,為了友善元件化使用,特意抽象了embeded(嵌入式) / netty(網絡通路)的兩種實作
embeded : 對latency和可用性都有比較高的要求,自己又能hold住分布式的相關技術(比如failover)
netty : 基于netty封裝了一層網絡協定,由canal server保證其可用性,采用的pull模型,當然latency會稍微打點折扣,不過這個也視情況而定。(阿裡系的notify和metaq,典型的push/pull模型,目前也逐漸的在向pull模型靠攏,push在資料量大的時候會有一些問題)
具體的協定格式,可參見:canalprotocol.proto
get/ack/rollback協定介紹:
message getwithoutack(int batchsize),允許指定batchsize,一次可以擷取多條,每次傳回的對象為message,包含的内容為:
a. batch id 唯一辨別
b. entries 具體的資料對象,對應的資料對象格式:entryprotocol.proto
void rollback(long batchid),顧命思議,復原上次的get請求,重新擷取資料。基于get擷取的batchid進行送出,避免誤操作
void ack(long batchid),顧命思議,确認已經消費成功,通知server删除資料。基于get擷取的batchid進行送出,避免誤操作
canal的get/ack/rollback協定和正常的jms協定有所不同,允許get/ack異步處理,比如可以連續調用get多次,後續異步按順序送出ack/rollback,項目中稱之為流式api.
流式api設計的好處:
get/ack異步化,減少因ack帶來的網絡延遲和操作成本 (99%的狀态都是處于正常狀态,異常的rollback屬于個别情況,沒必要為個别的case犧牲整個性能)
get擷取資料後,業務消費存在瓶頸或者需要多程序/多線程消費時,可以不停的輪詢get資料,不停的往後發送任務,提高并行化. (作者在實際業務中的一個case:業務資料消費需要跨中美網絡,是以一次操作基本在200ms以上,為了減少延遲,是以需要實施并行化)
流式api設計:
每次get操作都會在meta中産生一個mark,mark标記會遞增,保證運作過程中mark的唯一性
每次的get操作,都會在上一次的mark操作記錄的cursor繼續往後取,如果mark不存在,則在last ack cursor繼續往後取
進行ack時,需要按照mark的順序進行數序ack,不能跳躍ack. ack會删除目前的mark标記,并将對應的mark位置更新為last ack cusor
一旦出現異常情況,用戶端可發起rollback情況,重新置位:删除所有的mark, 清理get請求位置,下次請求會從last ack cursor繼續往後取
可以提供資料庫變更前和變更後的字段内容,針對binlog中沒有的name,iskey等資訊進行補全
可以提供ddl的變更語句
canal的ha分為兩部分,canal server和canal client分别有對應的ha實作
canal server: 為了減少對mysql dump的請求,不同server上的instance要求同一時間隻能有一個處于running,其他的處于standby狀态.
canal client: 為了保證有序性,一份instance同一時間隻能由一個canal client進行get/ack/rollback操作,否則用戶端接收無法保證有序。
整個ha機制的控制主要是依賴了zookeeper的幾個特性,watcher和ephemeral節點(和session生命周期綁定),可以看下我之前zookeeper的相關文章。
canal server:
大緻步驟:
canal server要啟動某個canal instance時都先向zookeeper進行一次嘗試啟動判斷 (實作:建立ephemeral節點,誰建立成功就允許誰啟動)
建立zookeeper節點成功後,對應的canal server就啟動對應的canal instance,沒有建立成功的canal instance就會處于standby狀态
一旦zookeeper發現canal server a建立的節點消失後,立即通知其他的canal server再次進行步驟1的操作,重新選出一個canal server啟動instance.
canal client每次進行connect時,會首先向zookeeper詢問目前是誰啟動了canal instance,然後和其建立連結,一旦連結不可用,會重新嘗試connect.
canal client的方式和canal server方式類似,也是利用zokeeper的搶占ephemeral節點的方式進行控制.
項目的代碼: https://github.com/alibabatech/canal
這裡給出了如何快速啟動canal server和canal client的例子,如有問題可随時聯系