天天看點

kafka Java用戶端之Connect API

kafka Connect 簡單介紹

Kafka Connect 是一個可擴充、可靠的在Kafka和其他系統之間流傳輸的資料工具。它可以通過connectors(連接配接器)簡單、快速的将大集合資料導入和導出kafka。Kafka Connect可以接收整個資料庫或收集來自所有的應用程式的消息到Kafka Topic。使這些資料可用于低延遲流處理。導出可以把topic的資料發送到secondary storage(輔助存儲也叫二級存儲)也可以發送到查詢系統或批處理系統做離線分析。

Connect API實作一個連接配接器(connector),不斷地從一些資料源系統拉取資料到kafka,或從kafka推送到宿系統(sink system)。

Kafka Connect功能包括:

  • Kafka連接配接器通用架構:Kafka Connect 規範了kafka與其他資料系統內建,簡化了connector的開發、部署和管理。
  • 分布式和單機模式 - 擴充到大型支援整個organization(公司、組織)的集中管理服務,也可縮小到開發,測試和小規模生産部署。
  • REST 接口 - 使用REST API來送出并管理Kafka Connect叢集。
  • 自動的offset管理 - 從connector擷取少量的資訊,Kafka Connect來管理offset的送出,是以connector的開發者不需要擔心這個容易出錯的部分。
  • 分布式和預設擴充 - Kafka Connect建立在現有的組管理協定上。更多的工作可以添加擴充到Kafka Connect叢集。
  • 流式/批量內建 - 利用kafka現有的能力,Kafka Connect是一個橋接流和批量資料系統的理想解決方案。

Kafka Connect實際上是Kafka流式計算的一部分,差別于Kafka Stream的是,Kafka Stream在kafka内部處理資料。而Kafka Connect 是将資料導入Kafka或者将資料從Kafka導出,是以Kafka Connect主要用來與其他中間件建立流式通道

Kafka Connect的架構如下圖所示:

kafka Java用戶端之Connect API

現在常用的中間件的connect 都已經被維護好了,是以不用我們自己實作。

Kafka Connect關鍵詞

Connectors

通過管理task來協調資料流的進階抽象

Kafka Connect中的connector定義了資料應該從哪裡複制到哪裡。connector執行個體是一種邏輯作業,負責管理Kafka與另一個系統之間的資料複制。

我們在大多數情況下都是使用一些平台提供的現成的connector。但是,也可以從頭編寫一個新的connector插件。在高層次上,希望編寫新連接配接器插件的開發人員遵循以下工作流:

kafka Java用戶端之Connect API

Task

Task是Connect資料模型中的主要處理資料的角色,也就是真正幹活的。每個connector執行個體協調一組實際複制資料的task。通過允許connector将單個作業分解為多個task,Kafka Connect提供了内置的對并行性和可伸縮資料複制的支援,隻需很少的配置。

這些任務沒有存儲任何狀态。任務狀态存儲在Kafka中的特殊主題config.storage.topic和status.storage.topic中。是以,可以在任何時候啟動、停止或重新啟動任務,以提供彈性的、可伸縮的資料管道。

kafka Java用戶端之Connect API

Workers

Workers是負責管理和執行connector和task的。connector和task是邏輯工作單元,并作為程序運作。

Workers有兩種模式,Standalone(單機)和Distributed(分布式)。

Standalone Workers:

Standalone模式是最簡單的模式,用單一程序負責執行所有connector和task。對于在本地計算機上開發和測試 Kafka Connect 非常有用。它還可用于通常使用單個代理的環境(例如,将 Web 伺服器日志發送到 Kafka)。

Distributed Workers:

Distributed模式為Kafka Connect提供了可擴充性和自動容錯能力。在分布式模式下,你可以使用相同的組啟動許多worker程序。它們自動協調以跨所有可用的worker排程connector和task的執行。

如果你添加一個worker、關閉一個worker或某個worker意外失敗,那麼其餘的worker将檢測到這一點,并自動協調,在可用的worker集重新分發connector和task。

