1、叢集環境
1.1 Linux伺服器清單
IP | HOSTNAME | 作業系統 |
---|---|---|
192.168.48.13 | node3.xzsyr.com | CentOS-7-x86_64-Minimal-1511 |
192.168.48.14 | node4.xzsyr.com | CentOS-7-x86_64-Minimal-1511 |
192.168.48.15 | node5.xzsyr.com | CentOS-7-x86_64-Minimal-1511 |
1.2 叢集節點安裝JDK
IP | HOSTNAME | JDK | 版本說明 |
---|---|---|---|
192.168.48.13 | node3.xzsyr.com | √ | java version “1.8.0_131” |
192.168.48.14 | node4.xzsyr.com | √ | java version “1.8.0_131” |
192.168.48.15 | node5.xzsyr.com | √ | java version “1.8.0_131” |
jdk安裝配置,請參考Centos7 安裝配置JDK1.8
1.2 zookeeper部署叢集環境
kafka依賴zookeeper,建議自己搭建的zk叢集環境(雖然kafka預設自帶自己的zk元件)
IP | HOSTNAME | Zookeeper | IsAble |
---|---|---|---|
192.168.48.13 | node3.xzsyr.com | √ | YES |
192.168.48.14 | node4.xzsyr.com | √ | YES |
192.168.48.15 | node5.xzsyr.com | √ | YES |
zk叢集環境搭建,請參考Centos7 zookeeper-3.4.12叢集搭建
1.3kafka搭建部署并啟動
1.3.1下載下傳kafka安裝包
通路網址:http://kafka.apache.org/
左側導航欄最下面有個Download按鈕,點進去
進入網址:http://kafka.apache.org/downloads
1.3.2拷貝到centos系統目錄下
mkdir -p /opt/module #建立module檔案夾,用于安裝元件
mkdir -p /opt/software #建立sofware檔案夾,用于存放安裝包
将kafka_2.12-1.1.0.tgz上傳導入/opt/software目錄下
推薦工具:WinSCP、xftp
1.3.2解壓到指定目錄
1、切換目錄到/opt/sofwware
cd /opt/software
2、解壓到/opt/module目錄下
1.3.3配置kafka
1、切換目錄到kafka配置檔案
2、編輯server.properties配置資訊
指令:
vim server.properties
#修改以下配置
#為友善,直接将broker.id設定為了ip的最後一段,當叢集中有多個Kafka時,他們的這個值必須不一樣
broker.id=
#端口暫時不變
port=
#IP修改為本機的IP
host.name=node3.xzsyr.com
#可選配置項,将日志輸出到指定的位置
log.dirs=/tmp/kafka-logs
#必須配置自己的zookeeper
zookeeper.connect=node3.xzsyr.com:,node4.xzsyr.com:,node5.xzsyr.com:
#在配置叢集的時候,必須設定
listeners = PLAINTEXT://node3.xzsyr.com:
說明:如果是單機版的話,預設即可,我們什麼都不需要改動。現在我們是要配置叢集,是以需要配置一些參數
1、broker.id:每台機器不能一樣
2、zookeeper.connect:因為我有3台zookeeper伺服器,是以在這裡zookeeper.connect設定為3台,必須全部加進去
3、listeners:在配置叢集的時候,必須設定,不然以後的操作會報找不到leader的錯誤
3、拷貝kafka到另外兩台伺服器并修複配置
1、copy到node4、node5節點下
[root@node3 module]# scp -R kafka_2.12-1.1.0/ [email protected]:/opt/module/
[root@node3 module]# scp -R kafka_2.12-1.1.0/ [email protected]:/opt/module/
2、在node4節點下修改server.properties檔案
修改broker.id和listeners
#ip的最後一段
broker.id=
#在配置叢集的時候,必須設定
listeners = PLAINTEXT://node5.xzsyr.com:
3、在node5節點下修改server.properties
#ip的最後一段
broker.id=
#在配置叢集的時候,必須設定
listeners = PLAINTEXT://node5.xzsyr.com:
2、開啟相關端口
該操作在開啟防火牆,情況下使用
三台機器都要開啟,kafka通信預設是通過9092端口,也就是我們上面配的listeners
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
3、啟動zookeeper
三台都要啟動
4、啟動kafka
三台都要啟動
cd opt/module/kafka_2-
./bin/kafka-server-start.sh -daemon ./config/server.properties
jps指令檢查是否啟動成功
5、操作kafka
5.1建立topic
cd /opt/module/kafka_2.12-1.1.0/bin
bin/kafka-topics.sh --create --zookeeper node3.xzsyr.com:2181 --replication-factor 1 --partitions 1 --topic test
如果成功的話,會輸出:Created topic “test”.
指令說明:
zookeeper:為zk伺服器位址,已逗号分割配置多個
replication-factor:分區leader副本數,1代表沒有副本即分區本身,建議為2
partitions:分區數
topic:topic名稱
5.2檢視topic
cd /opt/module/kafka_2-/
#檢視所有topic
bin/kafka-topics.sh --list --zookeeper node3.xzsyr.com:
#檢視test topic資訊
bin/kafka-topics.sh --describe --zookeeper localhost: --topic test
topic描述說明:
leader:負責處理消息的讀和寫,leader是從所有節點中随機選擇的.
Replicas:列出了所有的副本節點,不管節點是否在服務中.
Lsr:是正在服務中的節點.
5.3檢視group偏移量
cd /opt/module/kafka_2-/bin
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper node3.xzsyr.com: --group Warning --topic test
5.4釋出消息
bin/kafka-console-producer.sh --broker-list node3.xzsyr.com: --topic test
5.5消費消息
bin/kafka-console-consumer.sh --bootstrap-server node3.xzsyr.com: --topic test --from-beginning --zookeeper node3.xzsyr.com:
指令配置項說明:
from-beginning:每次從頭開始消費