天天看點

Kafka Connector 使用說明

Broker:叢集中的一個執行個體。

Workers:運作Connector和Task的程序。

Task:資料寫入Kafka和從Kafka中讀出的具體實作。

Connector:通過管理Task來協調資料流的進階抽象。

Source:負責導入資料到Kafka。

Sink:負責從Kafka導出資料。

支援分布式部署。

Converter:Connector和其他存儲系統直接發送或者接受資料之間轉換資料。

Transform:針對值(鍵值對中的Value)的輕量級資料調整的工具。

調用接口建立索引:

http://localhost:7002/search/initialization/

Kafka的主題,Oracle的表名是區分大小寫的,推薦使用大寫

下載下傳confluent-5.2.1-2.12.tar,上傳伺服器(/home/目錄下)後解壓,進入其目錄

cd /home/confluent-5.2.1

Zookeeper(2181)

如果環境中已經有Zookeeper,直接連接配接環境中的Zookeeper,無需啟動自帶的Zookeeper;如果沒有,才使用自帶的Zookeeper:

./bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties

注意:先使用前台運作方式啟動程式,待程式正常啟動、不報錯後可以再改用守護程序(-daemon)的方式啟動

啟動Broker(9092)

修改配置檔案

打開配置檔案

vi ./etc/kafka/server.properties

确認Zookeeper

zookeeper.connect=10.110.5.83:2181

補充配置項

host.name=10.110.5.81

listeners=PLAINTEXT://10.110.5.81:9092

advertised.listeners=PLAINTEXT://10.110.5.81:9092

退出儲存

啟動

./bin/kafka-server-start -daemon etc/kafka/server.properties

啟動Schema Registry(8081)

./bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties

建立Kafka測試主題“trace”(這一步是多餘的,因為啟動Producer、Consumer後,Kafka會自動建立主題)

./bin/kafka-topics \

--topic trace \

--create \

--bootstrap-server 10.110.5.81:9092 \

--replication-factor 1 \

--partitions 1

單例模式啟動Connector

修改兩個配置檔案

connect-standalone.properties

打開該檔案

vi etc/kafka/connect-standalone.properties

修改以下兩項配置為false

key.converter.schemas.enable=false

value.converter.schemas.enable=false

修改位址(預設為本機)

bootstrap.servers=10.110.5.81:9092

儲存後關閉

elasticsearch-sink.properties

複制連接配接檔案

cp etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties etc/kafka-connect-jdbc/elasticsearch-sink.properties

vi etc/kafka-connect-elasticsearch/elasticsearch-sink.properties

修改配置

topics=trace

connection.url=http://10.110.5.84:9200

#與hbase保持統一,故采用“info”:

type.name=info

#新增模式忽略,防止抛出異常(可能多餘)

schema.ignore=true

以單例模式啟動Connector

./bin/connect-standalone -daemon etc/kafka/connect-standalone.properties etc/kafka-connect-elasticsearch/elasticsearch-sink.properties \

--property print.key=true \

--property schema.registry.url=http://10.110.5.81:8081

新開XShell标簽,啟動Consumer,以監聽消息是否發送(心理安慰,作用不大)

./bin/kafka-console-consumer \

--from-beginning \

--property print.key=true

新開XShell标簽,啟動Producer,生産資料

指定Schema

./bin/kafka-avro-console-producer \

--broker-list 10.110.5.81:9092 \

--property parse.key=true \

--property key.separator=: \

--property schema.registry.url=http://10.110.5.81:8081 \

--property key.schema='{"name":"key","type":"string"}' \

--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"string"},{"name":"xw","type":"string"},{"name":"ddbh","type":"string"},{"name":"fssj","type":"string"},{"name":"location","type":"string"},{"name":"jd","type":"string"},{"name":"wd","type":"string"},{"name":"zdh","type":"string"},{"name":"sfd","type":"string"},{"name":"mdd","type":"string"},{"name":"zj","type":"string"}]}'

以JSON格式輸入測試資料

"R210124198701172625_20201208144517":{"id":"210124198701172625","xw":"TL","ddbh":"D1024","fssj":"20201208144517","location":"39.915119,116.403963","jd":"116.403963","wd":"39.915119","zdh":"01","sfd":"JNK","mdd":"WFK","zj":"test"}

驗證結果

