天天看點

kafka&zookeeper

一、搭建Zookeeper叢集

Zookeeper是一個分布式開源架構,提供了協調分布式應用的基本服務,它向外部應用暴露一組通用服務——分布式同步(Distributed Synchronization)、命名服務(Naming Service)、叢集維護(Group Maintenance)等,簡化分布式應用協調及其管理的難度,提供高性能的分布式服務。ZooKeeper本身可以以單機模式安裝運作,不過它的長處在于通過分布式ZooKeeper叢集(一個Leader,多個Follower),基于一定的政策來保證ZooKeeper叢集的穩定性和可用性,進而實作分布式應用的可靠性。

1.在zookeeper.apache.org上下載下傳zookeeper-3.4.8.tar.gz

2.解壓 tar -xzvf zookeeper-3.4.8.tar.gz

3.修改權限 sudo chown -R cms(ubuntu使用者名) zookeeper-3.4.8

4.修改配置檔案 /etc/profile,增加

kafka&zookeeper

5.對Zookeeper的配置檔案的參數進行設定

進入zookeeper-3.4.5/conf

1)cp zoo_sample.cfg zoo.cfg

2)在zookeeper下建立一個存放資料的目錄

mkdir zookerperdata

3)vim zoo.cfg

kafka&zookeeper

4)注意上圖的配置中master,slave1分别為主機名

在上面的配置檔案中"server.id=host:port:port"中的第一個port是從機器(follower)連接配接到主機器(leader)的端口号,第二個port是進行leadership選舉的端口号。

5)建立myid

接下來在dataDir所指定的目錄下(zookeeper-3.4.8/zookerperdata/)建立一個檔案名為myid的檔案,檔案中的内容隻有一行,為本主機對應的id值,也就是上圖中server.id中的id。例如:在伺服器1中的myid的内容應該寫入1。

vim myid

6)遠端複制到slave1,slave2相同的目錄下

scp -r zookeeper-3.4.8 cms@slave1:/home/cms/

7)修改slave1,slave2機器上的myid的值分别為2和3

啟動ZooKeeper叢集

在ZooKeeper叢集的每個結點上,執行啟動ZooKeeper服務的腳本,如下所示:

kafka&zookeeper
kafka&zookeeper
kafka&zookeeper

其中,QuorumPeerMain是zookeeper程序,啟動正常。

如上依次啟動了所有機器上的Zookeeper之後可以通過ZooKeeper的腳本來檢視啟動狀态,包括叢集中各個結點的角色(或是Leader,或是Follower),如下所示,是在ZooKeeper叢集中的每個結點上查詢的結果

二、搭建kafka叢集

kafka&zookeeper

1.下載下傳

下載下傳官網:http://kafka.apache.org/downloads

下載下傳版本:與自己安裝的Scala版本對應的版本,個人習慣是下載下傳最新版本的前一版

kafka_2.11-0.10.0.1.tgz

2.安裝

tar -xzf kafka_2.11-0.10.0.1.tgz

cp kafka_2.11-0.10.0.1.tgz /home/cms/kafka

3.配置環境變量

即path、classpath,意義不大,可不配置

4.修改配置檔案kafka/config/server.properties

kafka&zookeeper

5.在kafka的目錄下,建立kafka存儲資料的目錄

mkdir kafkalogs

6.其他節點配置

将kafka檔案複制到其他節點

kafka&zookeeper

broker.id=1 #整個叢集内唯一id号,整數,一般從0開始

listeners=PLAINTEXT://192.168.31.132:9092 #協定、目前broker機器ip、端口

port=9092 #broker端口

host.name=192.168.31.132 #broker 機器ip

7.每個節點下啟動zookerper

8.啟動kafka程序,在每個節點的kafka/bin目錄下

--zookeeper : zookeeper叢集清單,用英文逗号分隔。可以不用指定zookeeper整個叢集内的節點清單,隻指定某個或某幾個zookeeper節點清單也是你可以的

replication-factor : 複制數目,提供failover機制;1代表隻在一個broker上有資料記錄,一般值都大于1,代表一份資料會自動同步到其他的多個broker,防止某個broker當機後資料丢失。

partitions : 一個topic可以被切分成多個partitions,一個消費者可以消費多個partitions,但一個partitions隻能被一個消費者消費,是以增加partitions可以增加消費者的吞吐量。kafka隻保證一個partitions内的消息是有序的,多個一個partitions之間的資料是無序的。

9.啟動生産者和消費者

生産者:kafka-console-producer.sh --broker-list 192.168.31.131:9092 --topic test5

--broker-list : 值可以為broker叢集中的一個或多個節點

消費者:

kafka-console-consumer.sh --zookeeper 192.168.31.131:2181,192.168.31.132:2181,192.168.31.133:2181 --topic test5 --from-beginning

--zookeeper : 值可以為zookeeper叢集中的一個或多個節點

--from-beginning 表示從開始第一個消息開始接收

10.檢視topic

kafka-topics.sh --list --zookeeper 192.168.31.131:2181,192.168.31.132:2181,192.168.31.133:2181

11.檢視topic詳情

kafka-topics.sh --describe --zookeeper 192.168.31.131:2181,192.168.31.132:2181,192.168.31.133:2181 --topic test5

kafka&zookeeper

狀态說明:test有三個分區分别為1、2、3,分區0的leader是3(broker.id),分區0有三個副本,并且狀态都為lsr(ln-sync,表示可以參加選舉成為leader)。

12.建立分區為3、備份為3的topic

bin/kafka-topics.sh --create --zookeeper 192.168.31.131:2181,192.168.31.132:2182,192.168.31.133:2183 --replication-factor 3 --partitions 3 --topic test5

13.删除topic

在config/server.properties中加入delete.topic.enable=true并重新開機服務,在執行如下指令

kafka-topics.sh --delete --zookeeper 192.168.31.131:2181,192.168.31.132:2181,192.168.31.133:2181 --topic test5