kafka Java用戶端之Connect API

Task Rebalance

當connector首次送出到叢集時,workers會重新平衡叢集中的所有connector及其tasks,以便每個worker的工作量大緻相同。當connector增加或減少它們所需的task數量,或者更改connector的配置時,會重新平衡。

當一個worker失敗時,task在活動的worker之間重新平衡。當一個task失敗時,不會觸發再平衡,因為task失敗被認為是一個例外情況。是以,失敗的task不會被架構自動重新啟動,應該通過REST API重新啟動。

kafka Java用戶端之Connect API

Converters

在向Kafka寫入或從Kafka讀取資料時,Converter是使Kafka Connect支援特定資料格式所必需的。task使用Converters将資料格式從位元組轉換為連接配接内部資料格式,反之亦然。

預設情況下,Confluent Platform 提供以下轉換器:

  • AvroConverter(建議):與Schema Registry一起使用
  • JsonConverter:适合結構資料
  • StringConverter:簡單的字元串格式
  • ByteArrayConverter:提供不進行轉換的“傳遞”選項

Converters與Connectors本身分離,以允許Connectors之間自然地重複使用轉換器。例如,使用相同的 Avro 轉換器,JDBC 源連接配接器可以将 Avro 資料寫入 Kafka,HDFS 接收器連接配接器可以從 Kafka 讀取 Avro 資料。這意味着可以使用相同的轉換器,即使例如,JDBC 源傳回最終作為 Parquet 檔案寫入 HDFS 的轉換器。

AvroConverter處理資料的流程圖:

下圖顯示了在使用 JDBC 源連接配接器從資料庫讀取資料、寫入 Kafka 以及最後使用 HDFS 接收器連接配接器寫入 HDFS 時如何使用轉換器。

kafka Java用戶端之Connect API

Transforms

Connector可以配置Transforms,以便對單個消息進行簡單且輕量的修改。這對于小資料的調整和事件路由十分友善,且可以在connector配置中将多個Transforms連接配接在一起。然而,應用于多個消息的更複雜的Transforms最好使用KSQL和Kafka Stream來實作。

Transforms是一個簡單的函數,輸入一條記錄,并輸出一條修改過的記錄。Kafka Connect提供許多Transforms,它們都執行簡單但有用的修改。可以使用自己的邏輯定制實作轉換接口,将它們打包為Kafka Connect插件,将它們與connector一起使用。

當Transforms與Source Connector一起使用時,Kafka Connect通過第一個Transforms傳遞connector生成的每條源記錄,第一個Transforms對其進行修改并輸出一個新的源記錄。将更新後的源記錄傳遞到鍊中的下一個Transforms,該Transforms再生成一個新的修改後的源記錄。最後更新的源記錄會被轉換為二進制格式寫入到Kafka。Transforms也可以與Sink Connector一起使用。

Mysql 環境準備

使用mysql 來嘗試kafka connect

建立Mysql資料庫

使用Navicat 建立Mysql 資料庫

kafka Java用戶端之Connect API

使用DataGrip 與mysql 建立連接配接

kafka Java用戶端之Connect API

建立資料庫的表

在示範Kakfa Connect的使用之前我們需要先做一些準備,因為依賴一些額外的內建。例如在本文中使用MySQL作為資料源的輸入和輸出,是以首先得在MySQL中建立兩張表(作為Data Source和Data Sink)。建表SQL如下:

CREATE TABLE `users_input` (
  `uuid` INT(12) PRIMARY KEY NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(10) NOT NULL,
  `age` INT(12) NOT NULL
);

CREATE TABLE `users_output` (
  `uuid` INT(12) NOT NULL PRIMARY KEY AUTO_INCREMENT,
  `name` VARCHAR(10) NOT NULL,
  `age` INT(12) NOT NULL
);      
kafka Java用戶端之Connect API

Connect 準備