在打開了Consumer的視窗中我們可以看到剛才生産的資料

向ElasticSearch查詢,在浏覽器中輸入以下連結,可以看到資料已經插入了

http://10.110.5.81:9200/trace/_search?pretty

可使用<code>jps</code>檢視相關背景程序

可使用以下指令來監聽端口,檢視相關程式是否運作

netstat -natpl

netstat -tupln|grep 9092

删除ES的索引

curl -XDELETE http://localhost:9200/trace

Confluent Control Center(9021)CLI

Kafka Connector REST API(8083)

Kafka Rest Proxy(8082)

./bin/kafka-rest-start -daemon etc/kafka-rest/kafka-rest.properties

檢視所有主題

--list \

--zookeeper 10.110.5.83:2181

删除某一主題,見:

标記要删除的主題

--delete \

修改配置檔案,使删除功能可用

vi etc/kafka/server.properties

delete.topic.enable=true

在上面打開的配置檔案中找到資料目錄“log.dirs=/tmp/kafka-logs”,然後進入

cd /tmp/kafka-logs

rm -rf trace

停止Kafka

推薦

./bin/kafka-server-stop

強制

jps

kill -9 *****

重新開機Kafka

./bin/kafka-server-start etc/kafka/server.properties

确認是否已删除

可能需要重新開機Zookeeper

之後删除不需要重新開機和手動删除檔案,隻需調用步驟8.1。參考自:

kafka如何徹底删除topic及資料 kafka全部資料清空與某一topic資料清空

異常

生産者異常:Schema being registered is incompatible with an earlier schema

解決方法:

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \

--data '{"compatibility": "NONE"}' http://10.110.5.81:8081/config

檢視某一主題的情況

--describe \

--bootstrap-server 10.110.5.81:9092

資料庫表

1. 資料庫表在建立時,字段名不能用雙引号引起來,否則會報錯:ORA-00904: invalid identifier

準備工作

本章節在《Kafka Connect Sink Elasticsearch》基礎之上進行

進入指定依賴包目錄

cd /home/confluent-5.2.1/share/java/kafka-connect-jdbc

上傳ojdbc6.jar到步驟3中的目錄

傳回confluent目錄

啟動Connector

cp etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties etc/kafka-connect-jdbc/oracle-sink.properties

打開配置檔案oracle-sink.properties

vi etc/kafka-connect-jdbc/oracle-sink.properties

name=oracle-sink

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

tasks.max=1

connection.url=jdbc:oracle:thin:@位址:1521:IDR

connection.user=使用者名

connection.password=密碼

auto.create=true

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/oracle-sink.properties

因為在oracle-sink.properties中設定了auto.create=true,是以Connector會在Oracle中自動建立與主題同名的表(是以主題才大寫的)

啟動Producer

輸入資料

{"id":"210124198701172625","xw":"TL","ddbh":"D1024","fssj":"20201208144517","location":"39.915119,116.403963","jd":"116.403963","wd":"39.915119","zdh":"01","sfd":"JNK","mdd":"WFK","zj":"test"}

驗證結果。可以在相應的資料庫中查詢到資料,即證明成功了

SELECT * FROM trace;

cd /home/confluent-5.2.1/

HBase的相關支援未包含在Confluent的jar中,需要單獨下載下傳解壓更名檔案夾

下載下傳并上傳到伺服器後解壓縮confluentinc-kafka-connect-hbase-1.0.5.zip

改名

cd confluentinc-kafka-connect-hbase-1.0.5

将etc改名為etc/kafka-connect-hbase/

将lib改名為share/java/kafka-connect-hbase/

讓後将這兩個檔案夾合并到confluent中去

HBase準備

補充DNS

打開hosts

vim /etc/hosts

粘貼大資料環境的DNS

建立表(TM_TRACE)及列族(info)

補充配置檔案

上傳hbase-site.xml到/home/confluent-5.2.1/

打包配置檔案

jar -uvf share/java/kafka-connect-hbase/kafka-connect-hbase-1.0.5.jar hbase-site.xml

cp etc/kafka-connect-hbase/hbase-sink-quickstart.properties etc/kafka-connect-hbase/hbase-sink.properties

vi etc/kafka-connect-hbase/hbase-sink.properties

name=hbase-sink-connector

