1.1 Kafka connect的簡單介紹
1.1.1 Kafka connect是什麼?
Kafaka connect 是一種用于在Kafka和其他系統之間可擴充的、可靠的流式傳輸資料的工具。它使得能夠快速定義将大量資料集合移入和移出Kafka的連接配接器變得簡單。Kafka Connect可以從資料庫或應用程式伺服器收集資料到Kafka topic,使資料可用于低延遲的流處理。導出作業可以将資料從Kafka topic傳輸到二次存儲和查詢系統,或者傳遞到批處理系統以進行離線分析。
1.1.2 Kafka connect的特性
Kafka connector 是一個通用架構,提供統一的內建API;
支援分布式模式和單機模式;
REST 接口,用來檢視和管理Kafka connectors;
自動化的offset管理,開發人員不必擔心錯誤處理的影響;
分布式、可擴充;
流/批處理內建;
Kafka Connnect有兩個核心概念:Source和Sink。 Source負責導入資料到Kafka,Sink負責從Kafka導出資料,它們都被稱為Connector。
1.1.3 Kafaka connect的核心元件
Source:負責将外部資料寫入到kafka的topic中;
Sink:負責從kafka中讀取資料到自己需要的地方去,比如讀取到HDFS,hbase等;可以接收資料,也可以接收模式資訊
Connectors :通過管理任務來協調資料流的進階抽象;
Tasks:資料寫入kafka和從kafka中讀出資料的具體實作,source和sink使用時都需要Task;
Workers:運作connectors和tasks的程序;
Converters:kafka connect轉換器提供了一種機制,用于将資料從kafka connect使用的内部資料類型轉換為表示為Avro、Protobuf或JSON模式的資料類型。
Transforms:一種輕量級資料調整的工具;
1.1.4 Kafaka connect的工作模式
standalone:該模式下所有的worker都在一個獨立的程序中完成;
distributed:該模式具有高擴充性,以及提供自動容錯機制。在有效的worker程序中它們會自動的去協調執行connector和task,如果你新加了一個worker或者挂了一個worker,其他的worker會檢測到然後在重新配置設定connector和task。connector負責獲得所要連接配接的系統的資訊,并不進行資料拷貝,task進行資料拷貝。
1.1.5 Converters 轉換器的類型
不通類型的配置參考連接配接:
https://docs.confluent.io/current/connect/userguide.html#connect-configuring-converters
Converters 轉換器使用不帶模式系統資料庫的json模式的配置:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter.schemas.enable,value.converter.schemas.enable都為true時,鍵或值不會被視為普通JSON,而是被視為包含内部架構和資料的複合JSON對象。
當對源連接配接器啟用這些功能時,模式和資料都在複合JSON對象中。
key.converter.schemas.enable,value.converter.schemas.enable都為false時,這也是預設值,隻傳遞資料。
1.1.6 Converters 轉換器如何使用Schema Registry模式系統資料庫?
Kafka Connect and Schema Registry內建以從連接配接器捕獲模式資訊。Kafka Connect Converters提供了一種機制,用于将資料從kafka connect使用的内部資料類型轉換為表示為Avro、Protobuf或JSON模式的資料類型。AvroConverter、ProtobufConverter、JsonSchemaConverter自動注冊由source connectors生成的模式。Sink Connectors 可以接收資料,也可以接收模式資訊,這就允許Sink Connectors 可以知道資料的結構,以提供額外的功能,比如維護資料庫表結構或建立搜尋索引。
Kafka Connect with Schema Registry的使用,必須在連接配接器配置或者worker的配置檔案中指定參數:key.converter和value.converter;還必須提供Schema Registry URL的配置資訊。
Avro、Protobuf、JSON Schema的配置參考連接配接:
https://docs.confluent.io/current/schema-registry/connect.html#schemaregistry-kafka-connect
1.1.7 在哪裡指定轉換器的配置以及如何繼承屬性?
可以在連接配接器中配置也可以在worker中配置,但是需要注意如下情況:
1、如果隻在worker中配置了轉換器的參數資訊,則所有connect都将繼承在worker中定義的轉換器配置。
2、如果在connect和worker中都配置了轉換器的參數資訊,則connect中的配置會覆寫worker中的配置。
3、不管在connect還是worker中配置,必須始終包括converter and the Schema Registry URL的參數配置,否則将失敗;
JSON Schema的配置示例:
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081
Avro 配置示例:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
1.2 connector單機版部署
1.2.1下載下傳confluent-5.5.0
confluent-5.5.0有社群版和企業版,企業版需要licences。我們使用的是社群版,下載下傳之後解壓檔案到home目錄下。
1.2.2下載下傳所需要的的jar包
需要的jar包有kafka-connect-jdbc-5.5.0.jar、ojdbc6.jar。如果下載下傳了confluent-5.5.0,在confluent的share目錄中會有kafka-connect-jdbc-5.5.0.jar包。
1.2.3調整jar包位置
将第(2)步下載下傳的jar包拷貝到/home/confluent-5.5.0/share/java/kafka下面。

1.2.4修改kafka配置檔案的plugin.path
修改kafka安裝包下的config中的connect-standalone.properties檔案中的plugin.path
Vi /home/confluent-5.5.0/etc/kafka/connect-standalone.properties
修改plugin.path=/home/confluent-5.5.0/share/java/
目的是在啟動connector時找到所需要的各種jar包。
1.2.5 修改connector的配置檔案
在 /home/confluent-5.5.0/etc/kafka-connect-jdbc/目錄下建立connect配置檔案,并編輯如下内容:
/home/confluent-5.5.0/etc/kafka-connect-jdbc/OracleSourceConnector.properties
name=test-oracle-kafka-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.password=123456
connection.url=jdbc:oracle:thin:@192.168.1.3:1521:testdb
connection.user=test
table.whitelist=test.oracle_test
connection.backoff.ms=30000
topic.prefix=topic2 ##topic字首
#mode=bulk
mode=timestamp ##指定模式
#incrementing.column.name=id
timestamp.column.name=CR_DATE
#table.poll.interval.ms = 1000
poll.interval.ms=5000
validate.non.null=false
#timestamp.initial=1593532297000 指定從某個時間戳開始
db.timezone=Asia/Shanghai ##指定時區
1.2.6啟動connector程序(會自動建立topic)
/home/confluent-5.5.0/bin/connect-standalone /home/confluent-5.5.0/etc/kafka/connect-standalone.properties /home/confluent-5.5.0/etc/kafka-connect-jdbc/OracleSourceConnector.properties
1.2.7檢視建立topic指令
--檢視topic,建立的topic名稱為定義的topic字首+表名
/home/confluent-5.5.0/bin/kafka-topics --list --bootstrap-server localhost:9092
1.2.8啟動consumer
/home/confluent-5.5.0/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic topic2oracle_test
1.2.9 Jdbc連接配接器從指定時間戳位置捕獲資料
如果想指定從某個時間戳開始推送資料,有兩種設定方法:
(1)worker程序的配置檔案/home/confluent-5.5.0/etc/kafka/connect-standalone.properties中記錄了偏移量和時間戳資訊,可以修改該檔案的時間戳。
(2)先清空啟動worker時生成的offset檔案内容,在connector配置檔案中添加參數timestamp.initial=時間戳方式指定。
注意:如果不先清空啟動worker時生成的offset檔案内容,直接修改參數timestamp.initial,實驗證明沒有效果。
1.2.10 源端資料庫系統和JDBC連接配接器時區
需要注意的是資料庫系統時區和connector的jdbc的時區應該是一緻的。