Kafka Connect中的connector定義了資料應該從哪裡複制到哪裡。connector執行個體是一種邏輯作業,負責管理Kafka與另一個系統之間的資料複制。

去​​confluent網站​​尋找Connector,有許多現成的可供使用

kafka Java用戶端之Connect API

下載下傳JDBC Connector

kafka Java用戶端之Connect API

去​​maven 倉庫​​​去下載下傳mysql-connector的jar包

因為我的mysql 是部署的mysql 8 ,是以我下載下傳mysql 8的連接配接jar包

然後将這兩個包拷貝到伺服器指定檔案夾下面。我這裡拷貝到 我部署kafka的檔案路徑下面

cd /usr/local/kafka
mkdir plugins      

給這個檔案夾賦予權限

chmod -R 777 plugins      

解壓kafka的connect 包

unzip confluentinc-kafka-connect-jdbc-10.3.2.zip      

如果伺服器磁盤空間比較緊張的話,可以删掉壓縮包

rm -rf confluentinc-kafka-connect-jdbc-10.3.2.zip      

将MySQL驅動包移動到kafka-connect-jdbc的lib目錄下

mv mysql-connector-java-8.0.28.jar /usr/local/kafka/plugins/confluentinc-kafka-connect-jdbc-10.3.2/lib/      

Connect包準備好後,我們需要修改一些kafka的配置檔案

進入kafka容器

docker exec -it ${CONTAINER ID} /bin/bash      

cd 到腳本檔案的檔案夾

cd /opt/kafka/config      

這個檔案夾下面的配置檔案有很多,我們這裡隻修改分布式的connect-distributed.properties配置檔案

kafka Java用戶端之Connect API

修改如下配置項:

# 指定rest服務的端口号
rest.port=8083
# 指定Connect插件包的存放路徑  引入外部依賴
plugin.path=/opt/kafka/plugins      

由于rest服務監聽了8083端口号,如果你的伺服器開啟了防火牆就需要使用以下指令開放8083端口,否則外部無法通路,但是這裡我們是使用的docker 部署的kafka服務,容器的端口不用開放,但是要把容器的端口和主控端也就是伺服器的端口對應綁定上,以此通路容器的8083端口

我是用的docker 部署的kafka 服務,是以我将這個配置檔案挂載了出來,友善修改

​kafka的簡單介紹以及docker-compose部署單主機Kafka叢集​​ 在這篇部落格的docker-compose.yml檔案的挂載區域添加了如下一列,将配置檔案挂載到了伺服器主控端,并添加了對應端口8083的映射

- "/usr/local/kafka/kafka-1/conf/connect-distributed.properties:/opt/kafka/config/connect-distributed.properties"      

完成前面的步驟後,我們就可以啟動Kafka Connect了。有兩種啟動方式,分别是:前台啟動和背景啟動,前者用于開發調試,後者則通常用于正式環境。具體指令如下:

docker 部署的kafka 容器,腳本檔案在/opt/kafka/bin 下面

(這裡僅針對wurstmeister/kafka鏡像部署的容器)

# 前台啟動   會列印日志到螢幕上
./connect-distributed.sh /opt/kafka/config/connect-distributed.properties

# 背景啟動
./connect-distributed.sh -daemon /opt/local/kafka/config/connect-distributed.properties      

啟動成功後,使用浏覽器通路

http://{ip}:8084/connector-plugins   根據你的伺服器IP 和設定的端口号來      

正常情況下會傳回一些JSON資料:

kafka Java用戶端之Connect API

REST API

由于Kafka Connect的目的是作為一個服務運作,提供了一個用于管理connector的REST API。預設情況下,此服務的端口是8083,可以自己在上面配置檔案修改。以下是目前支援的REST API:我們可以通過這些API去操作connector

kafka Java用戶端之Connect API

使用浏覽器通路檢視所有的connector

http://{ip}:8084/connectors      

此時傳回的是一個空數組,說明沒有任何的connector:

