摘要:CDL是一種簡單、高效的資料實時內建服務,能夠從各種OLTP資料庫中抓取Data Change事件,然後推送至Kafka中,最後由Sink Connector消費Topic中的資料并導入到大資料生态軟體應用中,進而實作資料的實時入湖。
本文分享自華為雲社群《華為FusionInsight MRS CDL使用指南》,作者:晉紅輕。
CDL是一種簡單、高效的資料實時內建服務,能夠從各種OLTP資料庫中抓取Data Change事件,然後推送至Kafka中,最後由Sink Connector消費Topic中的資料并導入到大資料生态軟體應用中,進而實作資料的實時入湖。
CDL服務包含了兩個重要的角色:CDLConnector和CDLService。CDLConnector是具體執行資料抓取任務的執行個體,CDLService是負責管理和建立任務的執行個體。
本此實踐介紹以mysql作為資料源進行資料抓取
MRS叢集已安裝CDL服務。
MySQL資料庫需要開啟mysql的bin log功能(預設情況下是開啟的)。
檢視MySQL是否開啟bin log:
使用工具或者指令行連接配接MySQL資料庫(本示例使用navicat工具連接配接),執行show variables like 'log_%'指令檢視。
例如在navicat工具選擇"File > New Query"建立查詢,輸入如下SQL指令,單擊"Run"在結果中"log_bin"顯示為"ON"則表示開啟成功。
現在cdl隻能使用rest api的方式進行指令送出,是以需要提前安裝工具進行調試。本文使用VSCode工具。
完成之後安裝rest client插件:
完成之後建立一個cdl.http的檔案進行編輯:
CDL任務建立的流程圖如下所示:
說明:需要先建立一個MySQL link, 在建立一個Kafka link, 然後再建立一個CDL同步任務并啟動。
MySQL link部分rest請求代碼
Kafka link部分rest請求代碼
CDL任務指令部分rest請求代碼
生産庫MySQL原始資料如下:
送出CDL任務之後
增加操作: insert into hudi.hudisource values (11,“蔣語堂”,38,“女”,“圖”,“播放器”,28732);
對應kafka消息體:
更改操作: UPDATE hudi.hudisource SET uname=‘Anne Marie333’ WHERE uid=11;
删除操作:delete from hudi.hudisource where uid=11;
點選關注,第一時間了解華為雲新鮮技術~