I covered how to deploy Kafka in an earlier post. Kafka has only one deployment mode: the cluster. Its architecture is natively clustered, so a single-node deployment works essentially the same way as a multi-node one.
Cluster nodes:

| Node | Address |
| --- | --- |
| ZooKeeper | 192.168.1.5 |
| Kafka0 | 192.168.1.4 |
| Kafka1 | 192.168.1.6 |
| Kafka2 | 192.168.1.199 |
For simplicity, a ZooKeeper cluster is not used here; a standalone ZooKeeper instance is enough for this setup.

- ZooKeeper: setting up a standalone ZooKeeper with a shell script
- ZooKeeper: setting up a ZooKeeper cluster
All 4 nodes need a Java environment, and the 3 Kafka nodes need the Kafka distribution downloaded and uploaded; see this post:

- Kafka: deploying Kafka
ZooKeeper
The ZooKeeper configuration file:
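The original listing is not reproduced here; a minimal standalone `zoo.cfg` consistent with the session below (ZooKeeper 3.6.3, client port 9000) might look like the sketch below — the `dataDir` path is an assumption:

```properties
tickTime=2000
# assumed data directory; any writable path works
dataDir=/usr/local/apache-zookeeper-3.6.3-bin/data
# matches "Client port found: 9000" in the status output below
clientPort=9000
```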
Start ZooKeeper:
```shell
[root@192 ~]# cd /usr/local/apache-zookeeper-3.6.3-bin/bin/
[root@192 bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@192 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 9000. Client address: localhost. Client SSL: false.
Mode: standalone
```
ZooKeeper has started successfully.
Kafka Cluster
```shell
[root@192 ~]# cd /usr/local/kafka_2.13-3.0.0
[root@192 kafka_2.13-3.0.0]# vim config/server.properties
```
- Kafka0
- Kafka1
- Kafka2
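The per-node listings were not captured here; assuming otherwise-default settings, the relevant lines in each node's `config/server.properties` are the broker id, the listener address, and the ZooKeeper address. The values below follow the node table above but are a reconstruction, not the original listing:

```properties
# Kafka0 (192.168.1.4); on Kafka1 use broker.id=1 with 192.168.1.6,
# on Kafka2 use broker.id=2 with 192.168.1.199
broker.id=0
listeners=PLAINTEXT://192.168.1.4:9092
# the standalone ZooKeeper from above
zookeeper.connect=192.168.1.5:9000
```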
Start each Kafka node:
```shell
[root@192 kafka_2.13-3.0.0]# bin/kafka-server-start.sh config/server.properties &
```
On the Kafka nodes, apart from setting broker.id to distinguish the cluster members, no other cluster-related configuration was changed; in particular, the addresses of the other Kafka nodes were never listed anywhere. By contrast, when building a ZooKeeper cluster, the addresses of all ZooKeeper nodes must be configured on every ZooKeeper node. So how does a Kafka node discover the other nodes in the cluster? The answer is that ZooKeeper maintains Kafka's cluster membership: every broker registers itself in ZooKeeper on startup.
Query the data in ZooKeeper:
```shell
[root@192 bin]# ./zkCli.sh -timeout 40000 -server 127.0.0.1:9000
...
[zk: 127.0.0.1:9000(CONNECTED) 4] ls -R /brokers
/brokers
/brokers/ids
/brokers/seqid
/brokers/topics
/brokers/ids/0
/brokers/ids/1
/brokers/ids/2
[zk: 127.0.0.1:9000(CONNECTED) 5] get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.1.4:9092"],"jmx_port":-1,"port":9092,"host":"192.168.1.4","version":5,"timestamp":"1642677446098"}
[zk: 127.0.0.1:9000(CONNECTED) 6] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.1.6:9092"],"jmx_port":-1,"port":9092,"host":"192.168.1.6","version":5,"timestamp":"1642677466209"}
[zk: 127.0.0.1:9000(CONNECTED) 7] get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.1.199:9092"],"jmx_port":-1,"port":9092,"host":"192.168.1.199","version":5,"timestamp":"1642677476386"}
```
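The registration values above are plain JSON, so clients can parse them directly. As a sketch, extracting the listener, host, and port from one endpoint (the payload string is copied from the session above):

```python
import json

# One broker registration payload, copied verbatim from the
# `get /brokers/ids/0` output above.
payload = ('{"features":{},"listener_security_protocol_map":'
           '{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.1.4:9092"],'
           '"jmx_port":-1,"port":9092,"host":"192.168.1.4","version":5,'
           '"timestamp":"1642677446098"}')

reg = json.loads(payload)
# Each endpoint has the form "<listener>://<host>:<port>".
listener, address = reg["endpoints"][0].split("://")
host, port = address.rsplit(":", 1)
print(listener, host, port)  # PLAINTEXT 192.168.1.4 9092
```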
Clearly, ZooKeeper maintains Kafka's cluster information. Now stop the Kafka service on the Kafka1 node: ZooKeeper updates the cluster membership dynamically as well. This relies on ZooKeeper's ephemeral znodes — a broker's registration znode disappears automatically when its session ends.
```shell
[zk: 127.0.0.1:9000(CONNECTED) 8] ls -R /brokers
/brokers
/brokers/ids
/brokers/seqid
/brokers/topics
/brokers/ids/0
/brokers/ids/2
```
- ZooKeeper: key concepts & client commands
Create a topic on the Kafka0 node:
```shell
[root@192 kafka_2.13-3.0.0]# bin/kafka-topics.sh --create --bootstrap-server 192.168.1.4:9092 --replication-factor 2 --partitions 2 --topic kaven-topic
[2022-01-20 19:43:51,408] INFO Creating topic kaven-topic with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(0, 2), 1 -> ArrayBuffer(2, 0)) (kafka.zk.AdminZkClient)
[2022-01-20 19:43:51,530] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(kaven-topic-0) (kafka.server.ReplicaFetcherManager)
[2022-01-20 19:43:51,601] INFO [LogLoader partition=kaven-topic-0, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log$)
[2022-01-20 19:43:51,611] INFO Created log for partition kaven-topic-0 in /tmp/kafka-logs/kaven-topic-0 with properties {} (kafka.log.LogManager)
[2022-01-20 19:43:51,612] INFO [Partition kaven-topic-0 broker=0] No checkpointed highwatermark is found for partition kaven-topic-0 (kafka.cluster.Partition)
[2022-01-20 19:43:51,613] INFO [Partition kaven-topic-0 broker=0] Log loaded for partition kaven-topic-0 with initial high watermark 0 (kafka.cluster.Partition)
[2022-01-20 19:43:51,703] INFO [LogLoader partition=kaven-topic-1, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log$)
[2022-01-20 19:43:51,704] INFO Created log for partition kaven-topic-1 in /tmp/kafka-logs/kaven-topic-1 with properties {} (kafka.log.LogManager)
[2022-01-20 19:43:51,704] INFO [Partition kaven-topic-1 broker=0] No checkpointed highwatermark is found for partition kaven-topic-1 (kafka.cluster.Partition)
[2022-01-20 19:43:51,704] INFO [Partition kaven-topic-1 broker=0] Log loaded for partition kaven-topic-1 with initial high watermark 0 (kafka.cluster.Partition)
[2022-01-20 19:43:51,704] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions HashSet(kaven-topic-1) (kafka.server.ReplicaFetcherManager)
[2022-01-20 19:43:51,730] INFO [ReplicaFetcherManager on broker 0] Added fetcher to broker 2 for partitions Map(kaven-topic-1 -> InitialFetchState(BrokerEndPoint(id=2, host=192.168.1.199:9092),0,0)) (kafka.server.ReplicaFetcherManager)
[2022-01-20 19:43:51,751] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Starting (kafka.server.ReplicaFetcherThread)
[2022-01-20 19:43:51,757] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Truncating partition kaven-topic-1 to local high watermark 0 (kafka.server.ReplicaFetcherThread)
[2022-01-20 19:43:51,758] INFO [Log partition=kaven-topic-1, dir=/tmp/kafka-logs] Truncating to 0 has no effect as the largest offset in the log is -1 (kafka.log.Log)
Created topic kaven-topic.
```
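The `initial partition assignment` in the creation log (0 -> [0, 2], 1 -> [2, 0]) comes from spreading replicas round-robin over the live brokers. A simplified sketch of that idea — Kafka's real algorithm also randomizes the starting broker and the replica shift, so it will not always reproduce a given log line:

```python
def assign_replicas(brokers, partitions, replication_factor):
    """Round-robin replica assignment over the live broker ids.

    Simplified sketch of Kafka's assignment; the real implementation
    adds a random start index and a random shift between replicas.
    """
    n = len(brokers)
    return {
        p: [brokers[(p + r) % n] for r in range(replication_factor)]
        for p in range(partitions)
    }

# With brokers 0 and 2 alive (Kafka1 is stopped), two partitions with
# two replicas each produce the same layout the log printed.
print(assign_replicas([0, 2], partitions=2, replication_factor=2))
# {0: [0, 2], 1: [2, 0]}
```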
Query the existing topics on the Kafka2 node:
```shell
[root@192 kafka_2.13-3.0.0]# bin/kafka-topics.sh --list --bootstrap-server 192.168.1.199:9092
kaven-topic
```
Clearly, the Kafka cluster's metadata has been synchronized, and ZooKeeper holds this information as well:
```shell
[zk: 127.0.0.1:9000(CONNECTED) 9] ls -R /brokers
/brokers
/brokers/ids
/brokers/seqid
/brokers/topics
/brokers/ids/0
/brokers/ids/2
/brokers/topics/kaven-topic
/brokers/topics/kaven-topic/partitions
/brokers/topics/kaven-topic/partitions/0
/brokers/topics/kaven-topic/partitions/1
/brokers/topics/kaven-topic/partitions/0/state
/brokers/topics/kaven-topic/partitions/1/state
```
The topic was created with two partitions and two replicas per partition (--replication-factor 2 --partitions 2). The state of partition 0 is shown below: its two replicas sit on the Kafka0 and Kafka2 nodes, and the leader is on Kafka0.
```shell
[zk: 127.0.0.1:9000(CONNECTED) 11] get /brokers/topics/kaven-topic/partitions/0/state
{"controller_epoch":3,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2]}
```
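The state znode is also plain JSON; as a sketch, the broker ids in `leader` and `isr` can be resolved back to hosts using the node table at the top of the post:

```python
import json

# Partition state copied from the znode above; the broker-id -> host
# mapping follows the cluster node table at the top of the post.
state = json.loads(
    '{"controller_epoch":3,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2]}'
)
hosts = {0: "192.168.1.4", 1: "192.168.1.6", 2: "192.168.1.199"}

leader_host = hosts[state["leader"]]
isr_hosts = [hosts[b] for b in state["isr"]]
print("leader:", leader_host)         # leader: 192.168.1.4
print("in-sync replicas:", isr_hosts)
```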
The state of partition 1 is shown below: its two replicas also sit on the Kafka0 and Kafka2 nodes, but here the leader (broker id 2) is on the Kafka2 node.
```shell
[zk: 127.0.0.1:9000(CONNECTED) 12] get /brokers/topics/kaven-topic/partitions/1/state
{"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2,0]}
```
- Kafka: Topic concepts & API