connector.class=io.confluent.connect.hbase.HBaseSinkConnector

hbase.zookeeper.quorum=master2.bds.inspur,manager.bds.inspur,master1.bds.inspur

hbase.zookeeper.property.clientPort=2181

gcp.bigtable.instance.id=HBASE-INSTANCE

auto.create.tables=true

auto.create.column.families=true

# auto.offset.rest=latest

# 表名;列族名在資料中指明

table.name.format=TM_TRACE

error.mode=ignore

topics=tm_trace

confluent.topic=tm_trace

confluent.topic.replication.factor=1

confluent.topic.bootstrap.servers=10.110.5.81:9092

insert.mode=UPSERT

max.batch.size=1000

# row.key.definition=對象類型,證件号碼,分隔符,發生時間,三位防重碼

row.key.definition=type,id,delimiter,fssj,unrepeat

#去除多餘的值(對于elasticsearch來說不多餘)

transforms=DropLocation

transforms.DropLocation.type=org.apache.kafka.connect.transforms.MaskField$Value

transforms.DropLocation.fields=location

儲存後退出

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hbase/hbase-sink.properties

啟動指令行Consumer輔助檢視

./bin/kafka-avro-console-consumer \

--topic tm_trace \

啟動./bin/kafka-avro-console-producer \--topic tm_trace \--broker-list 10.110.5.81:9092 \--property parse.key=true \--property key.separator=@ \--property schema.registry.url=http://10.110.5.81:8081 \--property key.schema='{"namespace": "com.inspur.trace.entity.schema","name": "TraceKey","type":"string"}' \--property value.schema='{"namespace": "com.inspur.trace.entity.schema","name": "TraceHBaseValue","type":"record","fields":[{"name":"info","type":{"name":"columns","type":"record","fields":[{"name":"type","type":"string"},{"name":"id","type":"string"},{"name":"xw","type":"string"},{"name":"ddbh","type":"string"},{"name":"fssj","type":"string"},{"name":"location","type":"string"},{"name":"jd","type":"string"},{"name":"wd","type":"string"},{"name":"zdh","type":"string"},{"name":"sfd","type":"string"},{"name":"mdd","type":"string"},{"name":"zj","type":"string"}]}}]}'

注意:

hbase-connector使用的是結構化資料(org.apache.kafka.connect.data.Struct),是以需要使用能發送結構化資料的Producer,即avro-producer。

并且列族“info”需要在Producer中配置(符合HBase的理念),是以需要我們用avro配置模闆(Schema)用于解析結構化資料。

而且,如果想使用拼接列族功能,也需要為行鍵配置模闆并發送結構化的行鍵。

是以才有了如上的指令。

關于Producer啟動指令中參數的說明

# 開啟鍵的格式化

parse.key=true

# 将“@”作為鍵與值的分隔符

key.separator=@

# 模闆注冊中心位址

schema.registry.url=……

# 鍵的解析模闆

key.schema=……

# 值的解析模闆

value.schema=……

關于行鍵

通過閱讀RowKeyExtractor的源代碼可知,HBaseSinkConnector有兩種方式的接收行鍵:

一種是接收到的行鍵是結構化資料,HBaseSinkConnector将根據模闆将結構化資料拼接為rowkey(需要在hbase-sink.properties指明字段名,同時還需要在Producer中啟用并傳遞結構化鍵);

另一種是未指明行鍵字段名,然後将鍵(key)作為行鍵(rowkey)。

兩種方式之外的搭配,不做處理,抛出異常。

采用結構化拼接鍵雖然進階,但是徒增複雜度,目前我想不出采用這種方式的的必要性,這裡隻是做技術驗證,是以采用的是結構化拼接鍵。

真正應用的時候還是推薦直接使用鍵作為行鍵, 即,将“row.key.definition”注釋掉。

輸入測試資料

"R210124198701172625_20201210104500est"@{"info":{"type":"R","id":"210124198701172625","xw":"TL","ddbh":"D1024","fssj":"20201210104502","location":"39.915119,116.403963","jd":"116.403963","wd":"39.915119","zdh":"01","sfd":"JNK","mdd":"WFK","zj":"test"}}

驗證結果:直接檢視HBase中的TM_TRACE是否有剛插入的值

簡介

Elasticsearch、HBase的表結構是不相容的:

