天天看點

confluent kafka-connect-jdbc

1.1 Kafka connect的簡單介紹

1.1.1 Kafka connect是什麼?

Kafaka connect 是一種用于在Kafka和其他系統之間可擴充的、可靠的流式傳輸資料的工具。它使得能夠快速定義将大量資料集合移入和移出Kafka的連接配接器變得簡單。Kafka Connect可以從資料庫或應用程式伺服器收集資料到Kafka topic,使資料可用于低延遲的流處理。導出作業可以将資料從Kafka topic傳輸到二次存儲和查詢系統,或者傳遞到批處理系統以進行離線分析。

1.1.2 Kafka connect的特性

Kafka connector 是一個通用架構,提供統一的內建API;

支援分布式模式和單機模式;

REST 接口,用來檢視和管理Kafka connectors;

自動化的offset管理,開發人員不必擔心錯誤處理的影響;

分布式、可擴充;

流/批處理內建;

Kafka Connnect有兩個核心概念:Source和Sink。 Source負責導入資料到Kafka,Sink負責從Kafka導出資料,它們都被稱為Connector。

1.1.3 Kafaka connect的核心元件

Source:負責将外部資料寫入到kafka的topic中;

Sink:負責從kafka中讀取資料到自己需要的地方去,比如讀取到HDFS,hbase等;可以接收資料,也可以接收模式資訊

Connectors :通過管理任務來協調資料流的進階抽象;

Tasks:資料寫入kafka和從kafka中讀出資料的具體實作,source和sink使用時都需要Task;

Workers:運作connectors和tasks的程序;

Converters:kafka connect轉換器提供了一種機制,用于将資料從kafka connect使用的内部資料類型轉換為表示為Avro、Protobuf或JSON模式的資料類型。

Transforms:一種輕量級資料調整的工具;

1.1.4 Kafaka connect的工作模式

standalone:該模式下所有的worker都在一個獨立的程序中完成;

distributed:該模式具有高擴充性,以及提供自動容錯機制。在有效的worker程序中它們會自動的去協調執行connector和task,如果你新加了一個worker或者挂了一個worker,其他的worker會檢測到然後在重新配置設定connector和task。connector負責獲得所要連接配接的系統的資訊,并不進行資料拷貝,task進行資料拷貝。

1.1.5 Converters 轉換器的類型

不通類型的配置參考連接配接:

https://docs.confluent.io/current/connect/userguide.html#connect-configuring-converters

Converters 轉換器使用不帶模式系統資料庫的json模式的配置:

key.converter=org.apache.kafka.connect.json.JsonConverter

value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false

value.converter.schemas.enable=false

key.converter.schemas.enable,value.converter.schemas.enable都為true時,鍵或值不會被視為普通JSON,而是被視為包含内部架構和資料的複合JSON對象。

當對源連接配接器啟用這些功能時,模式和資料都在複合JSON對象中。

key.converter.schemas.enable,value.converter.schemas.enable都為false時,這也是預設值,隻傳遞資料。

1.1.6 Converters 轉換器如何使用Schema Registry模式系統資料庫?

Kafka Connect and Schema Registry內建以從連接配接器捕獲模式資訊。Kafka Connect Converters提供了一種機制,用于将資料從kafka connect使用的内部資料類型轉換為表示為Avro、Protobuf或JSON模式的資料類型。AvroConverter、ProtobufConverter、JsonSchemaConverter自動注冊由source connectors生成的模式。Sink Connectors 可以接收資料,也可以接收模式資訊,這就允許Sink Connectors 可以知道資料的結構,以提供額外的功能,比如維護資料庫表結構或建立搜尋索引。

Kafka Connect with Schema Registry的使用,必須在連接配接器配置或者worker的配置檔案中指定參數:key.converter和value.converter;還必須提供Schema Registry URL的配置資訊。

Avro、Protobuf、JSON Schema的配置參考連接配接:

https://docs.confluent.io/current/schema-registry/connect.html#schemaregistry-kafka-connect

1.1.7 在哪裡指定轉換器的配置以及如何繼承屬性?

可以在連接配接器中配置也可以在worker中配置,但是需要注意如下情況:

