注意kafka的安装需要依赖Zookeeper集群 ,所以安装kafka之前先安装zookeeper!
zookeeper安装
- 上传安装包
- 解压
tar -zxvf zookeeper-3.4.6.tar.gz
- 修改配置文件
(1)进入配置文件目录
cd /usr/apps/zookeeper-3.4.6/conf
(2)修改配置文件名称
mv zoo_sample.cfg zoo.cfg
(3)编辑配置文件
vi zoo.cfg
dataDir=/usr/apps/data/zkdata
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=linux01:2888:3888
server.2=linux02:2888:3888
server.3=linux03:2888:3888
- 创建数据目录
mkdir -p /usr/apps/data/zkdata
- 分别在各个节点的数据存储目录中,生成一个myid文件,内容为它的id
echo 1 > /usr/apps/data/zkdata/myid
echo 2 > /usr/apps/data/zkdata/myid
echo 3 > /usr/apps/data/zkdata/myid
- 分发安装包
scp -r zookeeper-3.4.6 linux02:/usr/apps/ 单独分发
- 配置环境变量
vi /etc/profile
#ZOOKEEPER_HOME
export ZOOKEEPER_HOME=/usr/apps/zookeeper-3.4.6
export PATH=$PATH:$ZOOKEEPER_HOME/bin
source /etc/profile
注意:还需要分发环境变量
- zookeeper集启停群
bin/zkServer.sh start zk服务启动
bin/zkServer.sh status zk查看服务状态
bin/zkServer.sh stop zk停止服务
- 脚本启停
- 脚本启动
#!/bin/bash
for i in 1 2 3
do
ssh linux0${i} "source /etc/profile;/usr/apps/zookeeper-3.4.6/bin/zkServer.sh $1"
done
安装kafka集群
- 上传安装包
- 解压
tar -zxvf kafka_2.11-2.2.2.tgz
- 修改配置文件
- 进入配置文件目录
cd /usr/apps/kafka_2.11-2.2.2/config
- 编辑配置文件
vi server.properties
#为依次增长的:0、1、2、3、4,集群中唯一 id
broker.id=0
#数据存储的⽬录
log.dirs=/usr/apps/data/kafkadata
#指定zk集群地址
zookeeper.connect=lx01:2181,lx02:2181,lx03:2181
- 分发安装包
for i in {2..3}; do scp -r kafka_2.11-2.2.2 linux0$i:$PWD; done
- 配置环境变量
vi /etc/profile
#ZOOKEEPER_HOME
export ZOOKEEPER_HOME=/usr/apps/kafka_2.11-2.2.2
export PATH=$PATH:$ZOOKEEPER_HOME/bin
source /etc/profile
注意:还需要分发环境变量
- 分别在linux02和linux03上修改配置文件/usr/apps/kafka_2.11-2.2.2/server.properties中的broker.id=1,broker.id=2(broker.id不能重复)
- 启停集群(在各个节点上启动)
bin/kafka-server-start.sh -daemon /usr/apps/kafka_2.11-2.2.2/config/server.properties
启动集群
bin/kafka-server-stop.sh stop

shell客户端操作
创建topic
./kafka-topics.sh --zookeeper linux01:2181,linux02:2181,linux03:2181 --create --replication-factor 3 --partitions 3 --topic test
参数解释:
--replication-factor 副本数量
--partitions 分区数量
--topic topic名称
查看当前服务器中的所有topic
./kafka-topics.sh --zookeeper linux01:2181,linux02:2181,linux03:2181 --list
删除topic
./kafka-topics.sh --zookeeper linux01:2181,linux02:2181,linux03:2181 --delete --topic test
查看某个topic的详情
./kafka-topics.sh --zookeper linux01:2181,linux02:2181,linux03:2181 --describe --topic test
修改分区数
/kafka-topics.sh --zookeeper xu01:2181 --alter --topic test --partitions 5
producter
./kafka-console-producer.sh --broker-list xu01:9092 --topic test
>hello word
>kafka
>nihao
Consumer
./kafka-console-consumer.sh --bootstrap-server h1:9092 --from-beginning --topic test
# 指定要消费的分区,和要消费的起始offset
./kafka-console-consumer.sh --bootstrap-server h1:9092,h2:9092,h3:9092 --topic doit14 --offset 2 --partition 0