因為在不修改原jar中代碼的前提下,HBase的資料有兩層,外層為列族,比如列族為“info”的資料“{"info":{"xw":"LD","type":"R",……}}”,但是Elasticsearch的資料就一層,如果es-sink按照hbase-sink的兩層結構來抽取資料到索引中,必然報錯。

軌迹應用依賴Elasticsearch的基于地理位置的搜尋功能,這就要求我們新增一個geo_point類型的字段用來進行緯經度的搜尋(暫且稱呼該字段為“location”),但是HBase并不需要這一字段,會多餘。

而KSQL可以解決這倆問題。

KSQL

KSQL就是一個用來操作Kafka中的資料流的工具。

具體地來說,KSQL可以通過編寫SQL來操作Kafka中的資料流,它可以解析結構化的資料,以及直接輸出結構化資料的指定字段,還可以新增字段。

服務端

vim etc/ksqldb/ksql-server.properties

編輯配置檔案

listeners=http://10.110.5.81:8088

ksql.logging.processing.topic.auto.create=true

ksql.logging.processing.stream.auto.create=true

ksql.schema.registry.url=http://10.110.5.81:8081

啟動服務端

./bin/ksql-server-start-6.0.1 etc/ksqldb/ksql-server.properties

用戶端

啟動完服務端後,我們還需要啟動用戶端,增加一系列配置。

./bin/ksql http://10.110.5.81:8088

建立新資料流,以便從“tm_trace”中引入原始資料

create stream trace_hbase (info struct) with (kafka_topic='tm_trace', value_format='avro');

注意:大小寫不敏感,除非用英文雙引号包含,與Oracle相仿

解析“tm_trace”,去除外層包裹,同時新增地理位置字段“location”,然後将該資料流寫入到負責向elasticsearch中寫入資料的主題中去。

create stream t1 with (kafka_topic='trace') as select info-&gt;type "type", info-&gt;id "id", info-&gt;xw "xw", info-&gt;ddbh "ddbh", info-&gt;fssj "fssj", info-&gt;jd "jd", info-&gt;wd "wd", struct("lat" := info-&gt;wd, "lon" := info-&gt;jd) AS "location", info-&gt;zdh "zdh", info-&gt;sfd "sfd", info-&gt;mdd "mdd", info-&gt;zj "zj" from trace_hbase;

注意:主題已更換,需要到./etc/kafka-connect-hbase/hbase-sink.properties中修改主題

修改SQL很遺憾,KSQL不支援修改已經建立的資料流,我們需要删除原來的,然後重新建立:

确認查詢是否正在運作

show queries;

如果有,需要将其停掉

terminate QueryId;(QueryId大小寫敏感)

檢視流的名稱

show streams;

删除舊的流

drop stream trace_hbase;

drop stream trace_elasticsearch;

其他有用SQL參照

set 'auto.offset.reset' = 'earliest';

select "jd", "wd" from t1 EMIT CHANGES;

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/elasticsearch-sink.properties etc/kafka-connect-hbase/hbase-sink.properties

分别對叢集中的每台機器進行如下操作

部分ip需要填寫本機ip,比如①處

“localhost”、“0.0.0.0”,這兩個位址不會被正确識别,需要填寫實際ip,如10.110.5.81

Kafka Broker

打開檔案

修改(或添加)幾個配置

broker.id=2

host.name=10.110.5.81①

zookeeper.connect=10.110.5.81:2181,10.110.5.82:2181,10.110.5.83:2181

儲存并退出

啟動Broker

之後

ctrl + c停止該Broker

以背景程序方式啟動Broker

确認Broker是否啟動

輸入<code>jps</code>,如果有名為SupportedKafka(Broker)的程序,則表示啟動成功

Schema Register

vi etc/schema-registry/schema-registry.properties

listeners=http://10.110.5.81:8081

kafkastore.connection.url=10.110.5.81:2181,10.110.5.82:2181,10.110.5.83:2181

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

與Kafka Broker中的步驟五相似

Kafka Rest API

vi etc/kafka-rest/kafka-rest.properties

schema.registry.url=http://10.110.5.81:8081

./bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties

與Kafka Broker中的步驟五不同的是,要以以下方式才能背景運作

nohup ./bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties &gt; /dev/null 2&gt;&amp;1 &amp;

