kafka安裝
下載下傳路徑:kafka下載下傳連結
解壓
解壓
tar -xzf kafka_2.11-0.11.0.0.tgz
并進入目錄
cd kafka_2.11-0.11.0.0
啟動服務
首先啟動zookeeper,之後啟動kafka的服務。您可以使用随kafka一起打包的便捷腳本來擷取一個快速而簡單的單節點ZooKeeper執行個體。
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
建立topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
這個localhost:2181是zookeeper的位址,是以如果我們如果安裝了zookeeper在其他的伺服器,則修改config/zookeeper.properties的配置檔案和config/server.properties的配置資訊
例如zookeeper.properties的配置檔案
![](/image/images/1499257795.485127.png)
server.properties的配置
資訊
檢視目前的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
,我們看到會有一個test
當然我們也可以配置,在沒有該topic的時候去手動建立。
發送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
Kafka附帶一個指令行用戶端,它将從檔案或标準輸入中擷取輸入,并将其作為消息發送到Kafka群集。 預設情況下,每行将作為單獨的消息發送
開啟consumer
這裡我們開啟consumer并且建立消息,把他讀出來
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
會看到如下消息
This is a message
This is another message
如果你已經在其他的位置發送過該topic的消息,我們可以傳回到上邊的步驟,發送一些新的消息,看看這裡能不能顯示
建立多節點
有時候,為了新能,一台奇迹可能不能滿足我們的需求,這個時候我們需要啟動多個服務,這個時序,就需要多啟動幾個kafka的節點,首先,我們需要把broker的配置檔案多複制幾分。
cp config/server.properties config/server-properties
cp config/server.properties config/server-properties
編輯配置檔案
config/server-.properties:
broker.id=
listeners=PLAINTEXT://:
log.dir=/tmp/kafka-logs-
config/server-.properties:
broker.id=
listeners=PLAINTEXT://:
log.dir=/tmp/kafka-logs-
broker.id是broker的唯一辨別符,辨別他在叢集中的位置,編輯端口和日志路徑是因為我們的服務都部署在一台機器上,如果是多台伺服器,完全不用修改端口和檔案路徑,之前我們已經啟動了我們的zookeeper和一個節點的kafka,現在我們把其他的兩個節點也打開。
bin/kafka-server-start.sh config/server-properties &
bin/kafka-server-start.sh config/server-properties &
現在我們建立一個複制因子是3的新的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
好的,現在我們有一個叢集,我們怎麼知道哪個經紀人在做什麼呢?看到運作“describe topics”指令:
> bin/kafka-topics.sh --describe --zookeeper localhost: --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount: ReplicationFactor: Configs:
Topic: my-replicated-topic Partition: Leader: Replicas: ,, Isr: ,,
以下是輸出的說明。第一行給出了所有分區的摘要,每個附加行提供有關一個分區的資訊。因為這個主題隻有一個分區,隻有一行。
- leader”是負責給定分區的所有讀取和寫入的節點。每個節點将成為随機選擇的分區部分的引導者
- “replicas”是複制此分區的日志的節點清單,無論它們是領先者還是現在還存在
- isr”是一組“同步”副本。這是副本清單的子集,該清單目前正在生效并被上司者所追蹤
如果我們殺掉一個程序會有什麼效果呢,我們把node殺掉
> ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564
不過消息仍然可以消費,即使最初采取寫作的leader也是如此:
> bin/kafka-console-consumer.sh --bootstrap-server localhost: --from-beginning --topic my-replicated-topic
...
my test message
my test message
小結
kafka的安裝到這裡就結束了,當然我們開發中發送消息不會用指令行發送,但是卻可以很好的測試出我們的安裝是否是正确的,當然這些在官網都可以看到,有興趣的同學可以直接官網看了。