kafka下載下傳位址:https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
前期準備:
cent os7 安裝zookeeper3.6.0:https://blog.csdn.net/weixin_39816740/article/details/104995674
Kafka叢集是把狀态儲存在Zookeeper中的,首先要搭建Zookeeper叢集。
上傳到目錄/usr/local/java/下(rz,sz)解壓
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPB1EMjpmT4dGVPpHOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL1IjN2UDM0UTM3EDOwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
解壓 tar -zxvf kafka_2.13-2.6.0.tgz
建立存放kafka日志目錄 mkdir kafkalogs
跟zookeeper一樣修改下檔案名字 mv kafka_2.13-2.6.0 kafka
複制2份分别叫kafka2,kafka3
cp -r kafka kafka2
cp -r kafka kafka3
修改下檔案名字,不小心整錯了:
分别修改修改配置config/server.properties(3個kafka中的配置)
我的zookeeper的連接配接端口:10.108.3.61:2183,10.108.3.61:2182,10.108.3.61:2181
日志檔案路經:/usr/local/java/kafka/kafkalogs
vim kafka/config/server.properties
broker.id=1 #目前機器在叢集中的唯一辨別,和zookeeper的myid性質一樣
listeners=PLAINTEXT://10.108.3.61:9091 #監聽的IP和端口
advertised.listeners=PLAINTEXT://10.108.3.61:9091
log.dirs=/usr/local/java/kafka/kafkalogs #消息存放的目錄
zookeeper.connect=10.108.3.61:2183,10.108.3.61:2182,10.108.3.61:2181 #設定zookeeper的連接配接端口
另外兩台機器同樣配置,不過注意修改broker.id和ip和端口。
vim kafka2/config/server.properties
broker.id=2 #目前機器在叢集中的唯一辨別,和zookeeper的myid性質一樣
listeners=PLAINTEXT://10.108.3.61:9092 #監聽的IP和端口
advertised.listeners=PLAINTEXT://10.108.3.61:9092
log.dirs=/usr/local/java/kafka2/kafkalogs #消息存放的目錄
zookeeper.connect=10.108.3.61:2183,10.108.3.61:2182,10.108.3.61:2181 #設定zookeeper的連接配接端口
vim kafka3/config/server.properties
broker.id=3 #目前機器在叢集中的唯一辨別,和zookeeper的myid性質一樣
listeners=PLAINTEXT://10.108.3.61:9093 #監聽的IP和端口
advertised.listeners=PLAINTEXT://10.108.3.61:9093
log.dirs=/usr/local/java/kafka3/kafkalogs #消息存放的目錄
zookeeper.connect=10.108.3.61:2183,10.108.3.61:2182,10.108.3.61:2181 #設定zookeeper的連接配接端口
檢視啟動的zookeeper狀态
依次啟動 kafka
kafka/bin/kafka-server-start.sh kafka/config/server.properties &
kafka2/bin/kafka-server-start.sh kafka2/config/server.properties &
kafka3/bin/kafka-server-start.sh kafka3/config/server.properties &
檢視是否啟動成功
netstat -anp|grep 9091
netstat -anp|grep 9092
netstat -anp|grep 9093
建立主題(在kafka目錄中)
kafka/bin/kafka-topics.sh --create --zookeeper 10.108.3.61:2181 --replication-factor 1 --partitions 1 --topic 主題名稱
檢視已有的主題
kafka/bin/kafka-topics.sh --list --zookeeper 10.108.3.61:2181
啟動生産者做消息釋出
kafka/bin/kafka-console-producer.sh --broker-list 10.108.3.61:9091 --topic ceshi
釋出消息
另開一個視窗訂閱消息并開啟消費者(注意都是在kafka目錄)
開啟消費者訂閱消息:
kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.108.3.61:9091 --from-beginning --topic ceshi
大功告成!
删除topic
kafka/bin/kafka-topics.sh --delete --zookeeper 10.108.3.61:2181 --topic ceshi
檢視删除成功