天天看点

CentOS7.2 kafka_2.12-1.1.0集群搭建1、集群环境2、开启相关端口3、启动zookeeper4、启动kafka5、操作kafka

1、集群环境

1.1 Linux服务器列表

IP HOSTNAME 操作系统
192.168.48.13 node3.xzsyr.com CentOS-7-x86_64-Minimal-1511
192.168.48.14 node4.xzsyr.com CentOS-7-x86_64-Minimal-1511
192.168.48.15 node5.xzsyr.com CentOS-7-x86_64-Minimal-1511

1.2 集群节点安装JDK

IP HOSTNAME JDK 版本说明
192.168.48.13 node3.xzsyr.com java version “1.8.0_131”
192.168.48.14 node4.xzsyr.com java version “1.8.0_131”
192.168.48.15 node5.xzsyr.com java version “1.8.0_131”

jdk安装配置,请参考Centos7 安装配置JDK1.8

1.2 zookeeper部署集群环境

kafka依赖zookeeper,建议自己搭建的zk集群环境(虽然kafka默认自带自己的zk组件)

IP HOSTNAME Zookeeper IsAble
192.168.48.13 node3.xzsyr.com YES
192.168.48.14 node4.xzsyr.com YES
192.168.48.15 node5.xzsyr.com YES

zk集群环境搭建,请参考Centos7 zookeeper-3.4.12集群搭建

1.3kafka搭建部署并启动

1.3.1下载kafka安装包

访问网址:http://kafka.apache.org/

左侧导航栏最下面有个Download按钮,点进去

进入网址:http://kafka.apache.org/downloads

CentOS7.2 kafka_2.12-1.1.0集群搭建1、集群环境2、开启相关端口3、启动zookeeper4、启动kafka5、操作kafka

1.3.2拷贝到centos系统目录下

mkdir -p /opt/module   #创建module文件夹,用于安装组件
mkdir -p /opt/software #创建sofware文件夹,用于存放安装包
           

将kafka_2.12-1.1.0.tgz上传导入/opt/software目录下

推荐工具:WinSCP、xftp

1.3.2解压到指定目录

1、切换目录到/opt/sofwware

cd /opt/software
           

2、解压到/opt/module目录下

1.3.3配置kafka

1、切换目录到kafka配置文件

2、编辑server.properties配置信息

命令:

vim server.properties
#修改以下配置
#为方便,直接将broker.id设置为了ip的最后一段,当集群中有多个Kafka时,他们的这个值必须不一样
broker.id=
#端口暂时不变
port=
#IP修改为本机的IP
host.name=node3.xzsyr.com
#可选配置项,将日志输出到指定的位置
log.dirs=/tmp/kafka-logs
#必须配置自己的zookeeper
zookeeper.connect=node3.xzsyr.com:,node4.xzsyr.com:,node5.xzsyr.com:
#在配置集群的时候,必须设置
listeners = PLAINTEXT://node3.xzsyr.com:
           

说明:如果是单机版的话,默认即可,我们什么都不需要改动。现在我们是要配置集群,所以需要配置一些参数

1、broker.id:每台机器不能一样

2、zookeeper.connect:因为我有3台zookeeper服务器,所以在这里zookeeper.connect设置为3台,必须全部加进去

3、listeners:在配置集群的时候,必须设置,不然以后的操作会报找不到leader的错误

3、拷贝kafka到另外两台服务器并修复配置

1、copy到node4、node5节点下

[root@node3 module]# scp -R kafka_2.12-1.1.0/ [email protected]:/opt/module/
[root@node3 module]# scp -R kafka_2.12-1.1.0/ [email protected]:/opt/module/
           

2、在node4节点下修改server.properties文件

修改broker.id和listeners

#ip的最后一段
broker.id=
#在配置集群的时候,必须设置
listeners = PLAINTEXT://node5.xzsyr.com:
           

3、在node5节点下修改server.properties

#ip的最后一段
broker.id=
#在配置集群的时候,必须设置
listeners = PLAINTEXT://node5.xzsyr.com:
           

2、开启相关端口

该操作在开启防火墙,情况下使用

三台机器都要开启,kafka通信默认是通过9092端口,也就是我们上面配的listeners

firewall-cmd --zone=public --add-port=9092/tcp --permanent
  firewall-cmd --reload
           

3、启动zookeeper

三台都要启动

4、启动kafka

三台都要启动

cd opt/module/kafka_2-
./bin/kafka-server-start.sh -daemon ./config/server.properties
           

jps命令检查是否启动成功

5、操作kafka

5.1创建topic

cd /opt/module/kafka_2.12-1.1.0/bin
bin/kafka-topics.sh --create --zookeeper node3.xzsyr.com:2181 --replication-factor 1 --partitions 1 --topic test
           

如果成功的话,会输出:Created topic “test”.

指令说明:

zookeeper:为zk服务器地址,已逗号分割配置多个

replication-factor:分区leader副本数,1代表没有副本即分区本身,建议为2

partitions:分区数

topic:topic名称

5.2查看topic

cd /opt/module/kafka_2-/
#查看所有topic
bin/kafka-topics.sh --list --zookeeper node3.xzsyr.com:
#查看test topic信息
bin/kafka-topics.sh --describe --zookeeper localhost: --topic test
           

topic描述说明:

leader:负责处理消息的读和写,leader是从所有节点中随机选择的.

Replicas:列出了所有的副本节点,不管节点是否在服务中.

Lsr:是正在服务中的节点.

5.3查看group偏移量

cd /opt/module/kafka_2-/bin
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper node3.xzsyr.com: --group Warning --topic test
           

5.4发布消息

bin/kafka-console-producer.sh --broker-list node3.xzsyr.com: --topic test
           

5.5消费消息

bin/kafka-console-consumer.sh --bootstrap-server node3.xzsyr.com: --topic test --from-beginning --zookeeper node3.xzsyr.com:
           

指令配置项说明:

from-beginning:每次从头开始消费

5.6删除topic