建立主題(3分區,2備份)

--zookeeper 10.110.5.81:2181,10.110.5.82:2181,10.110.5.83:2181 \

--partitions 3 \

--replication-factor 2

Kafka connect

vi etc/schema-registry/connect-avro-distributed.properties

bootstrap.servers=10.110.5.81:9092,10.110.5.82:9092,10.110.5.83:9092

group.id=acp_cluster

./bin/connect-distributed etc/kafka/connect-distributed.properties

建立Connector

使用Kafka Rest API(8083)啟動Connector,參數内容參照etc/kafka-connect-***/***.properties配置檔案,如:

curl 'http://10.110.5.81:8083/connectors' -X POST -i -H "Content-Type:application/json" -d '

{

"name":"elasticsearch-sink",

"config":{

"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

"tasks.max":10,

"topics":"ACP_KAFKA",

"key.ignore":true,

"schema.ignore":true,

"connection.url":"http://10.110.5.81:9200",

"type.name":"kafka-connect",

"schema.registry.url":"http://10.110.5.81:8081",

"key.converter":"io.confluent.connect.avro.AvroConverter",

"key.converter.schema.registry.url":"http://10.110.5.81:8081",

"value.converter":"io.confluent.connect.avro.AvroConverter",

"value.converter.schema.registry.url":"http://10.110.5.81:8081",

"topic.index.map":"ACP_KAFKA:acp_kafka_message"

}

'

"name":"oracle-sink",

"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",

"tasks.max":"1",

"connection.url":"jdbc:oracle:thin:@10.110.1.132:1521:IDR",

"connection.user":"ppw",

"connection.password":"ppw",

"auto.create":"true",

"table.name.format":"ACP_KAFKA_MESSAGE",

"transforms":"Rename",

"transforms.Rename.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",

"transforms.Rename.renames":"id:ID,type:TYPE,msg:MESSAGE,time:TIME"

curl 10.110.5.81:8083/connectors/oracle-sink/config -X PUT -i -H "Content-Type:application/json" -d '{}'

這種方式不可行

./bin/confluent load elasticsearch-sink -d etc/kafka-connect-elasticsearch/elasticsearch-sink.properties

對Kafka Rest API的補充說明方法路徑說明GET/connectors傳回活動連接配接器的清單POST/connectors建立一個新的連接配接器; 請求主體應該是包含字元串name字段和config帶有連接配接器配置參數的對象字段的JSON對象GET/connectors/{name}擷取有關特定連接配接器的資訊GET/connectors/{name}/config擷取特定連接配接器的配置參數PUT/connectors/{name}/config更新特定連接配接器的配置參數GET/connectors/{name}/status擷取連接配接器的目前狀态,包括連接配接器是否正在運作,失敗,已暫停等,配置設定給哪個工作者,失敗時的錯誤資訊以及所有任務的狀态GET/connectors/{name}/tasks擷取目前為連接配接器運作的任務清單GET/connectors/{name}/tasks/{taskid}/status擷取任務的目前狀态,包括如果正在運作,失敗,暫停等,配置設定給哪個從業人員,如果失敗,則傳回錯誤資訊PUT/connectors/{name}/pause暫停連接配接器及其任務,停止消息處理,直到連接配接器恢複PUT/connectors/{name}/resume恢複暫停的連接配接器(或者,如果連接配接器未暫停,則不執行任何操作)POST/connectors/{name}/restart重新啟動連接配接器(通常是因為失敗)POST/connectors/{name}/tasks/{taskId}/restart重新開機個别任務(通常是因為失敗)DELETE/connectors/{name}删除連接配接器,停止所有任務并删除其配置

# Register a new version of a schema under the subject "trace-key"

# Register a new version of a schema under the subject "trace-value"

# List all subjects

# List all schema versions registered under the subject "trace-value"

# Fetch a schema by globally unique id 1

# Fetch version 1 of the schema registered under subject "trace-value"

# Fetch the most recently registered schema under subject "trace-value"

# Delete version 3 of the schema registered under subject "trace-value"

# Delete all versions of the schema registered under subject "trace-value"

# Check whether a schema has been registered under subject "trace-key"

# Test compatibility of a schema with the latest schema under subject "trace-value"

# Get top level config

# Update compatibility requirements globally

# Update compatibility requirements under the subject "trace-value"