天天看點

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

作者:RestCloud

實時資料同步

随着企業資料不斷增長,如何高效地捕獲、同步和處理資料成為了業務發展的關鍵。在這個數字化時代,CDC技術與Kafka內建為企業提供了一種無縫的資料管理方案,為資料的流動和實時處理打開了全新的大門。

CDC技術與Kafka內建能夠實作快速、可靠的實時資料同步。CDC技術可以捕獲資料庫事務日志中的資料變更,并将其轉化為可靠的資料流。這些資料流通過Kafka的高吞吐量消息隊列進行傳輸,確定資料的實時性和一緻性。無論是從源資料庫到目标資料庫的同步,還是跨不同資料存儲系統的資料傳輸,CDC技術與Kafka內建提供了高效且無縫的解決方案。

可靠的資料傳輸

Kafka作為一個分布式、可擴充的消息隊列系統,提供了高度可靠的資料傳輸機制。借助Kafka的持久性存儲和資料複制機制,資料不會丢失或損壞。即使在高并發的情況下,Kafka內建能夠保證資料的完整性和可靠性。這為企業提供了強大的資料傳輸基礎,確定資料在各個環節的安全傳輸。

靈活的資料處理

CDC技術與Kafka內建不僅提供了實時資料同步,還為企業提供了靈活的資料處理能力。Kafka的消息隊列和流處理特性使得企業可以在資料傳輸的同時進行實時的資料處理和分析。借助Kafka的消費者應用程式,企業可以對資料流進行轉換、過濾、聚合等操作,實作實時資料的清洗、加工和分析。這種實時資料處理能力為企業提供了即時的洞察力,幫助其做出快速而準确的決策。

解耦資料系統

CDC技術與Kafka內建還能幫助企業解耦資料系統。通過将CDC技術與Kafka作為中間層,不同的資料源和目标系統可以獨立操作,彼此之間解除了緊耦合的依賴關系。這種解耦帶來了極大的靈活性,使得企業能夠更加容易地添加、移除或更新資料源和目标系統,而無需對整個資料流程進行重構。

CDC技術與Kafka內建為企業帶來了資料管理的全新體驗。它提供了高效、可靠的資料同步和實時處理,幫助企業實作資料驅動的成功。無論是資料同步、實時處理還是資料系統的解耦,CDC技術與Kafka內建為企業提供了強大而靈活的解決方案。

主流免費CDC工具介紹

介紹兩款能夠快速且免費實作CDC技術與Kafka內建的主流工具:Flink CDC和ETLCloud CDC。

測試前的環境準備:JDK8以上、Mysql資料庫(開啟BinLog日志)、kafka

Flink CDC安裝使用步驟:

下載下傳安裝包

進入 Flink 官網,下載下傳 1.13.3 版本安裝包 flink-1.13.3-bin-scala_2.11.tgz。(Flink1.13.3支援flink cdc2.x版本,為相容flink cdc)

解壓

在 伺服器上建立安裝目錄/home/flink,将 flink 安裝包放在該目錄下,并執行解壓指令,解壓至目前目錄。tar -zxvf flink-1.13.3-bin-scala_2.11.tgz

啟動

進入解壓後的flink/lib目錄,上傳mysql和sql-connector驅動包。

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

進入flink/bin目錄,執行啟動指令:./start-cluster.sh

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

進入Flink可視化界面檢視http://ip:8081

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

測試

下面我們來建立一個maven工程做CDC資料監聽的同步測試。

POM依賴

<!-- Flink CDC -->

<dependency>

<groupId>com.ververica</groupId>

<artifactId>flink-connector-mysql-cdc</artifactId>

<version>2.0.0</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-java</artifactId>

<version>1.12.0</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-java_2.12</artifactId>

<version>1.12.0</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-clients_2.12</artifactId>

<version>1.12.0</version>

</dependency>

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>5.1.49</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-table-planner-blink_2.12</artifactId>

<version>1.12.0</version>

</dependency>

<dependency>

<groupId>com.alibaba</groupId>

<artifactId>fastjson</artifactId>

<version>1.2.75</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka_2.11</artifactId>

<version>1.12.0</version>

</dependency>

建立Flink_CDC2Kafka類

import com.ververica.cdc.connectors.mysql.MySqlSource;

import com.ververica.cdc.connectors.mysql.table.StartupOptions;

import com.ververica.cdc.debezium.DebeziumSourceFunction;

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class Flink_CDC2Kafka {

public static void main(String[] args) throws Exception {

//1.擷取執行環境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

//1.1 設定 CK&狀态後端

//略

//2.通過 FlinkCDC 建構 SourceFunction 并讀取資料

DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()

.hostname("ip") //資料庫IP

.port(3306) //資料庫端口

.username("admin") //資料庫使用者名

.password("pass") //資料庫密碼

.databaseList("test") //這個注釋,就是多庫同步

.tableList("test.admin") //這個注釋,就是多表同步

.deserializer(new CustomerDeserialization()) //這裡需要自定義序列化格式

// .deserializer(new StringDebeziumDeserializationSchema()) //預設是這個序列化格式

.startupOptions(StartupOptions.latest())

.build();

DataStreamSource<String> streamSource = env.addSource(sourceFunction);

//3.列印資料并将資料寫入 Kafka

streamSource.print();

String sinkTopic = "test";

streamSource.addSink(getKafkaProducer("ip:9092",sinkTopic));

//4.啟動任務

env.execute("FlinkCDC");

}

//kafka 生産者

public static FlinkKafkaProducer<String> getKafkaProducer(String brokers,String topic) {

return new FlinkKafkaProducer<String>(brokers,

topic,

new SimpleStringSchema());

}

}

