1、kakfa叢集部署規劃
這裡我采用三台伺服器來建構kakfa叢集,由于kafka叢集依賴zookeeper,是以在部署kafka之前,需要部署好zookeeper叢集,zookeeper部署在hadoop課程中已經做過介紹,這裡就不過多說明了,這裡我将kafka和zookeeper部署在了一起,kakfa叢集節點作業系統仍然采用centos7.7版本,各個主機角色和軟體版本如下表所示:
IP位址 | 主機名 | 安裝軟體 | 軟體版本 |
172.16.213.31 | kafkazk1 | Kafka+ ZooKeeper | kafka_2.11-2.4.1、zookeeper-3.5.8 |
172.16.213.41 | kafkazk2 | Kafka+ ZooKeeper | kafka_2.11-2.4.1、zookeeper-3.5.8 |
172.16.213.70 | kafkazk3 | Kafka+ ZooKeeper | kafka_2.11-2.4.1、zookeeper-3.5.8 |
這裡需要注意kafka和zookeeper的版本,預設kafka2.11版本自帶的zookeeper依賴jar包版本為3.5.7,是以zookeeper的版本至少在zookeeper3.5.7以及以上。
2、下載下傳與安裝Kafka
Kafka也需要安裝Java運作環境,這在前面課程已經介紹過了,這裡不再介紹,大家可以從kafka官網https://kafka.apache.org/downloads 擷取kafka安裝包,這裡推薦的版本是kafka_2.11-2.4.1.tgz。将下載下傳下來的安裝包直接解壓到一個路徑下即可完成kafka的安裝,這裡統一将kafka安裝到/usr/local目錄下,我以在kakfazk1主機為例,基本操作過程如下:
[root@kafkazk1~]# tar -zxvf kafka_2.11-2.4.1.tgz -C /usr/local
[root@kafkazk1~]# mv /usr/local/kafka_2.11-2.4.1 /usr/local/kafka
這裡我們将kafka安裝到了/usr/local目錄下。這裡我建立了一個kafka使用者,用來管理和維護kakfa叢集,後面所有對kafka的操作都通過此使用者來完成,執行如下操作進行建立使用者和授權:
[root@kafkazk1~]# useradd kafka
[root@kafkazk1~]# chown -R kafka:kafka /usr/local/kafka
在kafkazk1節點安裝完成kafka後,先不着急拷貝安裝程式到其它兩個節點,因為還沒有配置kakfa,等kafka配置完成,再統一打包拷貝到其它兩個節點。
3、配置kafka叢集
Kafka安裝完成後,接着進入配置階段,這裡我們将kafka安裝到/usr/local目錄下,是以,kafka的主配置檔案為/usr/local/kafka/config/server.properties,這裡以節點kafkazk1為例,重點介紹一些常用配置項的含義:
broker.id=1
listeners=PLAINTEXT://172.16.213.31:9092
log.dirs=/usr/local/kafka/logs
num.partitions=6
log.retention.hours=60
log.segment.bytes=1073741824
zookeeper.connect=172.16.213.31:2181,172.16.213.41:2181,172.16.213.70:2181
auto.create.topics.enable=true
delete.topic.enable=true
其中,每個配置項含義如下:
Ø broker.id:每一個broker在叢集中的唯一表示,要求是正數。每個節點不能相同,這裡我有三個節點,分别用1、2、3表示,當該伺服器的IP位址發生改變時,broker.id沒有變化,則不會影響consumers的消息情況。
Ø listeners:設定kafka的監聽位址與端口,可以将監聽位址設定為主機名或IP位址,這裡将監聽位址設定為IP位址。如果設定為主機名,那麼還需要将主機名與IP的對應關系本地解析到系統的/etc/hosts檔案中。
Ø log.dirs:這個參數用于配置kafka儲存資料的位置,kafka中所有的消息都會存在這個目錄下。可以通過逗号來指定多個路徑,kafka會根據最少被使用的原則選擇目錄配置設定新的parition。需要注意的是,kafka在配置設定parition的時候選擇的規則不是按照磁盤的空間大小來定的,而是根據配置設定的parition的個數多小而定。
Ø num.partitions:這個參數用于設定新建立的topic有多少個分區,可以根據消費者實際情況配置,配置過小會影響消費性能。這裡配置6個。
Ø log.retention.hours:這個參數用于配置kafka中消息儲存的時間,還支援log.retention.minutes和log.retention.ms配置項。這三個參數都會控制删除過期資料的時間,推薦還是使用log.retention.ms。如果多個同時設定,那麼會選擇最小的那個。
Ø log.segment.bytes:配置partition中每個segment資料檔案的大小,預設是1GB,超過這個大小會自動建立一個新的segment file。
Ø zookeeper.connect:這個參數用于指定zookeeper所在的位址,它存儲了broker的元資訊。這個值可以通過逗号設定多個值,每個值的格式均為:hostname:port/path,其中每個部分的含義如下:
hostname:表示zookeeper伺服器的主機名或者IP位址,這裡設定為IP位址。
port: 表示是zookeeper伺服器監聽連接配接的端口号。
/path:表示kafka在zookeeper上的根目錄。如果不設定,會使用根目錄。
Ø auto.create.topics.enable:這個參數用于設定是否自動建立topic,如果請求一個topic時發現還沒有建立,kafka會在broker上自動建立一個topic,如果需要嚴格的控制topic的建立,那麼可以設定auto.create.topics.enable為false,禁止自動建立topic。
Ø delete.topic.enable:在0.8.2版本之後,Kafka提供了删除topic的功能,但是預設并不會直接将topic資料實體删除。如果要從實體上删除(即删除topic後,資料檔案也會一同删除),就需要設定此配置項為true。
Kakfa配置檔案修改完成後,接着打包kakfa安裝程式,将程式拷貝到其它兩個節點,然後進行解壓即可,注意,在其它兩個節點上,broker.id務必要修改,kafka叢集中broker.id不能有相同的。
4、啟動kafka叢集
三個節點的kafka配置完成後,就可以啟動kafka了,但在啟動kafka叢集前,需要確定ZooKeeper叢集已經正常啟動。接着,依次在kafka各個節點上執行如下指令即可:
[root@kafkazk1~]# cd /usr/local/kafka
[root@kafkazk1 kafka]# nohup bin/kafka-server-start.sh config/server.properties &
[root@kafkazk1 kafka]# jps
21840 Kafka
15593 Jps
15789 QuorumPeerMain
這裡将kafka放到背景運作,啟動後,會在啟動kafka的目前目錄下生成一個nohup.out檔案,可通過此檔案檢視kafka的啟動和運作狀态。通過jps指令,可以看到有個Kafka辨別,這是kafka程序成功啟動的标志。
三、kafka叢集基本指令操作
kafka提供了多個指令用于檢視、建立、修改、删除topic資訊,也可以通過指令測試如何生産消息、消費消息等,這些指令位于kafka安裝目錄的bin目錄下,這裡是/usr/local/kafka/bin。登入任意一台kafka叢集節點,切換到此目錄下,即可進行指令操作,下面簡單列舉了kafka的一些常用指令的使用方法。
(1)顯示topic清單
指令執行方式如下所示:
[kafka@kafkazk1 bin]$ ./kafka-topics.sh --bootstrap-server 172.16.213.31:9092,172.16.213.41:9092,172.16.213.70:9092 --list
其中,“--bootstrap-server”參數後面跟的是kakfa叢集的主機清單和端口。具體操作執行個體如下圖所示:
(2)建立一個topic,并指定topic屬性(副本數、分區數等)
指令執行方式如下圖所示:
其中:
Ø --create:表示建立一個topic。
Ø --replication-factor:表示這個topic的副本數,這裡設定為1個。
Ø --partitions:指定topic的分區數,一般設定為小于或等于kafka叢集節點數即可。
Ø --topic:指定要建立的topic的名稱。
(3)檢視某個topic的狀态
指令執行方式如下圖所示:
這裡通過”--describe“選項檢視剛剛建立好的testtopic的狀态,其中:
Ø Partition:表示分區ID,通過輸出可以看到,testtopic有三個分區,一個副本,這剛好和我們建立testtopic時指定的配置吻合。
Ø Leader:表示目前負責讀寫的Leader broker。
Ø Replicas:表示目前分區的所有副本對應的broker清單。
Ø Isr:表示處于活動狀态的broker。
(4)生産消息
指令執行方式如下圖所示:
這裡需要注意,”--broker-list“後面跟的内容是kafka叢集的ip和端口,當輸入這條指令後,光标處于可寫狀态,接着就可以寫入一些測試資料,每行一條,這裡輸入的内容如上圖紅框所示。
(5)消費消息
在生産消息的同時,再登入任意一台kafka叢集節點,執行消費消息的指令,結果如下圖所示:
可以看到,在第四步中,輸入的消息在這裡原樣輸出了,這樣就完成了消息的消費。
(6)删除topic
指令執行方式如下圖所示:
注意這裡的“--delete”選項,用來删除一個指定的topic。然後通過“--list”檢視發現這個topic确實已經被删除了。
本文主要講解了kafka的概念、術語以及kafka分布式叢集的建構,最後介紹了kafka的基本操作指令,要數量使用kafka,重要的是了解其内部實作機制以及運作原理。Kafka作為一個消息中間件,在叢集架構環境中經常使用,特别是大資料架構環境中,是以,對于本文介紹的kafka知識點要求熟練掌握。