
Introduction to Confluent and Its Usage

1 Introduction to Confluent

Confluent is a streaming platform for managing and organizing data from different sources. It integrates data from different sources and locations into a central event streaming platform in real time, with high reliability and high performance.

Confluent currently ships in two editions: a free Community edition and a paid Commercial edition. The Community edition provides basic services such as Connectors, REST Proxy, KSQL, and Schema Registry. The Commercial edition adds enterprise features such as the Control Center console, load balancing, cross-datacenter replication, and security.

1.2 Service Overview

1.2.1 Zookeeper

ZooKeeper is an open-source distributed coordination service for applications. Its main functions include centralized services such as maintaining configuration information, naming, distributed synchronization, and group management. Kafka uses ZooKeeper to persist cluster metadata; if ZooKeeper loses Kafka's data, the cluster's replica mappings, topic configuration, and other metadata are lost, the Kafka cluster stops working properly, and data loss results.

1.2.2 Kafka

Kafka is a distributed streaming platform: a partitioned, replicated, distributed messaging system coordinated by ZooKeeper. It is a high-throughput distributed publish-subscribe messaging system (message queue middleware) whose main job is message transport, and Confluent relies on Kafka for exactly that. Kafka's biggest strength is processing large volumes of data in real time to satisfy a wide range of scenarios.

1.2.3 Control Center

Control Center makes it easy to manage Kafka connections: creating, editing, and managing connections to other systems. Data streams can be monitored from producer to consumer, guaranteeing that every message is delivered and measuring how long delivery takes. With Confluent Control Center, developers can build Kafka-based data pipelines without writing a single line of code.

1.2.4 Kafka-rest

Kafka-rest is Kafka's RESTful interface service component. It lets you produce and consume messages through a RESTful interface instead of the native Kafka protocol or clients, and also exposes cluster status and administrative operations.

1.2.5 Schema-Registry

Schema-Registry is a service for metadata management. It provides a RESTful interface for storing and retrieving schemas, keeps every version of a schema as the data format evolves, and supports backward compatibility. Schema-Registry also provides Kafka with an Avro serialization plugin for message transport. Confluent mainly uses Schema-Registry to manage schemas and handle serialization.

1.2.6 Connect

Kafka Connect is an open-source Kafka component: a framework for connecting Kafka with external systems such as databases, key-value stores, search systems, and file systems. Using the Kafka Connect framework and existing connectors, you can read messages from a source system into Kafka, and read messages back out of Kafka into a destination.

1.2.7 ksql-server

KSQL is a streaming SQL engine that performs stream-processing tasks against Apache Kafka using SQL statements. Confluent uses KSQL to provide query services over Kafka data.
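As a sketch of what this looks like in practice (the stream name, columns, and topic below are hypothetical, not part of this guide's setup):

```sql
-- Declare a stream over an existing Kafka topic.
CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
  WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');

-- Continuous query: count views per page as events arrive.
SELECT pageid, COUNT(*) FROM pageviews GROUP BY pageid;
```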

2 Downloading Confluent

This guide uses the open-source Confluent, version 5.2.4.

Download link: http://packages.confluent.io/archive/5.2/confluent-5.2.4-2.11.tar.gz

3 Environment Preparation

A distributed deployment should have at least 3 nodes, but since this is a test environment and nodes are in short supply, 2 nodes are used here.

Node         zookeeper  kafka  control-center  kafka-rest  schema-registry  connector  ksql-server
10.0.165.8   2181       9092   9021            8082        8081             8083       8088
10.0.165.9   2181       9092   9021            8082        8081             8083       8088

4 Installation

4.1 Extract

将下載下傳的檔案上傳至linux,然後解壓至相應的目錄下

tar -zxvf /opt/package/confluent-5.2.4-2.11.tar.gz -C /home/kafka/.local/
           

Rename the directory and change into it:

mv /home/kafka/.local/confluent-5.2.4 /home/kafka/.local/confluent
cd /home/kafka/.local/confluent
           

4.2 Configuration

Modify the configuration on node 10.0.165.8 as follows.

4.2.1 ZooKeeper configuration

(1) vim /home/kafka/.local/confluent/etc/kafka/zookeeper.properties

##Data directory; the default /tmp/zookeeper risks being deleted
dataDir=/data/confluent/zookeeper
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2

 
##List every zookeeper server; the numbers in server.1, server.2, etc. must match the value in each node's myid
server.1=10.0.165.8:2888:3888
server.2=10.0.165.9:2888:3888
           

(2) Generate myid

echo 1 > /home/kafka/.local/confluent/etc/kafka/myid

(3) Modify the Confluent startup script so that myid is copied into the Confluent run directory.

bin/confluent start launches all of the Confluent services and copies the configuration files under etc into the Confluent run directory.

vim /home/kafka/.local/confluent/bin/confluent

Add the following as the last line of the config_zookeeper() function:

cp ${confluent_conf}/kafka/myid $confluent_current/zookeeper/data/

目的是将etc/kafka/myid拷貝到confluent運作目錄下,否則會報myid is no found,zookeeper啟動失敗。

4.2.2 Kafka configuration

vim /home/kafka/.local/confluent/etc/kafka/server.properties

broker.id=0

#Only one of listeners and advertised.listeners needs to be set, depending on this machine's network interfaces. advertised.listeners is usually the more portable choice: set it to this machine's own IP and port; other machines' IPs are not configured here
advertised.listeners=PLAINTEXT://10.0.165.8:9092
 
##Tune these according to your environment
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100

#log.dirs is the most important setting: it is where Kafka's data lives
log.dirs=/data/confluent/kafka-logs
num.partitions=12

num.recovery.threads.per.data.dir=1

message.max.bytes=10000000
replica.fetch.max.bytes= 10485760
auto.create.topics.enable=true
auto.leader.rebalance.enable = true

##The replication factor must be <= the number of Kafka nodes; a larger value causes errors
default.replication.factor=2
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

log.flush.interval.messages=20000
log.flush.interval.ms=10000
log.flush.scheduler.interval.ms=2000
log.retention.check.interval.ms=300000
log.cleaner.enable=true

##Log retention time, in hours
log.retention.hours=48
zookeeper.connect=10.0.165.8:2181,10.0.165.9:2181
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000

confluent.metrics.reporter.bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
confluent.metrics.reporter.topic.replicas=2

confluent.support.metrics.enable=true
confluent.support.customer.id=anonymous

delete.topic.enable=true
group.initial.rebalance.delay.ms=0
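
Once both brokers are up, the settings above can be sanity-checked by creating a test topic. A sketch, assuming a running cluster and the addresses used in this guide (Confluent 5.2's tooling still accepts --zookeeper; the topic name smoke-test is illustrative):

```shell
# Create a test topic using the defaults configured above.
bin/kafka-topics --zookeeper 10.0.165.8:2181 --create \
  --topic smoke-test --partitions 12 --replication-factor 2

# Verify it exists and inspect partition leadership.
bin/kafka-topics --zookeeper 10.0.165.8:2181 --describe --topic smoke-test
```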
           

4.2.3 kafka-rest

vim /home/kafka/.local/confluent/etc/kafka-rest/kafka-rest.properties

id=kafka-rest-server-001
schema.registry.url=http://10.0.165.8:8081
zookeeper.connect=10.0.165.8:2181,10.0.165.9:2181
bootstrap.servers=PLAINTEXT://10.0.165.8:9092
port=8082
consumer.threads=8

access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS
access.control.allow.origin=*
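
With the REST proxy running, messages can be produced over plain HTTP. A minimal sketch, assuming the proxy is reachable at 10.0.165.8:8082 and a topic named test exists:

```shell
# Produce a JSON message to topic 'test' via the REST proxy (v2 API).
curl -X POST \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  --data '{"records":[{"value":{"msg":"hello"}}]}' \
  http://10.0.165.8:8082/topics/test
```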
           

4.2.4 ksql

Note: Confluent 4 does not include this service.

vim /home/kafka/.local/confluent/etc/ksql/ksql-server.properties

ksql.service.id=default_
bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
listeners=http://0.0.0.0:8088
ksql.schema.registry.url=http://10.0.165.8:8081,http://10.0.165.9:8081
ksql.sink.partitions=4

           

4.2.5 confluent-control-center

vim /home/kafka/.local/confluent/etc/confluent-control-center/control-center-dev.properties

bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
zookeeper.connect=10.0.165.8:2181,10.0.165.9:2181
confluent.controlcenter.rest.listeners=http://0.0.0.0:9021

 

#Each id must be unique; otherwise only one instance can start
confluent.controlcenter.id=1
confluent.controlcenter.data.dir=/data/confluent/control-center
confluent.controlcenter.connect.cluster=http://10.0.165.8:8083,http://10.0.165.9:8083

##Each node configures its own IP here
confluent.controlcenter.ksql.url=http://10.0.165.8:8088
confluent.controlcenter.schema.registry.url=http://10.0.165.8:8081,http://10.0.165.9:8081

confluent.controlcenter.internal.topics.replication=2
confluent.controlcenter.internal.topics.partitions=2
confluent.controlcenter.command.topic.replication=2
confluent.monitoring.interceptor.topic.partitions=2
confluent.monitoring.interceptor.topic.replication=2
confluent.metrics.topic.replication=2

confluent.controlcenter.streams.num.stream.threads=30

           

4.2.6 schema-registry

vim /home/kafka/.local/confluent/etc/schema-registry/schema-registry.properties

listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://10.0.165.8:9092,10.0.165.9:9092
kafkastore.topic=_schemas
debug=false
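
Once Schema Registry is running, schemas can be registered and inspected through its REST API. A sketch against the listener configured above (the subject name test-value and the record schema are illustrative):

```shell
# Register an Avro schema under subject 'test-value'.
curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema":"{\"type\":\"record\",\"name\":\"Msg\",\"fields\":[{\"name\":\"msg\",\"type\":\"string\"}]}"}' \
  http://10.0.165.8:8081/subjects/test-value/versions

# List all registered subjects.
curl http://10.0.165.8:8081/subjects
```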

           

4.2.7 connect

vim /home/kafka/.local/confluent/etc/schema-registry/connect-avro-distributed.properties

bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.storage.StringConverter 
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

 
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=2
offset.storage.replication.factor=2
status.storage.replication.factor=2

 

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

rest.port=8083
rest.advertised.port=8083

plugin.path=/home/kafka/.local/confluent/share/java
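
With the distributed worker running, connectors are created through its REST API rather than local config files. A hedged example using the FileStreamSource connector that ships with Kafka (the connector name, file path, and topic are illustrative):

```shell
# Create a connector that streams lines of /tmp/test.txt into topic 'connect-test'.
curl -X POST -H "Content-Type: application/json" \
  --data '{"name":"file-source","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","file":"/tmp/test.txt","topic":"connect-test"}}' \
  http://10.0.165.8:8083/connectors
```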
           

4.2.8 将confluent發送到其他節點

scp -r confluent/ kafka@10.0.165.9:/home/kafka/.local/

Then modify the configuration on the other node:

vi myid

2
           

vi /home/kafka/.local/confluent/etc/kafka/server.properties

broker.id=1
advertised.listeners=PLAINTEXT://10.0.165.9:9092
           

vi /home/kafka/.local/confluent/etc/kafka-rest/kafka-rest.properties

id=kafka-rest-server-002
schema.registry.url=http://10.0.165.9:8081
bootstrap.servers=PLAINTEXT://10.0.165.9:9092
           

vi /home/kafka/.local/confluent/etc/confluent-control-center/control-center-dev.properties

confluent.controlcenter.id=2
confluent.controlcenter.ksql.url=http://10.0.165.9:8088
           

Then create the /data/confluent directory on both nodes and change its ownership:

sudo mkdir /data/confluent
sudo chown kafka:kafka /data/confluent
           

4.3 Starting and Stopping Services

4.3.1 Start all services

Start: bin/confluent start

Check status: bin/confluent status

Stop: bin/confluent stop

4.3.2 Start services individually

Each service can also be started on its own.

Start kafka-rest:

bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties
           

The command above runs in the foreground; the service can also be started in the background:

nohup bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties &
           

Start ZooKeeper:

bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties 
           

Start the Kafka broker:

bin/kafka-server-start -daemon  etc/kafka/server.properties
           

Start Schema Registry:

bin/schema-registry-start -daemon  etc/schema-registry/schema-registry.properties
           

5 Common Errors During Installation

5.1 KafkaServer fails to start

[2020-06-27 04:28:15,713] FATAL [KafkaServer id=2] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.KafkaException: Socket server failed to bind to 10.0.165.8:9092: Cannot assign requested address.
	at kafka.network.Acceptor.openServerSocket(SocketServer.scala:331)
	at kafka.network.Acceptor.<init>(SocketServer.scala:256)
	at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:97)
	at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:89)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at kafka.network.SocketServer.startup(SocketServer.scala:89)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:229)
	at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:112)
	at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:58)
Caused by: java.net.BindException: Cannot assign requested address
	at sun.nio.ch.Net.bind0(Native Method)
	at sun.nio.ch.Net.bind(Net.java:433)
	at sun.nio.ch.Net.bind(Net.java:425)
	at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
	at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
	at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
	at kafka.network.Acceptor.openServerSocket(SocketServer.scala:327)
	... 9 more
[2020-06-27 04:28:15,715] INFO [KafkaServer id=2] shutting down (kafka.server.KafkaServer)
[2020-06-27 04:28:15,717] INFO [SocketServer brokerId=2] Shutting down (kafka.network.SocketServer)
[2020-06-27 04:28:15,718] INFO [SocketServer brokerId=2] Shutdown completed (kafka.network.SocketServer)
[2020-06-27 04:28:15,721] INFO Shutting down. (kafka.log.LogManager)
[2020-06-27 04:28:15,760] INFO Shutdown complete. (kafka.log.LogManager)
[2020-06-27 04:28:15,761] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2020-06-27 04:28:15,762] INFO Session: 0x27297ff0225a5a9 closed (org.apache.zookeeper.ZooKeeper)
[2020-06-27 04:28:15,764] INFO EventThread shut down for session: 0x27297ff0225a5a9 (org.apache.zookeeper.ClientCnxn)
[2020-06-27 04:28:15,765] INFO [KafkaServer id=2] shut down completed (kafka.server.KafkaServer)
[2020-06-27 04:28:15,766] INFO [KafkaServer id=2] shutting down (kafka.server.KafkaServer)
           

I copied server.properties to every node without changing the setting below. The listener configuration must point to each node's own host and port; I had all four machines pointing at 10.0.165.8, so only node 1 worked correctly.

advertised.listeners=PLAINTEXT://10.0.165.9:9092
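One way to avoid this mistake is to stamp each node's own IP into the file instead of copying it verbatim. A minimal sketch, demonstrated on a temp copy; on a real node, point PROPS at etc/kafka/server.properties and derive IP from the host itself:

```shell
# Demo on a temp copy of server.properties.
PROPS=$(mktemp)
printf 'broker.id=0\nadvertised.listeners=PLAINTEXT://10.0.165.8:9092\n' > "$PROPS"

# On a real node: IP=$(hostname -I | awk '{print $1}')
IP=10.0.165.9

# Rewrite advertised.listeners to this node's own address.
sed -i "s|^advertised\.listeners=.*|advertised.listeners=PLAINTEXT://${IP}:9092|" "$PROPS"

grep '^advertised.listeners=' "$PROPS"
# prints: advertised.listeners=PLAINTEXT://10.0.165.9:9092
```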

5.2 Confluent schema-registry fails to start

[2020-06-27 16:09:39,872] WARN The replication factor of the schema topic _schemas is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic. (io.confluent.kafka.schemaregistry.storage.KafkaStore:242)
[2020-06-27 16:09:50,095] ERROR Server died unexpectedly:  (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
java.lang.IllegalArgumentException: Unable to subscribe to the Kafka topic _schemas backing this data store. Topic may not exist.
	at io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread.<init>(KafkaStoreReaderThread.java:125)
	at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:130)
	at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:199)
	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:64)
	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:42)
	at io.confluent.rest.Application.createServer(Application.java:157)
	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)
           

Cause: the Kafka server had not been started.

6 Common Operations

(1) Start

confluent start

(2) Show the current run/log directory

confluent current

(3) List available connectors

confluent list connectors

(4) Show loaded connectors

confluent status connectors

[
"file-source"
]
           

(5) Show the status of a specific connector

confluent status file-source
