Kafka: Setting Up a Kafka Cluster

I have already covered how to deploy Kafka in an earlier post. Kafka has only one deployment mode, the cluster mode: its architecture is natively clustered, so a single-node deployment and a multi-node deployment work in essentially the same way.

Cluster nodes:

Node        Address
ZooKeeper   192.168.1.5
Kafka0      192.168.1.4
Kafka1      192.168.1.6
Kafka2      192.168.1.199

For simplicity, a ZooKeeper cluster is not used here; a standalone ZooKeeper instance is sufficient.

  • ZooKeeper: Setting Up a Standalone ZooKeeper with a Shell Script
  • ZooKeeper: Setting Up a ZooKeeper Cluster

All 4 of these nodes need a Java environment, and the 3 Kafka nodes additionally need the Kafka release downloaded and uploaded; see the following post:

  • Kafka: Deploying Kafka

ZooKeeper

The ZooKeeper configuration file:

(screenshot: the ZooKeeper configuration file)
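The original post shows the file only as a screenshot. A minimal standalone configuration consistent with the session that follows might look like this (the client port 9000 is confirmed by the `zkServer.sh status` output; `tickTime` and `dataDir` are assumed values, so use whatever directory you prepared):

```properties
# conf/zoo.cfg — minimal standalone ZooKeeper configuration (sketch)
tickTime=2000
# Assumed data directory; point this at the directory you created.
dataDir=/tmp/zookeeper
# Matches the client port reported by `zkServer.sh status`.
clientPort=9000
```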

Start ZooKeeper:

[root@192 ~]# cd /usr/local/apache-zookeeper-3.6.3-bin/bin/
[root@192 bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@192 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 9000. Client address: localhost. Client SSL: false.
Mode: standalone      

ZooKeeper has started successfully.

Kafka Cluster

[root@192 ~]# cd /usr/local/kafka_2.13-3.0.0
[root@192 kafka_2.13-3.0.0]# vim config/server.properties      
  • Kafka0
  • Kafka1
  • Kafka2
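The per-node screenshots of `server.properties` are not reproduced here. A sketch of the relevant settings for Kafka0 (the `broker.id` values follow the node names; the listener address matches the endpoint each broker later registers in ZooKeeper; `log.dirs` matches the `/tmp/kafka-logs` paths in the topic-creation output; `zookeeper.connect` points at the standalone ZooKeeper):

```properties
# config/server.properties for Kafka0 (sketch; adjust per node)
broker.id=0                               # 1 on Kafka1, 2 on Kafka2
listeners=PLAINTEXT://192.168.1.4:9092    # this node's own address
log.dirs=/tmp/kafka-logs
zookeeper.connect=192.168.1.5:9000        # the standalone ZooKeeper
```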

Start each Kafka node:

[root@192 kafka_2.13-3.0.0]# bin/kafka-server-start.sh config/server.properties &      

On the Kafka nodes, the only cluster-related change is `broker.id`, which distinguishes the cluster nodes; nothing else was configured for the cluster. In particular, the addresses of all the Kafka nodes were not listed anywhere, whereas setting up a ZooKeeper cluster requires configuring the addresses of all ZooKeeper nodes on every ZooKeeper node. So how does a Kafka node discover the other nodes in the cluster? The answer is that ZooKeeper maintains the Kafka cluster membership.

Query the data in ZooKeeper:

[root@192 bin]# ./zkCli.sh -timeout 40000 -server 127.0.0.1:9000
...
[zk: 127.0.0.1:9000(CONNECTED) 4] ls -R /brokers 
/brokers
/brokers/ids
/brokers/seqid
/brokers/topics
/brokers/ids/0
/brokers/ids/1
/brokers/ids/2
[zk: 127.0.0.1:9000(CONNECTED) 5] get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.1.4:9092"],"jmx_port":-1,"port":9092,"host":"192.168.1.4","version":5,"timestamp":"1642677446098"}
[zk: 127.0.0.1:9000(CONNECTED) 6] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.1.6:9092"],"jmx_port":-1,"port":9092,"host":"192.168.1.6","version":5,"timestamp":"1642677466209"}
[zk: 127.0.0.1:9000(CONNECTED) 7] get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.1.199:9092"],"jmx_port":-1,"port":9092,"host":"192.168.1.199","version":5,"timestamp":"1642677476386"}      
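Each broker registers itself under `/brokers/ids` as a JSON znode. The registration shown above can be unpacked with a short Python sketch (the JSON string is copied verbatim from the `zkCli.sh` output; the field meanings follow Kafka's broker registration format):

```python
import json

# Broker registration for broker 0, as returned by `get /brokers/ids/0`.
registration = json.loads(
    '{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},'
    '"endpoints":["PLAINTEXT://192.168.1.4:9092"],"jmx_port":-1,"port":9092,'
    '"host":"192.168.1.4","version":5,"timestamp":"1642677446098"}'
)

# Clients and other brokers locate Kafka0 through these advertised endpoints.
print(registration["endpoints"])                   # ['PLAINTEXT://192.168.1.4:9092']
print(registration["host"], registration["port"])  # 192.168.1.4 9092
```

This is exactly the information a broker or client needs to reach Kafka0, which is why no static node list is required in `server.properties`.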

Clearly, ZooKeeper maintains the Kafka cluster information. Now stop the Kafka service on the Kafka1 node.


ZooKeeper also updates the Kafka cluster information dynamically: once Kafka1 is stopped, its registration disappears. This behavior comes from ZooKeeper's ephemeral nodes.

[zk: 127.0.0.1:9000(CONNECTED) 8] ls -R /brokers 
/brokers
/brokers/ids
/brokers/seqid
/brokers/topics
/brokers/ids/0
/brokers/ids/2      
  • ZooKeeper: Key Concepts & Client Commands

Create a topic on the Kafka0 node:

[root@192 kafka_2.13-3.0.0]# bin/kafka-topics.sh --create --bootstrap-server 192.168.1.4:9092 --replication-factor 2 --partitions 2 --topic kaven-topic
[2022-01-20 19:43:51,408] INFO Creating topic kaven-topic with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(0, 2), 1 -> ArrayBuffer(2, 0)) (kafka.zk.AdminZkClient)
[2022-01-20 19:43:51,530] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(kaven-topic-0) (kafka.server.ReplicaFetcherManager)
[2022-01-20 19:43:51,601] INFO [LogLoader partition=kaven-topic-0, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log$)
[2022-01-20 19:43:51,611] INFO Created log for partition kaven-topic-0 in /tmp/kafka-logs/kaven-topic-0 with properties {} (kafka.log.LogManager)
[2022-01-20 19:43:51,612] INFO [Partition kaven-topic-0 broker=0] No checkpointed highwatermark is found for partition kaven-topic-0 (kafka.cluster.Partition)
[2022-01-20 19:43:51,613] INFO [Partition kaven-topic-0 broker=0] Log loaded for partition kaven-topic-0 with initial high watermark 0 (kafka.cluster.Partition)
[2022-01-20 19:43:51,703] INFO [LogLoader partition=kaven-topic-1, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log$)
[2022-01-20 19:43:51,704] INFO Created log for partition kaven-topic-1 in /tmp/kafka-logs/kaven-topic-1 with properties {} (kafka.log.LogManager)
[2022-01-20 19:43:51,704] INFO [Partition kaven-topic-1 broker=0] No checkpointed highwatermark is found for partition kaven-topic-1 (kafka.cluster.Partition)
[2022-01-20 19:43:51,704] INFO [Partition kaven-topic-1 broker=0] Log loaded for partition kaven-topic-1 with initial high watermark 0 (kafka.cluster.Partition)
[2022-01-20 19:43:51,704] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions HashSet(kaven-topic-1) (kafka.server.ReplicaFetcherManager)
[2022-01-20 19:43:51,730] INFO [ReplicaFetcherManager on broker 0] Added fetcher to broker 2 for partitions Map(kaven-topic-1 -> InitialFetchState(BrokerEndPoint(id=2, host=192.168.1.199:9092),0,0)) (kafka.server.ReplicaFetcherManager)
[2022-01-20 19:43:51,751] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Starting (kafka.server.ReplicaFetcherThread)
[2022-01-20 19:43:51,757] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Truncating partition kaven-topic-1 to local high watermark 0 (kafka.server.ReplicaFetcherThread)
[2022-01-20 19:43:51,758] INFO [Log partition=kaven-topic-1, dir=/tmp/kafka-logs] Truncating to 0 has no effect as the largest offset in the log is -1 (kafka.log.Log)
Created topic kaven-topic.      
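The log line `initial partition assignment HashMap(0 -> ArrayBuffer(0, 2), 1 -> ArrayBuffer(2, 0))` already encodes the replica layout. A small Python sketch of that mapping (broker ids taken from the log; treating the first replica in each list as the preferred leader is standard Kafka behavior, and it matches the partition states queried later):

```python
# Initial partition assignment from the topic-creation log:
# partition id -> ordered list of replica broker ids.
assignment = {0: [0, 2], 1: [2, 0]}

for partition, replicas in sorted(assignment.items()):
    # Kafka places the preferred leader first in each replica list.
    print(f"partition {partition}: replicas={replicas}, "
          f"preferred leader=broker {replicas[0]}")
```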

Query the existing topics on the Kafka2 node:

[root@192 kafka_2.13-3.0.0]# bin/kafka-topics.sh --list --bootstrap-server 192.168.1.199:9092
kaven-topic      

Clearly, the topic metadata has been synchronized across the Kafka cluster, and ZooKeeper holds this information as well:

[zk: 127.0.0.1:9000(CONNECTED) 9] ls -R /brokers 
/brokers
/brokers/ids
/brokers/seqid
/brokers/topics
/brokers/ids/0
/brokers/ids/2
/brokers/topics/kaven-topic
/brokers/topics/kaven-topic/partitions
/brokers/topics/kaven-topic/partitions/0
/brokers/topics/kaven-topic/partitions/1
/brokers/topics/kaven-topic/partitions/0/state
/brokers/topics/kaven-topic/partitions/1/state      

The topic was created with two partitions, each with two replicas (`--replication-factor 2 --partitions 2`). The state of partition 0 is shown below: its two replicas sit on the Kafka0 and Kafka2 nodes, and the leader is on Kafka0.

[zk: 127.0.0.1:9000(CONNECTED) 11] get /brokers/topics/kaven-topic/partitions/0/state
{"controller_epoch":3,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2]}      

The state of partition 1 is shown below: its two replicas also sit on the Kafka0 and Kafka2 nodes, and the leader is on Kafka2 (broker id 2; remember that Kafka1 is still stopped).

[zk: 127.0.0.1:9000(CONNECTED) 12] get /brokers/topics/kaven-topic/partitions/1/state
{"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2,0]}      
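The two state znodes can be compared side by side; a short Python sketch (JSON copied verbatim from the `zkCli.sh` output) confirms where each leader lives:

```python
import json

# Partition state znodes for kaven-topic, copied from zkCli.sh.
states = {
    0: json.loads('{"controller_epoch":3,"leader":0,"version":1,'
                  '"leader_epoch":0,"isr":[0,2]}'),
    1: json.loads('{"controller_epoch":3,"leader":2,"version":1,'
                  '"leader_epoch":0,"isr":[2,0]}'),
}

for partition, state in sorted(states.items()):
    # "leader" is a broker id; "isr" is the in-sync replica set.
    print(f"partition {partition}: leader=broker {state['leader']}, "
          f"isr={state['isr']}")
```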
  • Kafka: Topic Concepts & API