自定義序列化類

import com.alibaba.fastjson.JSONObject;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

import io.debezium.data.Envelope;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.util.Collector;

import org.apache.kafka.connect.data.Field;

import org.apache.kafka.connect.data.Schema;

import org.apache.kafka.connect.data.Struct;

import org.apache.kafka.connect.source.SourceRecord;

import java.util.ArrayList;

import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

@Override

public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

//1.建立 JSON 對象用于存儲最終資料

JSONObject result = new JSONObject();

//2.擷取庫名&表名放入 source

String topic = sourceRecord.topic();

String[] fields = topic.split("\\.");

String database = fields[1];

String tableName = fields[2];

JSONObject source = new JSONObject();

source.put("database",database);

source.put("table",tableName);

Struct value = (Struct) sourceRecord.value();

//3.擷取"before"資料

Struct before = value.getStruct("before");

JSONObject beforeJson = new JSONObject();

if (before != null) {

Schema beforeSchema = before.schema();

List<Field> beforeFields = beforeSchema.fields();

for (Field field : beforeFields) {

Object beforeValue = before.get(field);

beforeJson.put(field.name(), beforeValue);

}

}

//4.擷取"after"資料

Struct after = value.getStruct("after");

JSONObject afterJson = new JSONObject();

if (after != null) {

Schema afterSchema = after.schema();

List<Field> afterFields = afterSchema.fields();

for (Field field : afterFields) {

Object afterValue = after.get(field);

afterJson.put(field.name(), afterValue);

}

}

//5.擷取操作類型 CREATE UPDATE DELETE 進行符合 Debezium-op 的字母

Envelope.Operation operation = Envelope.operationFor(sourceRecord);

String type = operation.toString().toLowerCase();

if ("insert".equals(type)) {

type = "c";

}

if ("update".equals(type)) {

type = "u";

}

if ("delete".equals(type)) {

type = "d";

}

if ("create".equals(type)) {

type = "c";

}

//6.将字段寫入 JSON 對象

result.put("source", source);

result.put("before", beforeJson);

result.put("after", afterJson);

result.put("op", type);

//7.輸出資料

collector.collect(result.toJSONString());

}

@Override

public TypeInformation<String> getProducedType() {

return BasicTypeInfo.STRING_TYPE_INFO;

}

}

開啟CDC監聽

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

Mysql中新增一條人員資料

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

控制台捕獲到增量資料

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

增量資料也成功推送到kafka中

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

至此通過Flink CDC監聽資料庫增量資料推送到kafka的過程已經完成,可以看到整個過程需要一些編碼能力,對于業務人員的使用比較痛苦。

下面我們來介紹ETLCloud這款産品如何通過可視化配置,快速實作上述的場景内容。

ETL CDC安裝使用步驟

下載下傳安裝包

ETLCloud提供了一鍵快捷部署包,隻需運作啟動腳本即可完成安裝産品部署。部署包下載下傳可以登入ETLCloud官網自行下載下傳。

安裝

官網下載下傳linux一鍵部署包,把一鍵部署包放到一個目錄下解壓并進入該目錄。

對腳本檔案進行賦權

chmod +x restcloud_install.sh

執行腳本

./restcloud_install.sh

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

等待tomcat啟動,當出現這個界面,則restcloud證明啟動成功

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

資料源配置

新增MySql資料源資訊

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

新增Kafka資料源資訊

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

測試資料源

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

監聽器配置

新增資料庫監聽器

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

監聽器配置

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

接收端配置(資料傳輸類型選擇kafka)

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

進階配置(預設參數)

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

啟動監聽

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

監聽成功

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

測試

打開Navicat可視化工具新增并修改一條人員資訊

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

實時資料中可以動态捕捉實時傳輸資料

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

Kafka中檢視新增資料

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

Kafka中檢視修改資料

實時資料內建的完美搭檔:CDC技術與Kafka內建的資料傳輸方案

寫在最後

上面我們通過兩個CDC工具均實作了實時資料同步到kafka的功能,但通過對比Flink CDC和ETLCloud CDC,可以看出ETLCloud CDC提供了可視化配置的方式,使配置過程更加簡單快捷,不需要編碼能力。而Flink CDC則需要進行編碼,對于業務人員可能會有一定的學習成本。

無論選擇哪種工具,都可以實作CDC技術與Kafka內建,實時捕獲資料庫的增量資料變化,提供了友善和高效的資料同步和傳輸方法。