本文主要介紹如何在單節點上安裝 kafka 并測試 broker、producer 和 consumer 功能。
解壓并進入目錄:
檢視目錄結構:
運作 kafka ,需要依賴 zookeeper,你可以使用已有的 zookeeper 叢集或者利用 kafka 提供的腳本啟動一個 zookeeper 執行個體:
預設的,zookeeper 會監聽在 <code>*:2181/tcp</code>。
停止剛才啟動的 zookeeper 執行個體:
啟動kafka server:
config/server.properties 中有一些預設的配置參數,這裡僅僅列出參數,不做解釋:
如果你像我一樣是在虛拟機中測試 kafka,那麼你需要修改 kafka 啟動參數中 jvm 記憶體大小。檢視 kafka-server-start.sh 腳本,修改 <code>kafka_heap_opts</code> 處 <code>-xmx</code> 和<code>-xms</code> 的值。
啟動成功之後,會看到如下日志:
從日志可以看到:
log flusher 有一個預設的周期值
kafka server 監聽在9092端口
在 cdh1:9092 上注冊了一個 broker 0 ,路徑為 /brokers/ids/0
停止 kafka server :
在啟動 kafka-server 之後啟動,運作producer:
在另一個終端運作 consumer:
在 producer 端輸入字元串并回車,檢視 consumer 端是否顯示。
建立第一個 broker:
編寫 config/server1.properties 并修改下面配置:
建立第二個 broker:
編寫 config/server2.properties 并修改下面配置:
建立第三個 broker:
編寫 config/server3.properties 并修改下面配置:
接下來分别啟動這三個 broker:
下面是三個 broker 監聽的網絡接口和端口清單:
在 kafka 0.8 中有兩種方式建立一個新的 topic:
在 broker 上開啟 <code>auto.create.topics.enable</code> 參數,當 broker 接收到一個新的 topic 上的消息時候,會通過 <code>num.partitions</code> 和 <code>default.replication.factor</code>兩個參數自動建立 topic。
使用 <code>bin/kafka-topics.sh</code> 指令
建立一個名稱為 <code>zerg.hydra</code> 的 topic:
使用下面檢視建立的 topic:
還可以檢視更詳細的資訊:
預設的,kafka 持久化 topic 到 <code>log.dir</code> 參數定義的目錄。
以 <code>sync</code> 模式啟動一個 producer:
然後,輸入以下内容:
在另一個終端運作:
注意,生産環境通常不會添加 <code>--from-beginning</code> 參數。
觀察輸出,你會看到下面内容:
把 consumer 停掉再啟動,你還會看到相同的輸出結果。
例如,将 apache 或者 nginx 或者 tomcat 等産生的日志 push 到 kafka,隻需要執行下面代碼即可: