文章目錄
- 安裝
-
- 安裝JDK
- 安裝zookeeper
- 安裝kafka
- 運作kafka
-
- 建立topic
- 建立生産者
- 建立消費者
- NET簡單操作kafka
-
- 服務端
- 用戶端
安裝
windows伺服器上安裝JDK,kafka以及zookeeper。
安裝JDK
JDK下載下傳路徑
安裝完成後需要添加以下的環境變量(右鍵點選“我的電腦” -> “進階系統設定” -> “環境變量” ):
C:\Program Files\Java\jdk-14\bin(你的安裝目錄)
打開cmd運作 “java -version” 檢視目前系統Java的版本,測試是否安裝完成。
安裝zookeeper
Kafka的運作依賴于Zookeeper,是以在運作Kafka之前我們需要安裝并運作Zookeeper
下載下傳zookeeper
主要是下載下傳Bin檔案的版本,apache-zookeeper-3.6.0-bin.tar.gz
下載下傳完成解壓檔案
打開zookeeper-3.4.13\conf,把zoo_sample.cfg重命名成zoo.cfg
從文本編輯器裡打開zoo.cfg
把dataDir的值改成“./zookeeper-3.4.13/data”
添加環境變量:
C:\Program Files\apache-zookeeper-3.6.0\bin(你的解壓到的目錄)
進入目錄C:\Program Files\apache-zookeeper-3.6.0\bin,文本編輯zkEnv.cmd,添加JAVA_HOME變量後儲存退出。
set JAVA_HOME=C:\Program Files\Java\jdk-14(前面安裝JDK的目錄)
打開cmd然後執行 zkserver,挂着不要關閉。
安裝kafka
下載下傳kafka
解壓檔案,注意解壓的目錄不能含有中文和空格
進入kafka的目錄config
從文本編輯器裡打開 server.properties
把 log.dirs的值改成 “./logs”
打開cmd進入kafka檔案目錄: cd C:\kafka_2.11-2.4.1(kafka目錄)
輸入并執行: .\bin\windows\kafka-server-start.bat .\config\server.properties
挂着不要關閉。
運作kafka
建立topic
cmd進入kafka目錄,執行以下代碼建立一個topic:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
建立生産者
cmd進入kafka目錄,執行以下代碼建立一個producer
kafka-console-producer.bat --broker-list localhost:9092 --topic test
建立消費者
cmd進入kafka目錄,執行以下代碼建立一個consumer
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
最後測試如圖所示
NET簡單操作kafka
建立兩個項目,一個kafka服務端用于發送消息,一個kafka用戶端用于訂閱消息
打開NuGet,搜尋安裝kafka-net-core
服務端
using KafkaNet;
using KafkaNet.Model;
using KafkaNet.Protocol;
using System;
namespace KafkaServer
{
class Program
{
static void Main(string[] args)
{
const string topicName = "test";
var options = new KafkaOptions(new Uri("http://localhost:9092"));
//建立一個生産者發消息
using (var producer = new Producer(new BrokerRouter(options)){ BatchSize = 100, BatchDelayTime = TimeSpan.FromMilliseconds(2000) })
{
while (true)
{
var message = Console.ReadLine();
if (message == "quit") break;
if (!string.IsNullOrEmpty(message))
{
producer.SendMessageAsync(topicName, new[] { new Message(message) });
}
}
}
}
}
}
用戶端
using KafkaNet;
using KafkaNet.Common;
using KafkaNet.Model;
using System;
using System.Threading.Tasks;
namespace KafkaClient
{
class Program
{
static void Main(string[] args)
{
const string topicName = "test";
var options = new KafkaOptions(new Uri("http://localhost:9092"));
Task.Run(() =>
{
//建立一個消費者
var consumer = new Consumer(new ConsumerOptions(topicName, new BrokerRouter(options)));
foreach (var data in consumer.Consume())
{
Console.WriteLine("Response: PartitionId={0},Offset={1} :Value={2}", data.Meta.PartitionId, data.Meta.Offset, data.Value.ToUtf8String());
}
});
Console.ReadLine();
}
}
}
運作測試結果: