Broker:叢集中的一個執行個體。
Workers:運作Connector和Task的程序。
Task:資料寫入Kafka和從Kafka中讀出的具體實作。
Connector:通過管理Task來協調資料流的進階抽象。
Source:負責導入資料到Kafka。
Sink:負責從Kafka導出資料。
支援分布式部署。
Converter:Connector和其他存儲系統直接發送或者接受資料之間轉換資料。
Transform:針對值(鍵值對中的Value)的輕量級資料調整的工具。
調用接口建立索引:
http://localhost:7002/search/initialization/
Kafka的主題,Oracle的表名是區分大小寫的,推薦使用大寫
下載下傳confluent-5.2.1-2.12.tar,上傳伺服器(/home/目錄下)後解壓,進入其目錄
cd /home/confluent-5.2.1
Zookeeper(2181)
如果環境中已經有Zookeeper,直接連接配接環境中的Zookeeper,無需啟動自帶的Zookeeper;如果沒有,才使用自帶的Zookeeper:
./bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties
注意:先使用前台運作方式啟動程式,待程式正常啟動、不報錯後可以再改用守護程序(-daemon)的方式啟動
啟動Broker(9092)
修改配置檔案
打開配置檔案
vi ./etc/kafka/server.properties
确認Zookeeper
zookeeper.connect=10.110.5.83:2181
補充配置項
host.name=10.110.5.81
listeners=PLAINTEXT://10.110.5.81:9092
advertised.listeners=PLAINTEXT://10.110.5.81:9092
退出儲存
啟動
./bin/kafka-server-start -daemon etc/kafka/server.properties
啟動Schema Registry(8081)
./bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties
建立Kafka測試主題“trace”(這一步是多餘的,因為啟動Producer、Consumer後,Kafka會自動建立主題)
./bin/kafka-topics \
--topic trace \
--create \
--bootstrap-server 10.110.5.81:9092 \
--replication-factor 1 \
--partitions 1
單例模式啟動Connector
修改兩個配置檔案
connect-standalone.properties
打開該檔案
vi etc/kafka/connect-standalone.properties
修改以下兩項配置為false
key.converter.schemas.enable=false
value.converter.schemas.enable=false
修改位址(預設為本機)
bootstrap.servers=10.110.5.81:9092
儲存後關閉
elasticsearch-sink.properties
複制連接配接檔案
cp etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties etc/kafka-connect-jdbc/elasticsearch-sink.properties
vi etc/kafka-connect-elasticsearch/elasticsearch-sink.properties
修改配置
topics=trace
connection.url=http://10.110.5.84:9200
#與hbase保持統一,故采用“info”:
type.name=info
#新增模式忽略,防止抛出異常(可能多餘)
schema.ignore=true
以單例模式啟動Connector
./bin/connect-standalone -daemon etc/kafka/connect-standalone.properties etc/kafka-connect-elasticsearch/elasticsearch-sink.properties \
--property print.key=true \
--property schema.registry.url=http://10.110.5.81:8081
新開XShell标簽,啟動Consumer,以監聽消息是否發送(心理安慰,作用不大)
./bin/kafka-console-consumer \
--from-beginning \
--property print.key=true
新開XShell标簽,啟動Producer,生産資料
指定Schema
./bin/kafka-avro-console-producer \
--broker-list 10.110.5.81:9092 \
--property parse.key=true \
--property key.separator=: \
--property schema.registry.url=http://10.110.5.81:8081 \
--property key.schema='{"name":"key","type":"string"}' \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"string"},{"name":"xw","type":"string"},{"name":"ddbh","type":"string"},{"name":"fssj","type":"string"},{"name":"location","type":"string"},{"name":"jd","type":"string"},{"name":"wd","type":"string"},{"name":"zdh","type":"string"},{"name":"sfd","type":"string"},{"name":"mdd","type":"string"},{"name":"zj","type":"string"}]}'
以JSON格式輸入測試資料
"R210124198701172625_20201208144517":{"id":"210124198701172625","xw":"TL","ddbh":"D1024","fssj":"20201208144517","location":"39.915119,116.403963","jd":"116.403963","wd":"39.915119","zdh":"01","sfd":"JNK","mdd":"WFK","zj":"test"}
驗證結果
在打開了Consumer的視窗中我們可以看到剛才生産的資料
向ElasticSearch查詢,在浏覽器中輸入以下連結,可以看到資料已經插入了
http://10.110.5.81:9200/trace/_search?pretty
可使用<code>jps</code>檢視相關背景程序
可使用以下指令來監聽端口,檢視相關程式是否運作
netstat -natpl
netstat -tupln|grep 9092
删除ES的索引
curl -XDELETE http://localhost:9200/trace
Confluent Control Center(9021)CLI
Kafka Connector REST API(8083)
Kafka Rest Proxy(8082)
./bin/kafka-rest-start -daemon etc/kafka-rest/kafka-rest.properties
檢視所有主題
--list \
--zookeeper 10.110.5.83:2181
删除某一主題,見:
标記要删除的主題
--delete \
修改配置檔案,使删除功能可用
vi etc/kafka/server.properties
delete.topic.enable=true
在上面打開的配置檔案中找到資料目錄“log.dirs=/tmp/kafka-logs”,然後進入
cd /tmp/kafka-logs
rm -rf trace
停止Kafka
推薦
./bin/kafka-server-stop
強制
jps
kill -9 *****
重新開機Kafka
./bin/kafka-server-start etc/kafka/server.properties
确認是否已删除
可能需要重新開機Zookeeper
之後删除不需要重新開機和手動删除檔案,隻需調用步驟8.1。參考自:
kafka如何徹底删除topic及資料 kafka全部資料清空與某一topic資料清空
異常
生産者異常:Schema being registered is incompatible with an earlier schema
解決方法:
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' http://10.110.5.81:8081/config
檢視某一主題的情況
--describe \
--bootstrap-server 10.110.5.81:9092
資料庫表
1. 資料庫表在建立時,字段名不能用雙引号引起來,否則會報錯:ORA-00904: invalid identifier
準備工作
本章節在《Kafka Connect Sink Elasticsearch》基礎之上進行
進入指定依賴包目錄
cd /home/confluent-5.2.1/share/java/kafka-connect-jdbc
上傳ojdbc6.jar到步驟3中的目錄
傳回confluent目錄
啟動Connector
cp etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties etc/kafka-connect-jdbc/oracle-sink.properties
打開配置檔案oracle-sink.properties
vi etc/kafka-connect-jdbc/oracle-sink.properties
name=oracle-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
connection.url=jdbc:oracle:thin:@位址:1521:IDR
connection.user=使用者名
connection.password=密碼
auto.create=true
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/oracle-sink.properties
因為在oracle-sink.properties中設定了auto.create=true,是以Connector會在Oracle中自動建立與主題同名的表(是以主題才大寫的)
啟動Producer
輸入資料
{"id":"210124198701172625","xw":"TL","ddbh":"D1024","fssj":"20201208144517","location":"39.915119,116.403963","jd":"116.403963","wd":"39.915119","zdh":"01","sfd":"JNK","mdd":"WFK","zj":"test"}
驗證結果。可以在相應的資料庫中查詢到資料,即證明成功了
SELECT * FROM trace;
cd /home/confluent-5.2.1/
HBase的相關支援未包含在Confluent的jar中,需要單獨下載下傳解壓更名檔案夾
下載下傳并上傳到伺服器後解壓縮confluentinc-kafka-connect-hbase-1.0.5.zip
改名
cd confluentinc-kafka-connect-hbase-1.0.5
将etc改名為etc/kafka-connect-hbase/
将lib改名為share/java/kafka-connect-hbase/
讓後将這兩個檔案夾合并到confluent中去
HBase準備
補充DNS
打開hosts
vim /etc/hosts
粘貼大資料環境的DNS
建立表(TM_TRACE)及列族(info)
補充配置檔案
上傳hbase-site.xml到/home/confluent-5.2.1/
打包配置檔案
jar -uvf share/java/kafka-connect-hbase/kafka-connect-hbase-1.0.5.jar hbase-site.xml
cp etc/kafka-connect-hbase/hbase-sink-quickstart.properties etc/kafka-connect-hbase/hbase-sink.properties
vi etc/kafka-connect-hbase/hbase-sink.properties
name=hbase-sink-connector
connector.class=io.confluent.connect.hbase.HBaseSinkConnector
hbase.zookeeper.quorum=master2.bds.inspur,manager.bds.inspur,master1.bds.inspur
hbase.zookeeper.property.clientPort=2181
gcp.bigtable.instance.id=HBASE-INSTANCE
auto.create.tables=true
auto.create.column.families=true
# auto.offset.rest=latest
# 表名;列族名在資料中指明
table.name.format=TM_TRACE
error.mode=ignore
topics=tm_trace
confluent.topic=tm_trace
confluent.topic.replication.factor=1
confluent.topic.bootstrap.servers=10.110.5.81:9092
insert.mode=UPSERT
max.batch.size=1000
# row.key.definition=對象類型,證件号碼,分隔符,發生時間,三位防重碼
row.key.definition=type,id,delimiter,fssj,unrepeat
#去除多餘的值(對于elasticsearch來說不多餘)
transforms=DropLocation
transforms.DropLocation.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.DropLocation.fields=location
儲存後退出
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hbase/hbase-sink.properties
啟動指令行Consumer輔助檢視
./bin/kafka-avro-console-consumer \
--topic tm_trace \
啟動./bin/kafka-avro-console-producer \--topic tm_trace \--broker-list 10.110.5.81:9092 \--property parse.key=true \--property key.separator=@ \--property schema.registry.url=http://10.110.5.81:8081 \--property key.schema='{"namespace": "com.inspur.trace.entity.schema","name": "TraceKey","type":"string"}' \--property value.schema='{"namespace": "com.inspur.trace.entity.schema","name": "TraceHBaseValue","type":"record","fields":[{"name":"info","type":{"name":"columns","type":"record","fields":[{"name":"type","type":"string"},{"name":"id","type":"string"},{"name":"xw","type":"string"},{"name":"ddbh","type":"string"},{"name":"fssj","type":"string"},{"name":"location","type":"string"},{"name":"jd","type":"string"},{"name":"wd","type":"string"},{"name":"zdh","type":"string"},{"name":"sfd","type":"string"},{"name":"mdd","type":"string"},{"name":"zj","type":"string"}]}}]}'
注意:
hbase-connector使用的是結構化資料(org.apache.kafka.connect.data.Struct),是以需要使用能發送結構化資料的Producer,即avro-producer。
并且列族“info”需要在Producer中配置(符合HBase的理念),是以需要我們用avro配置模闆(Schema)用于解析結構化資料。
而且,如果想使用拼接列族功能,也需要為行鍵配置模闆并發送結構化的行鍵。
是以才有了如上的指令。
關于Producer啟動指令中參數的說明
# 開啟鍵的格式化
parse.key=true
# 将“@”作為鍵與值的分隔符
key.separator=@
# 模闆注冊中心位址
schema.registry.url=……
# 鍵的解析模闆
key.schema=……
# 值的解析模闆
value.schema=……
關于行鍵
通過閱讀RowKeyExtractor的源代碼可知,HBaseSinkConnector有兩種方式的接收行鍵:
一種是接收到的行鍵是結構化資料,HBaseSinkConnector将根據模闆将結構化資料拼接為rowkey(需要在hbase-sink.properties指明字段名,同時還需要在Producer中啟用并傳遞結構化鍵);
另一種是未指明行鍵字段名,然後将鍵(key)作為行鍵(rowkey)。
兩種方式之外的搭配,不做處理,抛出異常。
采用結構化拼接鍵雖然進階,但是徒增複雜度,目前我想不出采用這種方式的的必要性,這裡隻是做技術驗證,是以采用的是結構化拼接鍵。
真正應用的時候還是推薦直接使用鍵作為行鍵, 即,将“row.key.definition”注釋掉。
輸入測試資料
"R210124198701172625_20201210104500est"@{"info":{"type":"R","id":"210124198701172625","xw":"TL","ddbh":"D1024","fssj":"20201210104502","location":"39.915119,116.403963","jd":"116.403963","wd":"39.915119","zdh":"01","sfd":"JNK","mdd":"WFK","zj":"test"}}
驗證結果:直接檢視HBase中的TM_TRACE是否有剛插入的值
簡介
Elasticsearch、HBase的表結構是不相容的:
因為在不修改原jar中代碼的前提下,HBase的資料有兩層,外層為列族,比如列族為“info”的資料“{"info":{"xw":"LD","type":"R",……}}”,但是Elasticsearch的資料就一層,如果es-sink按照hbase-sink的兩層結構來抽取資料到索引中,必然報錯。
軌迹應用依賴Elasticsearch的基于地理位置的搜尋功能,這就要求我們新增一個geo_point類型的字段用來進行緯經度的搜尋(暫且稱呼該字段為“location”),但是HBase并不需要這一字段,會多餘。
而KSQL可以解決這倆問題。
KSQL
KSQL就是一個用來操作Kafka中的資料流的工具。
具體地來說,KSQL可以通過編寫SQL來操作Kafka中的資料流,它可以解析結構化的資料,以及直接輸出結構化資料的指定字段,還可以新增字段。
服務端
vim etc/ksqldb/ksql-server.properties
編輯配置檔案
listeners=http://10.110.5.81:8088
ksql.logging.processing.topic.auto.create=true
ksql.logging.processing.stream.auto.create=true
ksql.schema.registry.url=http://10.110.5.81:8081
啟動服務端
./bin/ksql-server-start-6.0.1 etc/ksqldb/ksql-server.properties
用戶端
啟動完服務端後,我們還需要啟動用戶端,增加一系列配置。
./bin/ksql http://10.110.5.81:8088
建立新資料流,以便從“tm_trace”中引入原始資料
create stream trace_hbase (info struct) with (kafka_topic='tm_trace', value_format='avro');
注意:大小寫不敏感,除非用英文雙引号包含,與Oracle相仿
解析“tm_trace”,去除外層包裹,同時新增地理位置字段“location”,然後将該資料流寫入到負責向elasticsearch中寫入資料的主題中去。
create stream t1 with (kafka_topic='trace') as select info->type "type", info->id "id", info->xw "xw", info->ddbh "ddbh", info->fssj "fssj", info->jd "jd", info->wd "wd", struct("lat" := info->wd, "lon" := info->jd) AS "location", info->zdh "zdh", info->sfd "sfd", info->mdd "mdd", info->zj "zj" from trace_hbase;
注意:主題已更換,需要到./etc/kafka-connect-hbase/hbase-sink.properties中修改主題
修改SQL很遺憾,KSQL不支援修改已經建立的資料流,我們需要删除原來的,然後重新建立:
确認查詢是否正在運作
show queries;
如果有,需要将其停掉
terminate QueryId;(QueryId大小寫敏感)
檢視流的名稱
show streams;
删除舊的流
drop stream trace_hbase;
drop stream trace_elasticsearch;
其他有用SQL參照
set 'auto.offset.reset' = 'earliest';
select "jd", "wd" from t1 EMIT CHANGES;
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/elasticsearch-sink.properties etc/kafka-connect-hbase/hbase-sink.properties
分别對叢集中的每台機器進行如下操作
部分ip需要填寫本機ip,比如①處
“localhost”、“0.0.0.0”,這兩個位址不會被正确識别,需要填寫實際ip,如10.110.5.81
Kafka Broker
打開檔案
修改(或添加)幾個配置
broker.id=2
host.name=10.110.5.81①
zookeeper.connect=10.110.5.81:2181,10.110.5.82:2181,10.110.5.83:2181
儲存并退出
啟動Broker
之後
ctrl + c停止該Broker
以背景程序方式啟動Broker
确認Broker是否啟動
輸入<code>jps</code>,如果有名為SupportedKafka(Broker)的程序,則表示啟動成功
Schema Register
vi etc/schema-registry/schema-registry.properties
listeners=http://10.110.5.81:8081
kafkastore.connection.url=10.110.5.81:2181,10.110.5.82:2181,10.110.5.83:2181
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
與Kafka Broker中的步驟五相似
Kafka Rest API
vi etc/kafka-rest/kafka-rest.properties
schema.registry.url=http://10.110.5.81:8081
./bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties
與Kafka Broker中的步驟五不同的是,要以以下方式才能背景運作
nohup ./bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties > /dev/null 2>&1 &
建立主題(3分區,2備份)
--zookeeper 10.110.5.81:2181,10.110.5.82:2181,10.110.5.83:2181 \
--partitions 3 \
--replication-factor 2
Kafka connect
vi etc/schema-registry/connect-avro-distributed.properties
bootstrap.servers=10.110.5.81:9092,10.110.5.82:9092,10.110.5.83:9092
group.id=acp_cluster
./bin/connect-distributed etc/kafka/connect-distributed.properties
建立Connector
使用Kafka Rest API(8083)啟動Connector,參數内容參照etc/kafka-connect-***/***.properties配置檔案,如:
curl 'http://10.110.5.81:8083/connectors' -X POST -i -H "Content-Type:application/json" -d '
{
"name":"elasticsearch-sink",
"config":{
"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max":10,
"topics":"ACP_KAFKA",
"key.ignore":true,
"schema.ignore":true,
"connection.url":"http://10.110.5.81:9200",
"type.name":"kafka-connect",
"schema.registry.url":"http://10.110.5.81:8081",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://10.110.5.81:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://10.110.5.81:8081",
"topic.index.map":"ACP_KAFKA:acp_kafka_message"
}
'
"name":"oracle-sink",
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"1",
"connection.url":"jdbc:oracle:thin:@10.110.1.132:1521:IDR",
"connection.user":"ppw",
"connection.password":"ppw",
"auto.create":"true",
"table.name.format":"ACP_KAFKA_MESSAGE",
"transforms":"Rename",
"transforms.Rename.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.Rename.renames":"id:ID,type:TYPE,msg:MESSAGE,time:TIME"
curl 10.110.5.81:8083/connectors/oracle-sink/config -X PUT -i -H "Content-Type:application/json" -d '{}'
這種方式不可行
./bin/confluent load elasticsearch-sink -d etc/kafka-connect-elasticsearch/elasticsearch-sink.properties
對Kafka Rest API的補充說明方法路徑說明GET/connectors傳回活動連接配接器的清單POST/connectors建立一個新的連接配接器; 請求主體應該是包含字元串name字段和config帶有連接配接器配置參數的對象字段的JSON對象GET/connectors/{name}擷取有關特定連接配接器的資訊GET/connectors/{name}/config擷取特定連接配接器的配置參數PUT/connectors/{name}/config更新特定連接配接器的配置參數GET/connectors/{name}/status擷取連接配接器的目前狀态,包括連接配接器是否正在運作,失敗,已暫停等,配置設定給哪個工作者,失敗時的錯誤資訊以及所有任務的狀态GET/connectors/{name}/tasks擷取目前為連接配接器運作的任務清單GET/connectors/{name}/tasks/{taskid}/status擷取任務的目前狀态,包括如果正在運作,失敗,暫停等,配置設定給哪個從業人員,如果失敗,則傳回錯誤資訊PUT/connectors/{name}/pause暫停連接配接器及其任務,停止消息處理,直到連接配接器恢複PUT/connectors/{name}/resume恢複暫停的連接配接器(或者,如果連接配接器未暫停,則不執行任何操作)POST/connectors/{name}/restart重新啟動連接配接器(通常是因為失敗)POST/connectors/{name}/tasks/{taskId}/restart重新開機個别任務(通常是因為失敗)DELETE/connectors/{name}删除連接配接器,停止所有任務并删除其配置
# Register a new version of a schema under the subject "trace-key"
# Register a new version of a schema under the subject "trace-value"
# List all subjects
# List all schema versions registered under the subject "trace-value"
# Fetch a schema by globally unique id 1
# Fetch version 1 of the schema registered under subject "trace-value"
# Fetch the most recently registered schema under subject "trace-value"
# Delete version 3 of the schema registered under subject "trace-value"
# Delete all versions of the schema registered under subject "trace-value"
# Check whether a schema has been registered under subject "trace-key"
# Test compatibility of a schema with the latest schema under subject "trace-value"
# Get top level config
# Update compatibility requirements globally
# Update compatibility requirements under the subject "trace-value"