天天看點

安裝和測試Kafka下載下傳啟動和停止單 broker 測試多 broker 測試将日志推送到 kafka

本文主要介紹如何在單節點上安裝 kafka 并測試 broker、producer 和 consumer 功能。

解壓并進入目錄:

檢視目錄結構:

運作 kafka ,需要依賴 zookeeper,你可以使用已有的 zookeeper 叢集或者利用 kafka 提供的腳本啟動一個 zookeeper 執行個體:

預設的,zookeeper 會監聽在 <code>*:2181/tcp</code>。

停止剛才啟動的 zookeeper 執行個體:

啟動kafka server:

config/server.properties 中有一些預設的配置參數,這裡僅僅列出參數,不做解釋:

如果你像我一樣是在虛拟機中測試 kafka,那麼你需要修改 kafka 啟動參數中 jvm 記憶體大小。檢視 kafka-server-start.sh 腳本,修改 <code>kafka_heap_opts</code> 處 <code>-xmx</code> 和<code>-xms</code> 的值。

啟動成功之後,會看到如下日志:

從日志可以看到:

log flusher 有一個預設的周期值

kafka server 監聽在9092端口

在 cdh1:9092 上注冊了一個 broker 0 ,路徑為 /brokers/ids/0

停止 kafka server :

在啟動 kafka-server 之後啟動,運作producer:

在另一個終端運作 consumer:

在 producer 端輸入字元串并回車,檢視 consumer 端是否顯示。

建立第一個 broker:

編寫 config/server1.properties 并修改下面配置:

建立第二個 broker:

編寫 config/server2.properties 并修改下面配置:

建立第三個 broker:

編寫 config/server3.properties 并修改下面配置:

接下來分别啟動這三個 broker:

下面是三個 broker 監聽的網絡接口和端口清單:

在 kafka 0.8 中有兩種方式建立一個新的 topic:

在 broker 上開啟 <code>auto.create.topics.enable</code> 參數,當 broker 接收到一個新的 topic 上的消息時候,會通過 <code>num.partitions</code> 和 <code>default.replication.factor</code>兩個參數自動建立 topic。

使用 <code>bin/kafka-topics.sh</code> 指令

建立一個名稱為 <code>zerg.hydra</code> 的 topic:

使用下面檢視建立的 topic:

還可以檢視更詳細的資訊:

預設的,kafka 持久化 topic 到 <code>log.dir</code> 參數定義的目錄。

以 <code>sync</code> 模式啟動一個 producer:

然後,輸入以下内容:

在另一個終端運作:

注意,生産環境通常不會添加 <code>--from-beginning</code> 參數。

觀察輸出,你會看到下面内容:

把 consumer 停掉再啟動,你還會看到相同的輸出結果。

例如,将 apache 或者 nginx 或者 tomcat 等産生的日志 push 到 kafka,隻需要執行下面代碼即可: