天天看点

一篇文章教你快速构建kafka分布式集群

作者:互联网蚂蚁哥

1、kakfa集群部署规划

这里我采用三台服务器来构建kakfa集群,由于kafka集群依赖zookeeper,所以在部署kafka之前,需要部署好zookeeper集群,zookeeper部署在hadoop课程中已经做过介绍,这里就不过多说明了,这里我将kafka和zookeeper部署在了一起,kakfa集群节点操作系统仍然采用centos7.7版本,各个主机角色和软件版本如下表所示:

IP地址 主机名 安装软件 软件版本
172.16.213.31 kafkazk1 Kafka+ ZooKeeper kafka_2.11-2.4.1、zookeeper-3.5.8
172.16.213.41 kafkazk2 Kafka+ ZooKeeper kafka_2.11-2.4.1、zookeeper-3.5.8
172.16.213.70 kafkazk3 Kafka+ ZooKeeper kafka_2.11-2.4.1、zookeeper-3.5.8

这里需要注意kafka和zookeeper的版本,默认kafka2.11版本自带的zookeeper依赖jar包版本为3.5.7,因此zookeeper的版本至少在zookeeper3.5.7以及以上。

2、下载与安装Kafka

Kafka也需要安装Java运行环境,这在前面课程已经介绍过了,这里不再介绍,大家可以从kafka官网https://kafka.apache.org/downloads 获取kafka安装包,这里推荐的版本是kafka_2.11-2.4.1.tgz。将下载下来的安装包直接解压到一个路径下即可完成kafka的安装,这里统一将kafka安装到/usr/local目录下,我以在kakfazk1主机为例,基本操作过程如下:

[root@kafkazk1~]# tar -zxvf kafka_2.11-2.4.1.tgz -C /usr/local
[root@kafkazk1~]# mv /usr/local/kafka_2.11-2.4.1 /usr/local/kafka           

这里我们将kafka安装到了/usr/local目录下。这里我创建了一个kafka用户,用来管理和维护kakfa集群,后面所有对kafka的操作都通过此用户来完成,执行如下操作进行创建用户和授权:

[root@kafkazk1~]# useradd kafka
[root@kafkazk1~]# chown -R kafka:kafka /usr/local/kafka           

在kafkazk1节点安装完成kafka后,先不着急拷贝安装程序到其它两个节点,因为还没有配置kakfa,等kafka配置完成,再统一打包拷贝到其它两个节点。

3、配置kafka集群

Kafka安装完成后,接着进入配置阶段,这里我们将kafka安装到/usr/local目录下,因此,kafka的主配置文件为/usr/local/kafka/config/server.properties,这里以节点kafkazk1为例,重点介绍一些常用配置项的含义:

broker.id=1
listeners=PLAINTEXT://172.16.213.31:9092
log.dirs=/usr/local/kafka/logs
num.partitions=6
log.retention.hours=60
log.segment.bytes=1073741824
zookeeper.connect=172.16.213.31:2181,172.16.213.41:2181,172.16.213.70:2181
auto.create.topics.enable=true
delete.topic.enable=true           

其中,每个配置项含义如下:

Ø broker.id:每一个broker在集群中的唯一表示,要求是正数。每个节点不能相同,这里我有三个节点,分别用1、2、3表示,当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况。

Ø listeners:设置kafka的监听地址与端口,可以将监听地址设置为主机名或IP地址,这里将监听地址设置为IP地址。如果设置为主机名,那么还需要将主机名与IP的对应关系本地解析到系统的/etc/hosts文件中。

Ø log.dirs:这个参数用于配置kafka保存数据的位置,kafka中所有的消息都会存在这个目录下。可以通过逗号来指定多个路径,kafka会根据最少被使用的原则选择目录分配新的parition。需要注意的是,kafka在分配parition的时候选择的规则不是按照磁盘的空间大小来定的,而是根据分配的parition的个数多小而定。

Ø num.partitions:这个参数用于设置新创建的topic有多少个分区,可以根据消费者实际情况配置,配置过小会影响消费性能。这里配置6个。

Ø log.retention.hours:这个参数用于配置kafka中消息保存的时间,还支持log.retention.minutes和log.retention.ms配置项。这三个参数都会控制删除过期数据的时间,推荐还是使用log.retention.ms。如果多个同时设置,那么会选择最小的那个。

Ø log.segment.bytes:配置partition中每个segment数据文件的大小,默认是1GB,超过这个大小会自动创建一个新的segment file。

Ø zookeeper.connect:这个参数用于指定zookeeper所在的地址,它存储了broker的元信息。这个值可以通过逗号设置多个值,每个值的格式均为:hostname:port/path,其中每个部分的含义如下:

