天天看點

Kafka-Connect-JDBC-Sink池連接配接器實戰Kafka-Connect-JDBC-Sink池連接配接器實戰

Kafka-Connect-JDBC-Sink池連接配接器實戰

功能概述

通過kafka-connect,實作mysql資料自動同步,以下為資料同步流程示意圖:

Kafka-Connect-JDBC-Sink池連接配接器實戰Kafka-Connect-JDBC-Sink池連接配接器實戰

1、建立mysql資料庫

建立一個資料庫寫入使用者(sink),用于讀取資料;

使用root操作,進行如下操作
-- 建立資料庫
create database test_sink;
-- 建立隻讀使用者
create user 'sink'@'127.0.0.1' identified by '123456';
-- 賦予全部權限
grant all on test_sink.* to 'sink'@'127.0.0.1';
-- 重新整理權限
flush privileges;

           

使用sink登入mysql,驗證是否可以正常通路資料庫:

mysql -u sink -p -h 127.0.0.1 
           

2、Mysql-connect-sink池端連接配接器配置

檔案:connect-mysql-sink.properties

name=connect-mysql-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=10

# topic名稱清單
topics=sys_config

# 配置jdbc連接配接
connection.url=jdbc:mysql://127.0.0.1:3306/test_sink?useUnicode=true&characterEncoding=utf8&user=sink&password=123456&serverTimezone=Asia/Shanghai
# 自動建立表
auto.create=true

# 主鍵
pk.mode=record_value
pk.fields=cfg_id

# 寫入模式
insert.mode=upsert
           

3、啟動connect

啟動腳本檔案:8.1-kafka-connect-jdbc.bat

@echo off 
rem 啟動8.1-connect-jdbc
title connect-jdbc-[%date% %time%]

rem 設定connect-jdbc路徑
set CLASSPATH=%cd%\plugins\kafka-connect-jdbc\*

rem connect 連接配接mysql
set configs=config/connect-standalone.properties
set configs=%configs% config/connect-mysql-whitelist-timestamp-source.properties
set configs=%configs% config/connect-mysql-sink.properties
bin\windows\connect-standalone.bat %configs%

pause
           

4、檢視連接配接器插件清單

http://127.0.0.1:8083/connector-plugins

Kafka-Connect-JDBC-Sink池連接配接器實戰Kafka-Connect-JDBC-Sink池連接配接器實戰

5、資料寫入測試

由于昨天已經在源庫中建立表,并寫入資料,首次連接配接mysql目标端庫時,發現表結構,資料已經自動同步過來,但是有如下差異:

5.1、表名稱不同:

如果源端topic.prefix配置為:mysql-,将導緻目标端自動建立的表名與源端不一緻;

5.2、表結構不一緻

目标庫的表結構自動建立,但是不包括預設值;

源表結構:

Kafka-Connect-JDBC-Sink池連接配接器實戰Kafka-Connect-JDBC-Sink池連接配接器實戰

目标表結構:

Kafka-Connect-JDBC-Sink池連接配接器實戰Kafka-Connect-JDBC-Sink池連接配接器實戰

5.3、插入、更新的資料可以正常同步

Kafka-Connect-JDBC-Sink池連接配接器實戰Kafka-Connect-JDBC-Sink池連接配接器實戰

5.4、删除的資料無法同步

但是删除的資料,無法正常同步到目标端。

Kafka-Connect-JDBC-Sink池連接配接器實戰Kafka-Connect-JDBC-Sink池連接配接器實戰

參考:

https://docs.confluent.io/5.4.0/connect/kafka-connect-jdbc/sink-connector/index.html

繼續閱讀