實時資料同步
随着企業資料不斷增長,如何高效地捕獲、同步和處理資料成為了業務發展的關鍵。在這個數字化時代,CDC技術與Kafka內建為企業提供了一種無縫的資料管理方案,為資料的流動和實時處理打開了全新的大門。
CDC技術與Kafka內建能夠實作快速、可靠的實時資料同步。CDC技術可以捕獲資料庫事務日志中的資料變更,并将其轉化為可靠的資料流。這些資料流通過Kafka的高吞吐量消息隊列進行傳輸,確定資料的實時性和一緻性。無論是從源資料庫到目标資料庫的同步,還是跨不同資料存儲系統的資料傳輸,CDC技術與Kafka內建提供了高效且無縫的解決方案。
可靠的資料傳輸
Kafka作為一個分布式、可擴充的消息隊列系統,提供了高度可靠的資料傳輸機制。借助Kafka的持久性存儲和資料複制機制,資料不會丢失或損壞。即使在高并發的情況下,Kafka內建能夠保證資料的完整性和可靠性。這為企業提供了強大的資料傳輸基礎,確定資料在各個環節的安全傳輸。
靈活的資料處理
CDC技術與Kafka內建不僅提供了實時資料同步,還為企業提供了靈活的資料處理能力。Kafka的消息隊列和流處理特性使得企業可以在資料傳輸的同時進行實時的資料處理和分析。借助Kafka的消費者應用程式,企業可以對資料流進行轉換、過濾、聚合等操作,實作實時資料的清洗、加工和分析。這種實時資料處理能力為企業提供了即時的洞察力,幫助其做出快速而準确的決策。
解耦資料系統
CDC技術與Kafka內建還能幫助企業解耦資料系統。通過将CDC技術與Kafka作為中間層,不同的資料源和目标系統可以獨立操作,彼此之間解除了緊耦合的依賴關系。這種解耦帶來了極大的靈活性,使得企業能夠更加容易地添加、移除或更新資料源和目标系統,而無需對整個資料流程進行重構。
CDC技術與Kafka內建為企業帶來了資料管理的全新體驗。它提供了高效、可靠的資料同步和實時處理,幫助企業實作資料驅動的成功。無論是資料同步、實時處理還是資料系統的解耦,CDC技術與Kafka內建為企業提供了強大而靈活的解決方案。
主流免費CDC工具介紹
介紹兩款能夠快速且免費實作CDC技術與Kafka內建的主流工具:Flink CDC和ETLCloud CDC。
測試前的環境準備:JDK8以上、Mysql資料庫(開啟BinLog日志)、kafka
Flink CDC安裝使用步驟:
下載下傳安裝包
進入 Flink 官網,下載下傳 1.13.3 版本安裝包 flink-1.13.3-bin-scala_2.11.tgz。(Flink1.13.3支援flink cdc2.x版本,為相容flink cdc)
解壓
在 伺服器上建立安裝目錄/home/flink,将 flink 安裝包放在該目錄下,并執行解壓指令,解壓至目前目錄。tar -zxvf flink-1.13.3-bin-scala_2.11.tgz
啟動
進入解壓後的flink/lib目錄,上傳mysql和sql-connector驅動包。
進入flink/bin目錄,執行啟動指令:./start-cluster.sh
進入Flink可視化界面檢視http://ip:8081
測試
下面我們來建立一個maven工程做CDC資料監聽的同步測試。
POM依賴
<!-- Flink CDC -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.0</version>
</dependency>
建立Flink_CDC2Kafka類
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class Flink_CDC2Kafka {
public static void main(String[] args) throws Exception {
//1.擷取執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//1.1 設定 CK&狀态後端
//略
//2.通過 FlinkCDC 建構 SourceFunction 并讀取資料
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname("ip") //資料庫IP
.port(3306) //資料庫端口
.username("admin") //資料庫使用者名
.password("pass") //資料庫密碼
.databaseList("test") //這個注釋,就是多庫同步
.tableList("test.admin") //這個注釋,就是多表同步
.deserializer(new CustomerDeserialization()) //這裡需要自定義序列化格式
// .deserializer(new StringDebeziumDeserializationSchema()) //預設是這個序列化格式
.startupOptions(StartupOptions.latest())
.build();
DataStreamSource<String> streamSource = env.addSource(sourceFunction);
//3.列印資料并将資料寫入 Kafka
streamSource.print();
String sinkTopic = "test";
streamSource.addSink(getKafkaProducer("ip:9092",sinkTopic));
//4.啟動任務
env.execute("FlinkCDC");
}
//kafka 生産者
public static FlinkKafkaProducer<String> getKafkaProducer(String brokers,String topic) {
return new FlinkKafkaProducer<String>(brokers,
topic,
new SimpleStringSchema());
}
}
自定義序列化類
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.ArrayList;
import java.util.List;
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1.建立 JSON 對象用于存儲最終資料
JSONObject result = new JSONObject();
//2.擷取庫名&表名放入 source
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
JSONObject source = new JSONObject();
source.put("database",database);
source.put("table",tableName);
Struct value = (Struct) sourceRecord.value();
//3.擷取"before"資料
Struct before = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4.擷取"after"資料
Struct after = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5.擷取操作類型 CREATE UPDATE DELETE 進行符合 Debezium-op 的字母
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("insert".equals(type)) {
type = "c";
}
if ("update".equals(type)) {
type = "u";
}
if ("delete".equals(type)) {
type = "d";
}
if ("create".equals(type)) {
type = "c";
}
//6.将字段寫入 JSON 對象
result.put("source", source);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("op", type);
//7.輸出資料
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
開啟CDC監聽
Mysql中新增一條人員資料
控制台捕獲到增量資料
增量資料也成功推送到kafka中
至此通過Flink CDC監聽資料庫增量資料推送到kafka的過程已經完成,可以看到整個過程需要一些編碼能力,對于業務人員的使用比較痛苦。
下面我們來介紹ETLCloud這款産品如何通過可視化配置,快速實作上述的場景内容。
ETL CDC安裝使用步驟
下載下傳安裝包
ETLCloud提供了一鍵快捷部署包,隻需運作啟動腳本即可完成安裝産品部署。部署包下載下傳可以登入ETLCloud官網自行下載下傳。
安裝
官網下載下傳linux一鍵部署包,把一鍵部署包放到一個目錄下解壓并進入該目錄。
對腳本檔案進行賦權
chmod +x restcloud_install.sh
執行腳本
./restcloud_install.sh
等待tomcat啟動,當出現這個界面,則restcloud證明啟動成功
資料源配置
新增MySql資料源資訊
新增Kafka資料源資訊
測試資料源
監聽器配置
新增資料庫監聽器
監聽器配置
接收端配置(資料傳輸類型選擇kafka)
進階配置(預設參數)
啟動監聽
監聽成功
測試
打開Navicat可視化工具新增并修改一條人員資訊
實時資料中可以動态捕捉實時傳輸資料
Kafka中檢視新增資料
Kafka中檢視修改資料
寫在最後
上面我們通過兩個CDC工具均實作了實時資料同步到kafka的功能,但通過對比Flink CDC和ETLCloud CDC,可以看出ETLCloud CDC提供了可視化配置的方式,使配置過程更加簡單快捷,不需要編碼能力。而Flink CDC則需要進行編碼,對于業務人員可能會有一定的學習成本。
無論選擇哪種工具,都可以實作CDC技術與Kafka內建,實時捕獲資料庫的增量資料變化,提供了友善和高效的資料同步和傳輸方法。