hostname:表示zookeeper服务器的主机名或者IP地址,这里设置为IP地址。

port: 表示是zookeeper服务器监听连接的端口号。

/path:表示kafka在zookeeper上的根目录。如果不设置,会使用根目录。

Ø auto.create.topics.enable:这个参数用于设置是否自动创建topic,如果请求一个topic时发现还没有创建,kafka会在broker上自动创建一个topic,如果需要严格的控制topic的创建,那么可以设置auto.create.topics.enable为false,禁止自动创建topic。

Ø delete.topic.enable:在0.8.2版本之后,Kafka提供了删除topic的功能,但是默认并不会直接将topic数据物理删除。如果要从物理上删除(即删除topic后,数据文件也会一同删除),就需要设置此配置项为true。

Kakfa配置文件修改完成后,接着打包kakfa安装程序,将程序拷贝到其它两个节点,然后进行解压即可,注意,在其它两个节点上,broker.id务必要修改,kafka集群中broker.id不能有相同的。

4、启动kafka集群

三个节点的kafka配置完成后,就可以启动kafka了,但在启动kafka集群前,需要确保ZooKeeper集群已经正常启动。接着,依次在kafka各个节点上执行如下命令即可:

[root@kafkazk1~]# cd /usr/local/kafka
[root@kafkazk1 kafka]# nohup bin/kafka-server-start.sh config/server.properties &
[root@kafkazk1 kafka]# jps
21840 Kafka
15593 Jps
15789 QuorumPeerMain           

这里将kafka放到后台运行,启动后,会在启动kafka的当前目录下生成一个nohup.out文件,可通过此文件查看kafka的启动和运行状态。通过jps指令,可以看到有个Kafka标识,这是kafka进程成功启动的标志。

三、kafka集群基本命令操作

kafka提供了多个命令用于查看、创建、修改、删除topic信息,也可以通过命令测试如何生产消息、消费消息等,这些命令位于kafka安装目录的bin目录下,这里是/usr/local/kafka/bin。登录任意一台kafka集群节点,切换到此目录下,即可进行命令操作,下面简单列举了kafka的一些常用命令的使用方法。

(1)显示topic列表

命令执行方式如下所示:

[kafka@kafkazk1 bin]$ ./kafka-topics.sh --bootstrap-server 172.16.213.31:9092,172.16.213.41:9092,172.16.213.70:9092 --list           

其中,“--bootstrap-server”参数后面跟的是kakfa集群的主机列表和端口。具体操作实例如下图所示:

一篇文章教你快速构建kafka分布式集群

(2)创建一个topic,并指定topic属性(副本数、分区数等)

命令执行方式如下图所示:

一篇文章教你快速构建kafka分布式集群

其中:

Ø --create:表示创建一个topic。

Ø --replication-factor:表示这个topic的副本数,这里设置为1个。

Ø --partitions:指定topic的分区数,一般设置为小于或等于kafka集群节点数即可。

Ø --topic:指定要创建的topic的名称。

(3)查看某个topic的状态

命令执行方式如下图所示:

一篇文章教你快速构建kafka分布式集群

这里通过”--describe“选项查看刚刚创建好的testtopic的状态,其中:

Ø Partition:表示分区ID,通过输出可以看到,testtopic有三个分区,一个副本,这刚好和我们创建testtopic时指定的配置吻合。

Ø Leader:表示当前负责读写的Leader broker。

Ø Replicas:表示当前分区的所有副本对应的broker列表。

Ø Isr:表示处于活动状态的broker。

(4)生产消息

命令执行方式如下图所示:

一篇文章教你快速构建kafka分布式集群

这里需要注意,”--broker-list“后面跟的内容是kafka集群的ip和端口,当输入这条命令后,光标处于可写状态,接着就可以写入一些测试数据,每行一条,这里输入的内容如上图红框所示。

(5)消费消息

在生产消息的同时,再登录任意一台kafka集群节点,执行消费消息的命令,结果如下图所示:

一篇文章教你快速构建kafka分布式集群

可以看到,在第四步中,输入的消息在这里原样输出了,这样就完成了消息的消费。

(6)删除topic

命令执行方式如下图所示:

一篇文章教你快速构建kafka分布式集群

注意这里的“--delete”选项,用来删除一个指定的topic。然后通过“--list”查看发现这个topic确实已经被删除了。

本文主要讲解了kafka的概念、术语以及kafka分布式集群的构建,最后介绍了kafka的基本操作指令,要数量使用kafka,重要的是了解其内部实现机制以及运行原理。Kafka作为一个消息中间件,在集群架构环境中经常使用,特别是大数据架构环境中,因此,对于本文介绍的kafka知识点要求熟练掌握。