
Kafka 連接配接器介紹
Kafka 連接配接器通常用來建構資料管道,一般有兩種使用場景:
- 開始和結束的端點:例如,将 Kafka 中的資料導出到 HBase 資料庫,或者把 Oracle 資料庫中的資料導入 Kafka 中。
- 資料傳輸的中間媒體:例如,為了把海量的日志資料存儲到 Elasticsearch 中,可以先把這些日志資料傳輸到 Kafka 中,然後再從 Kafka 中将這些資料導入到 Elasticsearch 中進行存儲。Kafka 連接配接器可以作為資料管道各個階段的緩沖區,将消費者程式和生産者程式有效地進行解耦。
Kafka 連接配接器分為兩種:
- Source 連接配接器:負責将資料導入 Kafka。
- Sink 連接配接器:負責将資料從 Kafka 系統中導出。
連接配接器作為 Kafka 的一部分,是随着 Kafka 系統一起釋出的,無須獨立安裝。
Kafka 連接配接器特性
Kafka 連接配接器包含以下特性:
- 1.是一種處理資料的通用架構,Kafka 連接配接器指定了一種标準,用來限制 Kafka 與其他系統的內建,簡化了 Kafka 連接配接器的開發、部署和管理過程。
- 2.提供單機模式和分布式模式:Kafka 連接配接器支援兩種模式,既能擴充到支援大型叢集,也可以縮小到開發和測試小規模的叢集。
- 3.提供 REST 接口:使用 REST API 來送出請求并管理 Kafka 連接配接器。
- 4.自動管理偏移量:Kafka 連接配接器可以自動管理偏移量。
- 5.分布式和可擴充:Kafka 連接配接器建立在現有的組管理協定上,可以通過添加更多的連接配接器執行個體來實作水準擴充,實作分布式服務。
- 6.資料流和批量內建:利用 Kafka 已有的能力,Kafka 連接配接器是橋接資料流和批處理系統的一種理想的解決方案。
Kafka 連接配接器核心概念
- 連接配接器執行個體:連接配接器執行個體決定了消息資料的流向,即消息從何處複制,以及将複制的消息寫入到何處。連接配接器執行個體負責 Kafka 與其他系統之間的邏輯處理,連接配接器執行個體通常以 JAR 包形式存在,通過實作 Kafka 系統應用接口來完成。
- 任務數:在分布式模式下,每一個連接配接器執行個體可以将一個作業切分成多個任務(Task),然後再将任務分發到各個事件線程(Worker)中去執行。任務不會儲存目前的狀态資訊,通常由特定的 Kafka Topic 來儲存,例如,指定具體屬性
和offset.storage.topic
的值來儲存。status.storage.topic
- 事件線程:在 Kafka 中,連接配接器執行個體和任務數都是邏輯層面的,需要由具體的線程來執行,事件線程包含兩種模式--單機模式和分布式模式。
- 轉換器:轉換器能将位元組資料轉換成 Kafka 連接配接器的内部格式,也能将 Kafka 連接配接器内部存儲的資料格式轉換成位元組資料。
使用 Kafka 連接配接器
單機模式
單機模式配置檔案
配置單機模式連接配接器相關參數 config/connect-standalone.properties:
# Kafka 叢集 broker 位址
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
# 指定鍵值對 JSON 轉換器類
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 啟用鍵值對轉換器
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# 單機模式設定偏移量存儲檔案
offset.storage.file.filename=/tmp/connect.offsets
# 設定偏移量持久化時間間隔
offset.flush.interval.ms=10000
将資料從檔案導入 Kafka Topic 中
編輯 Kafka 連接配接器 配置檔案 config/connect-file-source.properties:
# 設定連接配接器名字
name=local-file-source
# 指定連接配接器類
connector.class=FileStreamSource
# 設定最大任務數
tasks.max=1
# 指定讀取的檔案
file=/tmp/test.txt
# 指定寫入 Kafka 的 Topic
topic=connect_test
建立資料源檔案并添加資料:
[root@kafka1 ~]# cat /tmp/test.txt
kafka
hadoop
kafka-connect
啟動一個單機模式的連接配接器将資料導入 Kafka Topic 中:
[root@kafka1 kafka]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
啟動消費者程式檢視導入到 connect_test 主題中的資料:
[root@kafka1 ~]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic connect_test -from-beginning
{"schema":{"type":"string","optional":false},"payload":"kafka"}
{"schema":{"type":"string","optional":false},"payload":"hadoop"}
{"schema":{"type":"string","optional":false},"payload":"kafka-connect"}
{"schema":
當往檔案中追加資料時,消費者可以消費到新的資料:
[root@kafka1 ~]# echo java >> /tmp/test.txt
[root@kafka1 ~]# echo python >> /tmp/test.txt
消費者消費到的新的資料:
{"type":"string","optional":false},"payload":"java"}
{"schema":{"type":"string","optional":false},"payload":"python"}
将 Kafka Topic 中的資料導出到檔案
編輯 Kafka 連接配接器 配置檔案 config/connect-file-sink.properties:
# 設定連接配接器名字
name=local-file-sink
# 指定連接配接器類
connector.class=FileStreamSink
# 設定最大任務數
tasks.max=1
# 将資料寫入的檔案
file=/tmp/sink.txt
# 指定導出資料的 Kafka 的 Topic
topics=connect_test
啟動一個單機模式的連接配接器将 Kafka Topic 中的資料導出:
[root@kafka1 kafka]# connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
檢視導出檔案的内容:
[root@kafka1 ~]# cat /tmp/sink.txt
python
kafka
hadoop
kafka-connect
java
分布式模式
在分布式模式下, Kafka 連接配接器會自動均衡每個事件線程所處理的任務數。允許使用者動态地增加或者減少任務,在執行任務、修改配置、送出偏移量時能夠得到容錯保障。
在分布式模式下,Kafka 連接配接器會在 Kafka Topic 中存儲偏移量,配置和任務狀态(單機模式下是保持在本地檔案中)。建議手動建立存儲偏移量的主題,這樣可以按需設定主題的分區數和副本數。
在分布式模式下, Kafka 連接配接器的配置檔案不能使用指令行,需要使用 REST API 來執行建立,修改和銷毀 Kafka 連機器的操作。
建立連接配接器相關主題
# 建立偏移量的的存儲主題
kafka-topics.sh --create --bootstrap-server kafka1:9092 --replication-factor 3 --partitions 1 --topic connect-offsets
# 建立配置存儲主題
kafka-topics.sh --create --bootstrap-server kafka1:9092 --replication-factor 3 --partitions 6 --topic connect-configs
# 建立任務狀态存儲主題
kafka-topics.sh --create --bootstrap-server kafka1:9092 --replication-factor 3 --partitions 6 --topic connect-status
分布式模式配置檔案
# 設定 Kafka 叢集位址
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
# 設定連接配接器唯一組名稱
group.id=connect-cluster
# 指定鍵值對 JSON 轉換器類
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 啟用鍵值對轉換器
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# 設定偏移量的的存儲主題
offset.storage.topic=connect-offsets
# 設定配置存儲主題
config.storage.topic=connect-configs
# 設定任務狀态存儲主題
status.storage.topic=connect-status
# 設定偏移量持久化時間間隔
offset.flush.interval.ms=10000
啟動分布式模式連接配接器:
[root@kafka1 kafka]# connect-distributed.sh config/connect-distributed.properties
檢視連接配接器版本号資訊:
[root@kafka1 ~]# curl http://kafka1:8083
{"version":"2.7.0","commit":"448719dc99a19793","kafka_cluster_id":"wp8iI172SaqLHqNvEh3T-w"}
檢視目前已安裝的插件:
[root@kafka1 ~]# curl http://kafka1:8083/connector-plugins -s | jq
[
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "2.7.0"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "2.7.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
連接配接器 API 接口
由于 Kafka 連接配接器旨在作為服務運作,是以還提供了用于管理連接配接器的 REST API。預設情況下,此服務在端口 8083 上運作。以下是目前支援的 API 接口:
GET /connectors #傳回活動連接配接器的清單
POST /connectors #建立一個新的連接配接器; 請求主體應該是包含字元串name字段和config帶有連接配接器配置參數的對象字段的JSON對象
GET /connectors/{name} #擷取有關特定連接配接器的資訊
GET /connectors/{name}/config #擷取特定連接配接器的配置參數
PUT /connectors/{name}/config #更新特定連接配接器的配置參數
GET /connectors/{name}/status #擷取連接配接器的目前狀态,包括連接配接器是否正在運作,失敗,已暫停等,配置設定給哪個工作者,失敗時的錯誤資訊以及所有任務的狀态
GET /connectors/{name}/tasks #擷取目前為連接配接器運作的任務清單
GET /connectors/{name}/tasks/{taskid}/status #擷取任務的目前狀态,包括如果正在運作,失敗,暫停等,配置設定給哪個從業人員,如果失敗,則傳回錯誤資訊
PUT /connectors/{name}/pause #暫停連接配接器及其任務,停止消息處理,直到連接配接器恢複
PUT /connectors/{name}/resume #恢複暫停的連接配接器(或者,如果連接配接器未暫停,則不執行任何操作)
POST /connectors/{name}/restart #重新啟動連接配接器(通常是因為失敗)
POST /connectors/{name}/tasks/{taskId}/restart #重新開機個别任務(通常是因為失敗)
DELETE /connectors/{name} #删除連接配接器,停止所有任務并删除其配置
#Kafka Connect還提供了用于擷取有關連接配接器插件資訊的REST API:
GET /connector-plugins #傳回安裝在Kafka Connect叢集中的連接配接器插件清單。請注意,API僅檢查處理請求的worker的連接配接器,這意味着您可能會看到不一緻的結果,尤其是在滾動更新期間,如果添加新的連接配接器jar
PUT /connector-plugins/{connector-type}/config/validate # 根據配置定義驗證提供的配置值。此API執行每個配置驗證,在驗證期間傳回建議值和錯誤消息。
将資料從檔案導入到 Kafka Topic 中
通過 REST API 請求建立一個新的連接配接器執行個體,将資料導入到 Kafka Topic 中。這裡使用的是 Chrome 浏覽器上名為 API Tester 的插件:
{
"name": "distributed-console-source", #自定義連接配接器名字
"config":
{
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": "1",
"topic": "distributed_connect_test", #建立的topic
"file": "/tmp/distributed_test.txt" #讀取的檔案
}
}
檢視剛剛建立的連接配接器:
[root@kafka1 ~]# curl http://kafka1:8083/connectors -s | jq
[
"distributed-console-source"
]
此時開啟一個消費者執行個體可以成功消費到 Kafka Topic 中的資料:
[root@kafka1 ~]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic distributed_connect_test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"distributed_kafka"}
{"schema":{"type":"string","optional":false},"payload":"kafka"}
{"schema":{"type":"string","optional":false},"payload":"hadoop"}
通過 REST API 請求建立一個新的連接配接器執行個體,将資料從 Kafka Topic 中導出到檔案中。
{
"name": "distributed-console-sink",
"config":
{
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "distributed_connect_test", #Kafka 中存在的 Topic
"file": "/tmp/distributed_sink.txt" #導出資料到指定檔案
}
}
檢視目前的連接配接器:
[root@kafka1 ~]# curl http://kafka1:8083/connectors -s | jq
[
"distributed-console-sink",
"distributed-console-source"
]
檢視檔案可以看到資料成功從 Kafka Topic 中導出:
[root@kafka1 ~]# cat /tmp/distributed_sink.txt
distributed_kafka
kafka
hadoop
開發 Kafka 連接配接器插件
開發一個完整的 Kafka 連接配接器插件,分為兩部分來實作:
- 編寫 Source 連接配接器。Source 連接配接器負責将第三方系統的資料導入 Kafka Topic 中。
- 編寫 Sink 連接配接器。Sink 連接配接器負責将 Kafka Topic 中的資料導出到第三方系統中。
第三方系統可以是關系型資料庫(如 MySQL、Oracle 等)、檔案系統(如本地檔案,分布式檔案系統等)、日志系統等。
本執行個體使用的是 Maven 工程,需要在 pom.xml 檔案中引入 Kafka 依賴包:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.2.1</version>
</dependency>
編寫 Source 連接配接器
編寫一個自定義的 Source 連接配接器,需要實作兩個抽象類:
- SourceConnector 類,用來初始化連接配接器配置和任務數。
- SourceTask 類,用來實作标準輸入或者檔案讀取。
編寫輸入連接配接器執行個體
package book_8;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
/**
* 輸入連接配接器,用來實作讀取配置資訊和配置設定任務等一些初始化工作
* @author 程治玮
* @since 2021/3/16 9:51 下午
*/
public class CustomerFileStreamSourceConnector extends SourceConnector {
// 定義主題配置變量
public static final String TOPIC_CONFIG = "topic";
// 定義檔案配置變量
public static final String FILE_CONFIG = "file";
// 執行個體化一個配置對象
private static final ConfigDef CONFIG_DEF = new ConfigDef().define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.").define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
// 聲明檔案名變量
private String filename;
// 聲明主題變量
private String topic;
/** 擷取版本. */
public String version() {
return AppInfoParser.getVersion();
}
/** 開始初始化. */
public void start(Map<String, String> props) {
filename = props.get(FILE_CONFIG);
topic = props.get(TOPIC_CONFIG);
if (topic == null || topic.isEmpty())
throw new ConnectException("FileStreamSourceConnector configuration must include 'topic' setting");
if (topic.contains(","))
throw new ConnectException("FileStreamSourceConnector should only have a single topic when used as a source.");
}
/** 執行個體化輸入類. */
public Class<? extends Task> taskClass() {
return CustomerFileStreamSourceTask.class;
}
/** 擷取配置資訊. */
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
Map<String, String> config = new HashMap<>();
if (filename != null)
config.put(FILE_CONFIG, filename);
config.put(TOPIC_CONFIG, topic);
configs.add(config);
return configs;
}
@Override
public void stop() {
}
/** 擷取配置對象. */
public ConfigDef config() {
return CONFIG_DEF;
}
}
編寫輸入連接配接器任務類
package book_8;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 輸入連接配接器任務類,用來實作标準輸入或者檔案讀取
* @author 程治玮
* @since 2021/3/16 9:47 下午
*/
public class CustomerFileStreamSourceTask extends SourceTask {
// 聲明一個日志類
private static final Logger LOG = LoggerFactory.getLogger(CustomerFileStreamSourceTask.class);
// 定義檔案字段
public static final String FILENAME_FIELD = "filename";
// 定義偏移量字段
public static final String POSITION_FIELD = "position";
// 定義值的值的資料格式
private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
// 聲明檔案名
private String filename;
// 聲明輸入流對象
private InputStream stream;
// 聲明讀取對象
private BufferedReader reader = null;
// 定義緩沖區大小
private char[] buffer = new char[1024];
// 聲明偏移量變量
private int offset = 0;
// 聲明主題名
private String topic = null;
// 聲明輸入流偏移量
private Long streamOffset;
/** 擷取版本. */
public String version() {
return new CustomerFileStreamSourceConnector().version();
}
/** 開始執行任務. */
public void start(Map<String, String> props) {
filename = props.get(CustomerFileStreamSourceConnector.FILE_CONFIG);
if (filename == null || filename.isEmpty()) {
stream = System.in;
streamOffset = null;
reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
}
topic = props.get(CustomerFileStreamSourceConnector.TOPIC_CONFIG);
if (topic == null)
throw new ConnectException("FileStreamSourceTask config missing topic setting");
}
/** 讀取記錄并傳回資料集. */
public List<SourceRecord> poll() throws InterruptedException {
if (stream == null) {
try {
stream = new FileInputStream(filename);
Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
if (offset != null) {
Object lastRecordedOffset = offset.get(POSITION_FIELD);
if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long))
throw new ConnectException("Offset position is the incorrect type");
if (lastRecordedOffset != null) {
LOG.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
long skipLeft = (Long) lastRecordedOffset;
while (skipLeft > 0) {
try {
long skipped = stream.skip(skipLeft);
skipLeft -= skipped;
} catch (IOException e) {
LOG.error("Error while trying to seek to previous offset in file: ", e);
throw new ConnectException(e);
}
}
LOG.debug("Skipped to offset {}", lastRecordedOffset);
}
streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
} else {
streamOffset = 0L;
}
reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
LOG.debug("Opened {} for reading", logFilename());
} catch (FileNotFoundException e) {
LOG.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename());
synchronized (this) {
this.wait(1000);
}
return null;
}
}
try {
final BufferedReader readerCopy;
synchronized (this) {
readerCopy = reader;
}
if (readerCopy == null)
return null;
ArrayList<SourceRecord> records = null;
int nread = 0;
while (readerCopy.ready()) {
nread = readerCopy.read(buffer, offset, buffer.length - offset);
LOG.trace("Read {} bytes from {}", nread, logFilename());
if (nread > 0) {
offset += nread;
if (offset == buffer.length) {
char[] newbuf = new char[buffer.length * 2];
System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
buffer = newbuf;
}
String line;
do {
line = extractLine();
if (line != null) {
LOG.trace("Read a line from {}", logFilename());
if (records == null)
records = new ArrayList<>();
records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, null, null, null, VALUE_SCHEMA, line, System.currentTimeMillis()));
}
} while (line != null);
}
}
if (nread <= 0)
synchronized (this) {
this.wait(1000);
}
return records;
} catch (IOException e) {
}
return null;
}
/** 解析一條記錄. */
private String extractLine() {
int until = -1, newStart = -1;
for (int i = 0; i < offset; i++) {
if (buffer[i] == '\n') {
until = i;
newStart = i + 1;
break;
} else if (buffer[i] == '\r') {
if (i + 1 >= offset)
return null;
until = i;
newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
break;
}
}
if (until != -1) {
String result = new String(buffer, 0, until);
System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
offset = offset - newStart;
if (streamOffset != null)
streamOffset += newStart;
return result;
} else {
return null;
}
}
/** 停止任務. */
public void stop() {
LOG.trace("Stopping");
synchronized (this) {
try {
if (stream != null && stream != System.in) {
stream.close();
LOG.trace("Closed input stream");
}
} catch (IOException e) {
LOG.error("Failed to close FileStreamSourceTask stream: ", e);
}
this.notify();
}
}
private Map<String, String> offsetKey(String filename) {
return Collections.singletonMap(FILENAME_FIELD, filename);
}
private Map<String, Long> offsetValue(Long pos) {
return Collections.singletonMap(POSITION_FIELD, pos);
}
/** 判斷是标準輸入還是讀取檔案. */
private String logFilename() {
return filename == null ? "stdin" : filename;
}
}
編寫 Sink 連接配接器
在 Kafka 系統中,實作一個自定義的 Sink 連接配接器,需要實作兩個抽象類。
- SinkTask 類:用來實作标準輸出或者檔案寫入。
- SinkConnector 類:用來初始化連接配接器配置和任務數。
編寫輸出連接配接器執行個體
package book_8;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
/**
* 輸出連接配接器,用來實作讀取配置資訊和配置設定任務等一些初始化工作
* @author 程治玮
* @since 2021/3/16 9:56 下午
*/
public class CustomerFileStreamSinkConnector extends SinkConnector {
// 聲明檔案配置變量
public static final String FILE_CONFIG = "file";
// 執行個體化一個配置對象
private static final ConfigDef CONFIG_DEF = new ConfigDef().define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Destination filename.");
// 聲明一個檔案名變量
private String filename;
/** 擷取版本資訊. */
public String version() {
return AppInfoParser.getVersion();
}
/** 執行初始化. */
public void start(Map<String, String> props) {
filename = props.get(FILE_CONFIG);
}
/** 執行個體化輸出類.*/
public Class<? extends Task> taskClass() {
return CustomerFileStreamSinkTask.class;
}
/** 擷取配置資訊. */
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<>();
if (filename != null)
config.put(FILE_CONFIG, filename);
configs.add(config);
}
return configs;
}
public void stop() {
}
/** 擷取配置對象. */
public ConfigDef config() {
return CONFIG_DEF;
}
}
編寫輸出連接配接器任務類
package book_8;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 輸出連接配接器任務類,用來實作标準輸出或者檔案寫入
* @author 程治玮
* @since 2021/3/16 9:54 下午
*/
public class CustomerFileStreamSinkTask extends SinkTask {
// 聲明一個日志對象
private static final Logger LOG = LoggerFactory.getLogger(CustomerFileStreamSinkTask.class);
// 聲明一個檔案名變量
private String filename;
// 聲明一個輸出流對象
private PrintStream outputStream;
/** 構造函數. */
public CustomerFileStreamSinkTask() {
}
/** 重載構造函數. */
public CustomerFileStreamSinkTask(PrintStream outputStream) {
filename = null;
this.outputStream = outputStream;
}
/** 擷取版本号. */
public String version() {
return new CustomerFileStreamSinkConnector().version();
}
/** 開始執行任務. */
public void start(Map<String, String> props) {
filename = props.get(CustomerFileStreamSinkConnector.FILE_CONFIG);
if (filename == null) {
outputStream = System.out;
} else {
try {
outputStream = new PrintStream(new FileOutputStream(filename, true), false, StandardCharsets.UTF_8.name());
} catch (FileNotFoundException | UnsupportedEncodingException e) {
throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
}
}
}
/** 發送記錄給Sink并輸出. */
public void put(Collection<SinkRecord> sinkRecords) {
for (SinkRecord record : sinkRecords) {
LOG.trace("Writing line to {}: {}", logFilename(), record.value());
outputStream.println(record.value());
}
}
/** 持久化資料. */
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
LOG.trace("Flushing output stream for {}", logFilename());
outputStream.flush();
}
/** 停止任務. */
public void stop() {
if (outputStream != null && outputStream != System.out)
outputStream.close();
}
/** 判斷是标準輸出還是檔案寫入. */
private String logFilename() {
return filename == null ? "stdout" : filename;
}
}
打包與部署
将編寫好的連接配接器代碼打成 JAR 包,放在每台 Kafka 的 libs目錄下,然後重新開機 Kafka 叢集 和 分布式模式連接配接器。
啟動完成後,可以通過下面指令檢視已安裝的連接配接器插件,可以看到兩個自定義開發的連接配接器插件已經部署成功:
[root@kafka1 ~]# curl http://kafka1:8083/connector-plugins -s | jq
[
# 自定義的 Sink 連接配接器插件
{
"class": "book_8.CustomerFileStreamSinkConnector",
"type": "sink",
"version": "2.7.0"
},
# 自定義的 Source 連接配接器插件
{
"class": "book_8.CustomerFileStreamSourceConnector",
"type": "source",
"version": "2.7.0"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "2.7.0"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "2.7.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
Kafka 連接配接器使用與開發
{
"name": "customer-distributed-console-source",
"config":
{
"connector.class": "book_8.CustomerFileStreamSourceConnector",
"tasks.max": "1",
"topic": "customer_distributed_connect_test",
"file": "/tmp/customer_distributed_test.txt"
}
}
檢視現在已經建立的連接配接器:
[root@kafka1 ~]# curl http://kafka1:8083/connectors -s | jq
[
"customer-distributed-console-source",
"distributed-console-sink",
"distributed-console-source"
]
往檔案中添加兩條資料:
echo kubernetes >> /tmp/customer_distributed_test.txt
echo netty >> /tmp/customer_distributed_test.txt
通過消費者可以消費到剛剛添加的兩條資料:
[root@kafka1 ~]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic customer_distributed_connect_test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"kubernetes"}
{"schema":{"type":"string","optional":false},"payload":"netty"}
Kafka 連接配接器使用與開發 請求 URL:http://kafka1:8083/connectors
請求 Body:
{
"name": "customer-distributed-console-sink",
"config":
{
"connector.class": "book_8.CustomerFileStreamSinkConnector",
"tasks.max": "1",
"topics": "customer_distributed_connect_test",
"file": "/tmp/customer_distributed_sink.txt"
}
}
[root@kafka1 ~]# curl http://kafka1:8083/connectors -s | jq
[
"customer-distributed-console-source",
"distributed-console-sink",
"distributed-console-source",
"customer-distributed-console-sink"
]
檢視檔案,可以看到成功從 Kafka Topic 中将資料導出到檔案:
[root@kafka1 ~]# cat /tmp/customer_distributed_sink.txt
kubernetes
netty