kafka Java用戶端之Connect API

此時我們可以使用POST方式請求/connectors接口來新增一個connector,linux終端視窗curl指令調用示例如下:

curl -X POST -H 'Content-Type: application/json' -i 'http://IP:8084/connectors' \
--data \
'{"name":"test-upload-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://IP:3306/資料庫名?user=xx&password=xx",
"table.whitelist":"users_input",
"incrementing.column.name": "uuid",
"mode":"incrementing",
"topic.prefix": "test-mysql-"}}'      

參數說明:

  • name:指定新增的connector的名稱
  • config:指定該connector的配置資訊
  • connector.class:指定使用哪個Connector類
  • connection.url:指定MySQL的連接配接url
  • table.whitelist:指定需要加載哪些資料表
  • incrementing.column.name:指定表中自增列的名稱
  • mode:指定connector的模式,這裡為增量模式
  • topic.prefix:Kafka會建立一個Topic,該配置項就是用于指定Topic名稱的字首,字尾為資料表的名稱。例如在本例中将生成的Topic名稱為:test-mysql-users_input

使用postmen送出POST 請求也是一樣的,将上面的json部分copy到下面

kafka Java用戶端之Connect API
kafka Java用戶端之Connect API

删除指定的connector

kafka Java用戶端之Connect API

檢視指定connector的運作狀态

kafka Java用戶端之Connect API

新增connector完成後,我們嘗試往資料表裡添加一些資料,具體的sql如下:

insert into users_input(`name`, `age`) values('xt', 23);
insert into users_input(`name`, `age`) values('xt2', 24);
insert into users_input(`name`, `age`) values('xt3', 25);      

使用DataGrip 來執行腳本檔案

kafka Java用戶端之Connect API

使用kafka-console-consumer.sh腳本指令去拉取test-mysql-users_input中的資料:

kafka-1是我的容器名

./kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic test-mysql-users_input --from-beginning      

拉取出來的資料是JSON結構的,其中的payload就是資料表中的資料

kafka Java用戶端之Connect API

能拉取到這樣的資料就代表已經成功将MySQL資料表中的資料傳輸到Kafka Connect Source裡了,也就是完成輸入端的工作了。

現在我們已經能夠通過Kafka Connect将MySQL中的資料寫入到Kafka中了,接下來就是完成輸出端的工作,将Kafka裡的資料輸出到MySQL中。

首先,我們需要調用Rest API新增一個Sink類型的connector。具體請求如下:

{"name":"test-download-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://IP:3306/kafka_connect?user=root&password=XXX",
"topics":"test-mysql-users_input",
"auto.create":"false",
"insert.mode": "upsert",
"pk.mode":"record_value",
"pk.fields":"uuid",
"table.name.format": "users_output"}}      

參數說明:

  • name:指定新增的connector的名稱
  • config:指定該connector的配置資訊
  • connector.class:指定使用哪個Connector類
  • connection.url:指定MySQL的連接配接url
  • topics:指定從哪個Topic中讀取資料
  • auto.create:是否自動建立資料表
  • insert.mode:指定寫入模式,upsert表示可以更新及寫入
  • pk.mode:指定主鍵模式,record_value表示從消息的value中擷取資料
  • pk.fields:指定主鍵字段的名稱
  • table.name.format:指定将資料輸出到哪張資料表上
kafka Java用戶端之Connect API

部署完畢之後可以看見users_output上面已經有了和users_input一樣的資料

可以直覺的看到Kafka Connect實際上就做了兩件事情:使用Source Connector從資料源(MySQL)中讀取資料寫入到Kafka Topic中,然後再通過Sink Connector讀取Kafka Topic中的資料輸出到另一端(MySQL)。

References:

  • ​​https://docs.confluent.io/3.0.0/connect/​​
  • ​​https://www.orchome.com/455​​
  • ​​https://docs.confluent.io/platform/current/connect/index.html​​
  • ​​https://blog.51cto.com/zero01/2498682​​