天天看点

Debezium+Kafka实时数据同步验证

作者:数通畅联

在进行信息系统集成的过程中,数据集成是一个非常重要的集成内容,根据实际业务场景的不同,数据的同步集成又可以分为,如果数据的实时性要求比较高则需要采用实时抽取。

目前基于数据库的实时数据抽取存在多种不同的方式,本次主要验证Debezium+Kafka的实时抽取方式,,实现基于数据库的实时数据同步。

总体说明

本次主要验证Debezium+Kafka的方式实现实时数据同步,通过Debezium监控MySQL数据库,将MySQL中的数据变更实时抽取到Kafka中,再基于Kafka将同步进来的数据写入下游数据库,实时数据库的实时同步。

1.组件介绍

:是一个分布式平台,可将现有的数据库转换为事件流,因此应用程序可以感知到数据库中的每个行级更改并对此做出立即响应,,并提供与Kafka Connect兼容的Connector以便监控指定的数据库管理系统。

从应用程序开始使用数据库的时候,Debezium就会在Kafka Log中记录它们数据变更历史记录,这会使应用程序可以轻松消费所有正确且完整的事件。即使应用程序意外停止,它也不会丢失任何事件,当应用程序重新启动时,会继续从上次它停止的位置重新消费。

:是一种,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

2.技术架构

Debezium的实现方式主要有三种方式::

(1)Kafka Connect基于Kafka的Connector连接器实现,部署简单、体量小,但是需要部署Kafka;

(2)Debezium Server:独立部署Debezium应用进行数据采集同步,需要更多的资源独立部署应用程序;

(3)嵌入式引擎:不依赖于Kafka,但是需要嵌入到其他应用系统中。

本次主要验证第一种方式,。

> > > > Kafka Connect

Kafka Connect为Kafka和外部存储系统之间系统数据提供了一种可靠且可伸缩性的方式。它为 Connector插件:Connect负责运行这些插件,它们则负责移动数据。通过Kafka Connect可以快速实现Source Connector和Sink Connector进行交互构造一个低延迟的数据Pipeline:

1.Source Connector(Debezium):将记录发送到 Kafka;

2.Sink Connector:将Kafka Topic中的记录发送到其他系统;

下图展示了基于Debezium的变更数据捕获Pipeline架构:

Debezium+Kafka实时数据同步验证

如上图所示,部署了MySQL和PostgresSQL的Debezium Connector以捕获这两种类型数据库的变更。:

(1)MySQL Connector使用客户端库来访问binlog;

(2)PostgreSQL Connector从逻辑副本流中读取数据;

除了Kafka Broker之外,Kafka Connect也作为一个单独的服务运行。默认情况下,数据库表的变更会写入与表名称对应的Topic中。如果需要可以通过配置Debezium的Topic路由转换来调整目标Topic名称。

变更事件记录在Kafka中后,Kafka Connect生态系统中的不同Sink Connector可以将记录流式传输到其他系统、数据库,例如等。

> > > > Debezium Server

另一种部署Debezium的方法是使用Debezium Server。Debezium Server是一个可配置的、随时可用的应用程序,可以将变更事件从源数据库流式传输到各种消息中间件上。

下图展示了基于Debezium Server的变更数据捕获 Pipeline 架构:

Debezium+Kafka实时数据同步验证

,变更事件可以序列化为不同的格式,例如JSON或Apache Avro,然后发送到各种消息中间件,例如Amazon Kinesis、Google Cloud Pub/Sub或Apache Pulsar等。

> > > > 嵌入式引擎

使用Debezium Connector的另一种方法是嵌入式引擎。在这种情况下,Debezium不会通过Kafka Connect运行,而是作为嵌入到Java应用程序中的库运行。这对于在应用程序本身内获取变更事件非常有帮助,,也不用将变更流式传输到Amazon Kinesis等消息中间件上。

3.软件版本

1.jdk1.8.0_101

2.kafka_2.12-3.3.1