1、如果隻在worker中配置了轉換器的參數資訊,則所有connect都将繼承在worker中定義的轉換器配置。

2、如果在connect和worker中都配置了轉換器的參數資訊,則connect中的配置會覆寫worker中的配置。

3、不管在connect還是worker中配置,必須始終包括converter and the Schema Registry URL的參數配置,否則将失敗;

JSON Schema的配置示例:

key.converter=io.confluent.connect.json.JsonSchemaConverter

key.converter.schema.registry.url=http://localhost:8081

value.converter=io.confluent.connect.json.JsonSchemaConverter

value.converter.schema.registry.url=http://localhost:8081

Avro 配置示例:

key.converter=io.confluent.connect.avro.AvroConverter

key.converter.schema.registry.url=http://localhost:8081

value.converter=io.confluent.connect.avro.AvroConverter

value.converter.schema.registry.url=http://localhost:8081

1.2 connector單機版部署

1.2.1下載下傳confluent-5.5.0

confluent-5.5.0有社群版和企業版,企業版需要licences。我們使用的是社群版,下載下傳之後解壓檔案到home目錄下。

1.2.2下載下傳所需要的的jar包

需要的jar包有kafka-connect-jdbc-5.5.0.jar、ojdbc6.jar。如果下載下傳了confluent-5.5.0,在confluent的share目錄中會有kafka-connect-jdbc-5.5.0.jar包。

1.2.3調整jar包位置

将第(2)步下載下傳的jar包拷貝到/home/confluent-5.5.0/share/java/kafka下面。

confluent kafka-connect-jdbc

1.2.4修改kafka配置檔案的plugin.path

修改kafka安裝包下的config中的connect-standalone.properties檔案中的plugin.path

Vi /home/confluent-5.5.0/etc/kafka/connect-standalone.properties

修改plugin.path=/home/confluent-5.5.0/share/java/

目的是在啟動connector時找到所需要的各種jar包。

1.2.5 修改connector的配置檔案

在 /home/confluent-5.5.0/etc/kafka-connect-jdbc/目錄下建立connect配置檔案,并編輯如下内容:

/home/confluent-5.5.0/etc/kafka-connect-jdbc/OracleSourceConnector.properties

name=test-oracle-kafka-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.password=123456
connection.url=jdbc:oracle:thin:@192.168.1.3:1521:testdb
connection.user=test
table.whitelist=test.oracle_test
connection.backoff.ms=30000
topic.prefix=topic2 ##topic字首
#mode=bulk
mode=timestamp ##指定模式
#incrementing.column.name=id
timestamp.column.name=CR_DATE
#table.poll.interval.ms = 1000
poll.interval.ms=5000
validate.non.null=false
#timestamp.initial=1593532297000 指定從某個時間戳開始
db.timezone=Asia/Shanghai ##指定時區
           

1.2.6啟動connector程序(會自動建立topic)

/home/confluent-5.5.0/bin/connect-standalone /home/confluent-5.5.0/etc/kafka/connect-standalone.properties /home/confluent-5.5.0/etc/kafka-connect-jdbc/OracleSourceConnector.properties

1.2.7檢視建立topic指令

--檢視topic,建立的topic名稱為定義的topic字首+表名

/home/confluent-5.5.0/bin/kafka-topics --list --bootstrap-server localhost:9092

1.2.8啟動consumer

/home/confluent-5.5.0/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic topic2oracle_test

1.2.9 Jdbc連接配接器從指定時間戳位置捕獲資料

如果想指定從某個時間戳開始推送資料,有兩種設定方法:

(1)worker程序的配置檔案/home/confluent-5.5.0/etc/kafka/connect-standalone.properties中記錄了偏移量和時間戳資訊,可以修改該檔案的時間戳。

(2)先清空啟動worker時生成的offset檔案内容,在connector配置檔案中添加參數timestamp.initial=時間戳方式指定。

注意:如果不先清空啟動worker時生成的offset檔案内容,直接修改參數timestamp.initial,實驗證明沒有效果。

1.2.10 源端資料庫系統和JDBC連接配接器時區

需要注意的是資料庫系統時區和connector的jdbc的時區應該是一緻的。