
Kafka Connector Usage Guide

Broker: a single node (instance) in the Kafka cluster.

Workers: the processes that run Connectors and Tasks.

Task: the concrete implementation that writes data into Kafka or reads data out of Kafka.

Connector: a high-level abstraction that coordinates the data flow by managing Tasks.

Source: imports data into Kafka.

Sink: exports data from Kafka.

Distributed deployment is supported.

Converter: converts data between the format Connectors use internally and the format of the external storage systems they send data to or receive data from.

Transform: a lightweight tool for adjusting message values (the Value of a key-value pair).
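As a concrete illustration, a sink connector's properties file combines a Converter and a Transform roughly like the sketch below (the converter and transform classes are the standard Kafka/Confluent ones that also appear later in this guide; the field names here are placeholders):

# decode keys and values with the Avro converter backed by the Schema Registry
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.110.5.81:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.110.5.81:8081
# lightweight adjustment of the value: rename two fields before they reach the sink
transforms=Rename
transforms.Rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.Rename.renames=id:ID,type:TYPE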

Call the following endpoint to create the index:

http://localhost:7002/search/initialization/

Kafka topic names and Oracle table names are case-sensitive; uppercase is recommended.

Download confluent-5.2.1-2.12.tar, upload it to the server (under /home/), extract it, and enter the resulting directory:

cd /home/confluent-5.2.1

Zookeeper (2181)

If the environment already has a Zookeeper, connect to it directly; only start the bundled Zookeeper if there is none:

./bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties

Note: start each program in the foreground first; once it starts cleanly without errors, switch to the daemon (-daemon) form.
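For the bundled Zookeeper, for example, that means first running the script without -daemon and watching the log output, then restarting it with -daemon once it is known to come up cleanly:

# foreground, so startup errors are visible
./bin/zookeeper-server-start etc/kafka/zookeeper.properties
# background, after a clean start has been confirmed
./bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties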

Start the Broker (9092)

Edit the configuration file

Open the file

vi ./etc/kafka/server.properties

Confirm the Zookeeper address

zookeeper.connect=10.110.5.83:2181

Add the following settings

host.name=10.110.5.81

listeners=PLAINTEXT://10.110.5.81:9092

advertised.listeners=PLAINTEXT://10.110.5.81:9092

Save and exit

Start the Broker

./bin/kafka-server-start -daemon etc/kafka/server.properties

Start the Schema Registry (8081)

./bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties

Create the Kafka test topic "trace" (this step is optional: Kafka will create the topic automatically once a Producer or Consumer uses it):

./bin/kafka-topics \
--topic trace \
--create \
--bootstrap-server 10.110.5.81:9092 \
--replication-factor 1 \
--partitions 1

Start the Connector in standalone mode

Edit two configuration files

connect-standalone.properties

Open the file

vi etc/kafka/connect-standalone.properties

Set the following two options to false

key.converter.schemas.enable=false

value.converter.schemas.enable=false

Change the address (defaults to localhost)

bootstrap.servers=10.110.5.81:9092

Save and close

elasticsearch-sink.properties

Copy the connector configuration file

cp etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties etc/kafka-connect-elasticsearch/elasticsearch-sink.properties

vi etc/kafka-connect-elasticsearch/elasticsearch-sink.properties

Edit the configuration

topics=trace

connection.url=http://10.110.5.84:9200

# use "info" to stay consistent with HBase:

type.name=info

# ignore schemas to avoid exceptions (possibly redundant)

schema.ignore=true
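After these edits the whole elasticsearch-sink.properties should look roughly like the sketch below; name, connector.class, tasks.max, and key.ignore are inherited from the quickstart file, so treat their exact values here as assumptions:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=trace
key.ignore=true
connection.url=http://10.110.5.84:9200
type.name=info
schema.ignore=true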

Start the Connector in standalone mode:

./bin/connect-standalone -daemon etc/kafka/connect-standalone.properties etc/kafka-connect-elasticsearch/elasticsearch-sink.properties

Open a new XShell tab and start a Consumer to watch whether messages are being produced (mostly for peace of mind; of limited use)

./bin/kafka-console-consumer \
--topic trace \
--bootstrap-server 10.110.5.81:9092 \
--from-beginning \
--property print.key=true

Open a new XShell tab and start a Producer to produce data

Specify the schemas

./bin/kafka-avro-console-producer \
--topic trace \
--broker-list 10.110.5.81:9092 \
--property parse.key=true \
--property key.separator=: \
--property schema.registry.url=http://10.110.5.81:8081 \
--property key.schema='{"name":"key","type":"string"}' \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"string"},{"name":"xw","type":"string"},{"name":"ddbh","type":"string"},{"name":"fssj","type":"string"},{"name":"location","type":"string"},{"name":"jd","type":"string"},{"name":"wd","type":"string"},{"name":"zdh","type":"string"},{"name":"sfd","type":"string"},{"name":"mdd","type":"string"},{"name":"zj","type":"string"}]}'

Enter test data in JSON format

"R210124198701172625_20201208144517":{"id":"210124198701172625","xw":"TL","ddbh":"D1024","fssj":"20201208144517","location":"39.915119,116.403963","jd":"116.403963","wd":"39.915119","zdh":"01","sfd":"JNK","mdd":"WFK","zj":"test"}

Verify the result

In the Consumer window you should see the data that was just produced.

Query Elasticsearch by opening the following URL in a browser; the data should have been indexed:

http://10.110.5.81:9200/trace/_search?pretty

Use jps to check the related background processes.

Use the following commands to check listening ports and confirm whether the relevant programs are running:

netstat -natpl

netstat -tupln|grep 9092

Delete the ES index

curl -XDELETE http://localhost:9200/trace

Confluent Control Center (9021) CLI

Kafka Connect REST API (8083)

Kafka REST Proxy (8082)

./bin/kafka-rest-start -daemon etc/kafka-rest/kafka-rest.properties

List all topics

./bin/kafka-topics \
--list \
--zookeeper 10.110.5.83:2181

Delete a topic

Mark the topic for deletion

./bin/kafka-topics \
--delete \
--topic trace \
--zookeeper 10.110.5.83:2181

Edit the configuration file to make deletion take effect

vi etc/kafka/server.properties

delete.topic.enable=true

In the configuration file opened above, find the data directory (log.dirs=/tmp/kafka-logs), then go there and remove the topic's data:

cd /tmp/kafka-logs

rm -rf trace

Stop Kafka

Recommended way

./bin/kafka-server-stop

Forced way

jps

kill -9 *****

Restart Kafka

./bin/kafka-server-start etc/kafka/server.properties

Confirm the topic has been deleted

Zookeeper may need to be restarted as well.

After this, deleting a topic no longer requires a restart or manual file deletion; just run step 8.1 (marking the topic for deletion). Adapted from: "How to completely delete a Kafka topic and its data" and "Clearing all Kafka data vs. clearing a single topic's data".

Exceptions

Producer exception: Schema being registered is incompatible with an earlier schema

Fix:

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' http://10.110.5.81:8081/config

Describe a topic

./bin/kafka-topics \
--describe \
--topic trace \
--bootstrap-server 10.110.5.81:9092

Database tables

1. When creating a database table, do not wrap column names in double quotes; otherwise queries will fail with: ORA-00904: invalid identifier
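The cause is Oracle's identifier rules: a quoted column name is stored case-sensitively and can no longer be referenced without quotes. A quick illustration with a hypothetical table:

-- quoted at creation time: the column is stored as lower-case "id"
CREATE TABLE demo_quoted ("id" VARCHAR2(32));
-- an unquoted reference is upper-cased to ID and fails with ORA-00904
SELECT id FROM demo_quoted;
-- unquoted at creation time: stored as ID, so ordinary queries work
CREATE TABLE demo_unquoted (id VARCHAR2(32));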

Preparation

This section builds on "Kafka Connect Sink Elasticsearch" above.

Go to the directory for JDBC driver dependencies

cd /home/confluent-5.2.1/share/java/kafka-connect-jdbc

Upload ojdbc6.jar into that directory

Return to the confluent directory

Start the Connector

cp etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties etc/kafka-connect-jdbc/oracle-sink.properties

Open the oracle-sink.properties configuration file

vi etc/kafka-connect-jdbc/oracle-sink.properties

name=oracle-sink

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

tasks.max=1

connection.url=jdbc:oracle:thin:@<host>:1521:IDR

connection.user=<username>

connection.password=<password>

auto.create=true
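One more thing worth checking: the quickstart file this was copied from also contains a topics= line pointing at its own demo topic; presumably it has to be changed to the topic used in this test, e.g.:

topics=trace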

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/oracle-sink.properties

Because auto.create=true is set in oracle-sink.properties, the Connector will automatically create a table in Oracle with the same name as the topic (which is why an uppercase topic name was recommended).

Start the Producer

Enter the data

{"id":"210124198701172625","xw":"TL","ddbh":"D1024","fssj":"20201208144517","location":"39.915119,116.403963","jd":"116.403963","wd":"39.915119","zdh":"01","sfd":"JNK","mdd":"WFK","zj":"test"}

Verify the result: if the data can be queried in the corresponding database, the pipeline works

SELECT * FROM trace;

cd /home/confluent-5.2.1/

HBase support is not included in the Confluent jars; it has to be downloaded separately, extracted, and its folders renamed.

Download confluentinc-kafka-connect-hbase-1.0.5.zip, upload it to the server, and unzip it.

Rename the folders

cd confluentinc-kafka-connect-hbase-1.0.5

Rename etc to etc/kafka-connect-hbase/

Rename lib to share/java/kafka-connect-hbase/

Then merge these two folders into the confluent installation, as sketched below.
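A minimal shell sketch of the rename-and-merge, assuming the zip was extracted under /home/ and Confluent lives in /home/confluent-5.2.1 (adjust the paths to your layout):

cd /home/confluentinc-kafka-connect-hbase-1.0.5
mkdir -p /home/confluent-5.2.1/etc/kafka-connect-hbase /home/confluent-5.2.1/share/java/kafka-connect-hbase
# the plugin's etc/ becomes Confluent's etc/kafka-connect-hbase/
cp -r etc/* /home/confluent-5.2.1/etc/kafka-connect-hbase/
# the plugin's lib/ becomes Confluent's share/java/kafka-connect-hbase/
cp -r lib/* /home/confluent-5.2.1/share/java/kafka-connect-hbase/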

HBase preparation

Add DNS entries

Open the hosts file

vim /etc/hosts

Paste in the DNS entries of the big-data environment

Create the table (TM_TRACE) and column family (info)
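If the table does not exist yet, it can be created from the HBase shell on the cluster; a minimal sketch using the table and column-family names from this guide:

echo "create 'TM_TRACE', 'info'" | hbase shell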

Add the configuration file

Upload hbase-site.xml to /home/confluent-5.2.1/

Package the configuration file into the connector jar

jar -uvf share/java/kafka-connect-hbase/kafka-connect-hbase-1.0.5.jar hbase-site.xml

cp etc/kafka-connect-hbase/hbase-sink-quickstart.properties etc/kafka-connect-hbase/hbase-sink.properties

vi etc/kafka-connect-hbase/hbase-sink.properties

name=hbase-sink-connector

connector.class=io.confluent.connect.hbase.HBaseSinkConnector

hbase.zookeeper.quorum=master2.bds.inspur,manager.bds.inspur,master1.bds.inspur

hbase.zookeeper.property.clientPort=2181

gcp.bigtable.instance.id=HBASE-INSTANCE

auto.create.tables=true

auto.create.column.families=true

# auto.offset.reset=latest

# Table name; the column family is specified in the data

table.name.format=TM_TRACE

error.mode=ignore

topics=tm_trace

confluent.topic=tm_trace

confluent.topic.replication.factor=1

confluent.topic.bootstrap.servers=10.110.5.81:9092

insert.mode=UPSERT

max.batch.size=1000

# row.key.definition = object type, ID number, delimiter, event time, 3-digit anti-duplication code

row.key.definition=type,id,delimiter,fssj,unrepeat

# drop the redundant field (it is not redundant for Elasticsearch)

transforms=DropLocation

transforms.DropLocation.type=org.apache.kafka.connect.transforms.MaskField$Value

transforms.DropLocation.fields=location

Save and exit

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hbase/hbase-sink.properties

Start a console Consumer to help verify:

./bin/kafka-avro-console-consumer \
--topic tm_trace \
--bootstrap-server 10.110.5.81:9092 \
--property schema.registry.url=http://10.110.5.81:8081

Start the Producer:

./bin/kafka-avro-console-producer \
--topic tm_trace \
--broker-list 10.110.5.81:9092 \
--property parse.key=true \
--property key.separator=@ \
--property schema.registry.url=http://10.110.5.81:8081 \
--property key.schema='{"namespace": "com.inspur.trace.entity.schema","name": "TraceKey","type":"string"}' \
--property value.schema='{"namespace": "com.inspur.trace.entity.schema","name": "TraceHBaseValue","type":"record","fields":[{"name":"info","type":{"name":"columns","type":"record","fields":[{"name":"type","type":"string"},{"name":"id","type":"string"},{"name":"xw","type":"string"},{"name":"ddbh","type":"string"},{"name":"fssj","type":"string"},{"name":"location","type":"string"},{"name":"jd","type":"string"},{"name":"wd","type":"string"},{"name":"zdh","type":"string"},{"name":"sfd","type":"string"},{"name":"mdd","type":"string"},{"name":"zj","type":"string"}]}}]}'

Note:

The hbase-connector works with structured data (org.apache.kafka.connect.data.Struct), so it needs a Producer that can send structured data, i.e. the Avro producer.

In addition, the column family "info" has to be supplied by the Producer (in keeping with HBase's model), so an Avro schema is needed to parse the structured value.

Furthermore, using the composite row-key feature also requires a schema for the key and sending a structured key.

Hence the command above.

Notes on the parameters in the Producer command

# enable parsing of the message key
parse.key=true

# use "@" as the separator between key and value
key.separator=@

# Schema Registry address
schema.registry.url=……

# schema used to parse the key
key.schema=……

# schema used to parse the value
value.schema=……

About row keys

Reading the RowKeyExtractor source shows that HBaseSinkConnector accepts row keys in two ways:

In the first, the incoming key is structured data, and the connector concatenates the configured fields into the rowkey (the field names must be declared in hbase-sink.properties via row.key.definition, and the Producer must send a structured key).

In the second, no row-key fields are declared, and the message key is used directly as the rowkey.

Any other combination is not handled and raises an exception.

The structured composite key is more sophisticated but adds complexity, and I currently see no real need for it; it is used here only as a technical verification.

For real applications, using the key directly as the rowkey is recommended, i.e. comment out "row.key.definition".

Enter test data

"R210124198701172625_20201210104500est"@{"info":{"type":"R","id":"210124198701172625","xw":"TL","ddbh":"D1024","fssj":"20201210104502","location":"39.915119,116.403963","jd":"116.403963","wd":"39.915119","zdh":"01","sfd":"JNK","mdd":"WFK","zj":"test"}}

Verify the result: check directly whether the newly inserted row is present in TM_TRACE in HBase.
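For example, a quick look from the HBase shell (LIMIT just keeps the output short):

echo "scan 'TM_TRACE', {LIMIT => 5}" | hbase shell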

Overview

The Elasticsearch and HBase table structures are incompatible:

Without changing the code in the original jars, HBase data has two levels: the outer level is the column family, e.g. for column family "info" the value looks like {"info":{"xw":"LD","type":"R",……}}. Elasticsearch data has only one level, so if es-sink extracted data into the index using hbase-sink's two-level structure, it would inevitably fail.

The trajectory application relies on Elasticsearch's geo search, which requires an extra geo_point field for latitude/longitude queries (call it "location"); HBase does not need this field, where it would be redundant.

KSQL can solve both problems.

KSQL

KSQL is a tool for working with the data streams in Kafka.

Concretely, KSQL lets you manipulate Kafka data streams with SQL: it can parse structured data, output selected fields of a structured value directly, and add new fields.

Server

vim etc/ksqldb/ksql-server.properties

Edit the following settings

listeners=http://10.110.5.81:8088

ksql.logging.processing.topic.auto.create=true

ksql.logging.processing.stream.auto.create=true

ksql.schema.registry.url=http://10.110.5.81:8081

Start the server

./bin/ksql-server-start-6.0.1 etc/ksqldb/ksql-server.properties

Client

After starting the server, we also need to start the client and add a series of configurations.

./bin/ksql http://10.110.5.81:8088

Create a new stream to bring the raw data in from "tm_trace":

create stream trace_hbase (info struct<type varchar, id varchar, xw varchar, ddbh varchar, fssj varchar, location varchar, jd varchar, wd varchar, zdh varchar, sfd varchar, mdd varchar, zj varchar>) with (kafka_topic='tm_trace', value_format='avro');

Note: identifiers are case-insensitive unless wrapped in double quotes, similar to Oracle.

Parse "tm_trace", strip the outer wrapper, add the geolocation field "location", and write the resulting stream to the topic that feeds Elasticsearch:

create stream t1 with (kafka_topic='trace') as select info->type "type", info->id "id", info->xw "xw", info->ddbh "ddbh", info->fssj "fssj", info->jd "jd", info->wd "wd", struct("lat" := info->wd, "lon" := info->jd) AS "location", info->zdh "zdh", info->sfd "sfd", info->mdd "mdd", info->zj "zj" from trace_hbase;

Note: the topic has changed, so the topic configured in ./etc/kafka-connect-hbase/hbase-sink.properties must be updated accordingly.

Modifying the SQL: unfortunately, KSQL does not support altering an existing stream; the old one has to be dropped and recreated:

Check whether any query is running

show queries;

If there is one, terminate it

terminate QueryId; (the QueryId is case-sensitive)

List the streams

show streams;

Drop the old streams

drop stream trace_hbase;

drop stream trace_elasticsearch;

Other useful SQL for reference

set 'auto.offset.reset' = 'earliest';

select "jd", "wd" from t1 EMIT CHANGES;

Start both the Elasticsearch and the HBase sink in one standalone worker:

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/elasticsearch-sink.properties etc/kafka-connect-hbase/hbase-sink.properties

Perform the following steps on every machine in the cluster.

Some IPs must be set to the local machine's IP, e.g. the one marked ①.

"localhost" and "0.0.0.0" will not be resolved correctly; use the actual IP, e.g. 10.110.5.81.

Kafka Broker

Open the configuration file (etc/kafka/server.properties)

Modify (or add) the following settings

broker.id=2

host.name=10.110.5.81①

zookeeper.connect=10.110.5.81:2181,10.110.5.82:2181,10.110.5.83:2181

Save and exit

Start the Broker (in the foreground first)
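Same command as in the standalone setup, run in the foreground first:

./bin/kafka-server-start etc/kafka/server.properties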

Then stop the Broker with Ctrl+C

Start the Broker again as a background process
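That is, the same start command with -daemon:

./bin/kafka-server-start -daemon etc/kafka/server.properties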

Confirm the Broker has started

Run jps; if a process named SupportedKafka (the Broker) is listed, the startup succeeded.

Schema Registry

vi etc/schema-registry/schema-registry.properties

listeners=http://10.110.5.81:8081

kafkastore.connection.url=10.110.5.81:2181,10.110.5.82:2181,10.110.5.83:2181

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

Similar to step 5 under Kafka Broker (start in the foreground first, then restart as a background process).

Kafka REST Proxy

vi etc/kafka-rest/kafka-rest.properties

schema.registry.url=http://10.110.5.81:8081

./bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties

Unlike step 5 under Kafka Broker, it has to be run in the background as follows:

nohup ./bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties > /dev/null 2>&1 &

Create a new topic (3 partitions, 2 replicas)

./bin/kafka-topics \
--create \
--topic ACP_KAFKA \
--zookeeper 10.110.5.81:2181,10.110.5.82:2181,10.110.5.83:2181 \
--partitions 3 \
--replication-factor 2

Kafka Connect

vi etc/schema-registry/connect-avro-distributed.properties

bootstrap.servers=10.110.5.81:9092,10.110.5.82:9092,10.110.5.83:9092

group.id=acp_cluster

./bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties

Create a Connector

Use the Kafka Connect REST API (8083) to create the Connector; the parameters mirror the etc/kafka-connect-***/***.properties configuration files, for example:

curl 'http://10.110.5.81:8083/connectors' -X POST -i -H "Content-Type:application/json" -d '
{
  "name":"elasticsearch-sink",
  "config":{
    "connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max":10,
    "topics":"ACP_KAFKA",
    "key.ignore":true,
    "schema.ignore":true,
    "connection.url":"http://10.110.5.81:9200",
    "type.name":"kafka-connect",
    "schema.registry.url":"http://10.110.5.81:8081",
    "key.converter":"io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url":"http://10.110.5.81:8081",
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://10.110.5.81:8081",
    "topic.index.map":"ACP_KAFKA:acp_kafka_message"
  }
}
'

"name":"oracle-sink",

"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",

"tasks.max":"1",

"connection.url":"jdbc:oracle:thin:@10.110.1.132:1521:IDR",

"connection.user":"ppw",

"connection.password":"ppw",

"auto.create":"true",

"table.name.format":"ACP_KAFKA_MESSAGE",

"transforms":"Rename",

"transforms.Rename.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",

"transforms.Rename.renames":"id:ID,type:TYPE,msg:MESSAGE,time:TIME"

curl 10.110.5.81:8083/connectors/oracle-sink/config -X PUT -i -H "Content-Type:application/json" -d '{}'

This approach did not work.

./bin/confluent load elasticsearch-sink -d etc/kafka-connect-elasticsearch/elasticsearch-sink.properties

Additional notes on the Kafka Connect REST API:

GET /connectors: list the active connectors
POST /connectors: create a new connector; the request body should be a JSON object with a string "name" field and a "config" object holding the connector's configuration parameters
GET /connectors/{name}: get information about a specific connector
GET /connectors/{name}/config: get the configuration parameters of a specific connector
PUT /connectors/{name}/config: update the configuration parameters of a specific connector
GET /connectors/{name}/status: get the connector's current status, including whether it is running, failed, or paused, which worker it is assigned to, the error message if it failed, and the status of all of its tasks
GET /connectors/{name}/tasks: list the tasks currently running for the connector
GET /connectors/{name}/tasks/{taskid}/status: get a task's current status, including whether it is running, failed, or paused, which worker it is assigned to, and the error message if it failed
PUT /connectors/{name}/pause: pause the connector and its tasks; message processing stops until the connector is resumed
PUT /connectors/{name}/resume: resume a paused connector (or do nothing if the connector is not paused)
POST /connectors/{name}/restart: restart the connector (typically because it failed)
POST /connectors/{name}/tasks/{taskId}/restart: restart an individual task (typically because it failed)
DELETE /connectors/{name}: delete the connector, stopping all tasks and removing its configuration
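For example, to check on the Elasticsearch connector created above (host and connector name taken from this guide):

curl http://10.110.5.81:8083/connectors/elasticsearch-sink/status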

# Register a new version of a schema under the subject "trace-key"

# Register a new version of a schema under the subject "trace-value"

# List all subjects

# List all schema versions registered under the subject "trace-value"

# Fetch a schema by globally unique id 1

# Fetch version 1 of the schema registered under subject "trace-value"

# Fetch the most recently registered schema under subject "trace-value"

# Delete version 3 of the schema registered under subject "trace-value"

# Delete all versions of the schema registered under subject "trace-value"

# Check whether a schema has been registered under subject "trace-key"

# Test compatibility of a schema with the latest schema under subject "trace-value"

# Get top level config

# Update compatibility requirements globally

# Update compatibility requirements under the subject "trace-value"
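A sketch of the corresponding curl calls, assuming the Schema Registry from this guide at http://10.110.5.81:8081; the string schema used as the example payload matches the "trace-key" schema above, and the concrete version numbers are only examples:

# Register a new version of a schema under the subject "trace-key"
# (registering under "trace-value" works the same way, with the record schema escaped as the payload)
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\": \"string\"}"}' http://10.110.5.81:8081/subjects/trace-key/versions

# List all subjects
curl -X GET http://10.110.5.81:8081/subjects

# List all schema versions registered under the subject "trace-value"
curl -X GET http://10.110.5.81:8081/subjects/trace-value/versions

# Fetch a schema by globally unique id 1
curl -X GET http://10.110.5.81:8081/schemas/ids/1

# Fetch version 1 of the schema registered under subject "trace-value"
curl -X GET http://10.110.5.81:8081/subjects/trace-value/versions/1

# Fetch the most recently registered schema under subject "trace-value"
curl -X GET http://10.110.5.81:8081/subjects/trace-value/versions/latest

# Delete version 3 of the schema registered under subject "trace-value"
curl -X DELETE http://10.110.5.81:8081/subjects/trace-value/versions/3

# Delete all versions of the schema registered under subject "trace-value"
curl -X DELETE http://10.110.5.81:8081/subjects/trace-value

# Check whether a schema has been registered under subject "trace-key"
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\": \"string\"}"}' http://10.110.5.81:8081/subjects/trace-key

# Test compatibility of a schema with the latest schema under subject "trace-value"
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\": \"string\"}"}' http://10.110.5.81:8081/compatibility/subjects/trace-value/versions/latest

# Get top level config
curl -X GET http://10.110.5.81:8081/config

# Update compatibility requirements globally
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"compatibility": "NONE"}' http://10.110.5.81:8081/config

# Update compatibility requirements under the subject "trace-value"
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"compatibility": "BACKWARD"}' http://10.110.5.81:8081/config/trace-value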