3.zookeeper-3.6.4(kafka自带,无需单独部署)

4.kafka-eagle-3.0.2(可选,图形化工具)

5.debezium-1.5.4.Final

软件部署

主要需要部署Kafka,Debezium和Sink工具基于Kafka进行配置实现,。

1.Kafka部署

在进行Kafka环境部署时,需要先部署java环境,本次部署的kafka_2.12-3.3.1版本可以直接基于jdk1.8运行。为了更好的查看和管理Kafka,本次也单独部署了apache eagle图形化工具。

> > > > Java_1.8

1.上传并解压jdk安装包:

Debezium+Kafka实时数据同步验证

2.修改全局变量:

Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证

> > > > Apache Kafka

1.上传安装包,解压:

Debezium+Kafka实时数据同步验证

2.启动zookeeper:

Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证

3.启动Kafaka:

Debezium+Kafka实时数据同步验证

4.测试

Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证

> > > > Kafka-eagle

1.上传解压:

Debezium+Kafka实时数据同步验证

2.修改环境变量:

Debezium+Kafka实时数据同步验证

3.修改配置文件:

Debezium+Kafka实时数据同步验证

4.启动ke:

Debezium+Kafka实时数据同步验证

5.修改kafka启动脚本:

Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证

2.Debezium

本次Debezium是,所以在部署是需要部署connect服务以及Debezium的实现工具。

> > > > kafka connect服务

1.创建插件目录:

Debezium+Kafka实时数据同步验证

2.上传并解压插件:

Debezium+Kafka实时数据同步验证

3.修改kafka connect配置:

Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证

> > > > debezium mysql connector

1.创建配置文件:

Debezium+Kafka实时数据同步验证

2.配置文件:

Debezium+Kafka实时数据同步验证

参数说明:

Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证

全部参数参考官网说明:

https://debezium.io/documentation/reference/1.2/connectors/mysql.html#mysql-connector-configuration-properties_debezium

> > > > 注册启动Connector

1.启动并注册connect到kafka:

Debezium+Kafka实时数据同步验证

2.通过restful api管理connectors:

Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证

3.在浏览器查看当前活跃的connectors:

Debezium+Kafka实时数据同步验证

4.添加新的debezium mysql connector:

Debezium+Kafka实时数据同步验证

5.删除debezium mysql connector:

Debezium+Kafka实时数据同步验证

2.3JDBC Sink

1.进入kafka插件目录:

Debezium+Kafka实时数据同步验证

2.上传软件包并解压:

Debezium+Kafka实时数据同步验证

3.进入confluentinc-kafka-connect-jdbc-10.6.0的lib目录:

Debezium+Kafka实时数据同步验证

4.上传MySQL驱动:

Debezium+Kafka实时数据同步验证

5.通过API添加sink的connector:

Debezium+Kafka实时数据同步验证

注意事项

在整个部署过程中,需要,涉及多个配置文件和命令的使用,所以在配置过程中要注意配置说明和相关参数,以及在配置文件中对于IP地址的使用。

1.Kafka

1.Kafka运行需要依赖Zookeeper,但在单机部署的情况下,可以直接使用Kafka自带的Zookeeper,配置文件:config/zookeeper.properties,启动脚本:bin/zookeeper-server-start.sh;

2.Kafak默认前台启动,后台启动需要在启动命令上添加-daemon;

3.Kafka-eagle是Kafak的web工具,可不安装,如果按照注意至少保证服务器有1.5G的空闲内存,否则可能出现启动成功,但是无法访问的情况;

4.,默认是指sqlite和mysql,建议使用mysql,在配置文件中配置数据库连接即可,无需提前建立数据库;

5.Kafka-eagle监听Kafka时需要暴露JMX端口,所以需要修改Kafka的启动脚本,并且重启Kafka;

6.由于Kafka暴露JMX端口后,需要2G的JVM,所以如果使用Kafka-eagle并且和kafka在同一台服务器,建议服务器内存4G以上。

2.Debezium

