摘要:CDL是一种简单、高效的数据实时集成服务,能够从各种OLTP数据库中抓取Data Change事件,然后推送至Kafka中,最后由Sink Connector消费Topic中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。
本文分享自华为云社区《华为FusionInsight MRS CDL使用指南》,作者:晋红轻。
CDL是一种简单、高效的数据实时集成服务,能够从各种OLTP数据库中抓取Data Change事件,然后推送至Kafka中,最后由Sink Connector消费Topic中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。
CDL服务包含了两个重要的角色:CDLConnector和CDLService。CDLConnector是具体执行数据抓取任务的实例,CDLService是负责管理和创建任务的实例。
本此实践介绍以mysql作为数据源进行数据抓取
MRS集群已安装CDL服务。
MySQL数据库需要开启mysql的bin log功能(默认情况下是开启的)。
查看MySQL是否开启bin log:
使用工具或者命令行连接MySQL数据库(本示例使用navicat工具连接),执行show variables like 'log_%'命令查看。
例如在navicat工具选择"File > New Query"新建查询,输入如下SQL命令,单击"Run"在结果中"log_bin"显示为"ON"则表示开启成功。
现在cdl只能使用rest api的方式进行命令提交,所以需要提前安装工具进行调试。本文使用VSCode工具。
完成之后安装rest client插件:
完成之后创建一个cdl.http的文件进行编辑:
CDL任务创建的流程图如下所示:
说明:需要先创建一个MySQL link, 在创建一个Kafka link, 然后再创建一个CDL同步任务并启动。
MySQL link部分rest请求代码
Kafka link部分rest请求代码
CDL任务命令部分rest请求代码
生产库MySQL原始数据如下:
提交CDL任务之后
增加操作: insert into hudi.hudisource values (11,“蒋语堂”,38,“女”,“图”,“播放器”,28732);
对应kafka消息体:
更改操作: UPDATE hudi.hudisource SET uname=‘Anne Marie333’ WHERE uid=11;
删除操作:delete from hudi.hudisource where uid=11;
点击关注,第一时间了解华为云新鲜技术~