1.Debezium作为Kafka的插件直接部署在Kafka中,Debezium通过实现Kafka的Connector接口实现和Kafka的数据交互,所以需要先,再将Debezium注册到Kafka的connect服务中;

2.第一次注册connector时,可以通过Kafak的bin/connect-standalone.sh脚本进行启动,但是启动后再次注册必须通过connector的REST API,不能再次通过脚本启动(除非先结束进程)。

3.Sink

1.Sink的实现方式和Debezium类似,也是通过connector进行处理,不过配置文件不同;

2.由于本次Sink使用的时confluentinc的Sink实现,和Debezium的类路径不同,所以如果已经开启了Debezium,再次添加Sink时需要重启connect服务以加载confluentinc的实现类,并且需要调整启动命令:

./bin/connect-standalone.sh --daemon config/connect-standalone.properties [SourceConnector.properties SinkConnector.properties]

例如:

./bin/connect-standalone.sh --daemon config/connect-standalone.properties connect-debezium-mysql.properties connect-sink-mysql.properties

3.如果confluentinc的实现类已经加载过,第二次添加Sink时则不需要再重启connect服务,直接使用REST API添加即可;

4.如果是通过REST API操作connector,请求体需要是JSON格式,如果是properties配置文件,;

5.confluentinc的实现类运行时需要依赖MySQL驱动,该驱动需要单独下载,并没有预置在安装包内。

测试要点

整体测试需要包括;需要测试基于Debezium实现数据采集并写入Kafka,基于Sink实现Kafka的数据到下游数据库的写入。

1.Kafka

1.创建topic:

Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证

2.查询topic列表:

Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证

3.查询topic信息:

Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证

4.打开新窗口,创建producer生产消息:

Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证

5.打开新窗口,创建consumer消费消息:

Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证

2.Debezium

1.准备MySQL数据库:

Debezium+Kafka实时数据同步验证

2.准备MySQL数据表:

Debezium+Kafka实时数据同步验证

3.创建Kafka的consumer:

Debezium+Kafka实时数据同步验证

4.插入数据:

5.通过Kafka的consumer监控数据变更:

Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证
Debezium+Kafka实时数据同步验证

3.Sink写入

1.启动后检查源表和目标表数据是否一致:

源表:

Debezium+Kafka实时数据同步验证

目标表:

Debezium+Kafka实时数据同步验证

2.修改源表数据,检查目标表是否同步:

源表:

Debezium+Kafka实时数据同步验证

目标表:

Debezium+Kafka实时数据同步验证

分析总结

本次验证主要是基于Debezium和Kafka验证实时数据采集的过程,后续可以和DAP产品融合,以便POC或实际项目中能更好地实现数仓数据采集和构建,满足实际项目和业务的需要。

1.个人总结

,实现了非常全面的消息、数据的采集、管理,通过本次部署验证,对于Kafka的配置、使用有了一定的了解。目前实时数据采集有很多不同的方案,但通过部署验证的过程,对于实时采集的模式、实现方式等有了更加清晰的认知。

2.架构体系

在实际项目中实时数据采集一般适用于小数据量、高频率的数据变更处理,如一些生产运行数据,而这些数据实时采集后往往是需要做数据展现,所以和大数据、数据分析结合的比较紧密,通过采集实时数据进行数据加工汇总,构建企业数据仓库,从而实现实时的分析展现。

3.产品融合

在目前的产品体系中,和实时数据采集结合比较多的就是DAP数据分析平台和数据中台方案,,为数据构建和数据展现提供更加快速的变化,及时展现企业的运营状态。

,主要满足数仓构建以及数据可视化分析,而数仓建设也在大数据层面不断深化,而实时数据采集也是支持大数据建设的内容之一,结合实时、定时数据采集以及高性能的大数据仓库,不断完善DAP数据分析平台对实际业务场景的支撑,更好地支持实际项目的应用。

本文由@数通畅联原创,欢迎转发,仅供学习交流使用,引用请注明出处!谢谢~

继续阅读