天天看點

基于NET Core簡單操作Kafka安裝運作kafkaNET簡單操作kafka

文章目錄

  • 安裝
    • 安裝JDK
    • 安裝zookeeper
    • 安裝kafka
  • 運作kafka
    • 建立topic
    • 建立生産者
    • 建立消費者
  • NET簡單操作kafka
    • 服務端
    • 用戶端

安裝

windows伺服器上安裝JDK,kafka以及zookeeper。

安裝JDK

JDK下載下傳路徑

基于NET Core簡單操作Kafka安裝運作kafkaNET簡單操作kafka

安裝完成後需要添加以下的環境變量(右鍵點選“我的電腦” -> “進階系統設定” -> “環境變量” ):

C:\Program Files\Java\jdk-14\bin(你的安裝目錄)

打開cmd運作 “java -version” 檢視目前系統Java的版本,測試是否安裝完成。

基于NET Core簡單操作Kafka安裝運作kafkaNET簡單操作kafka

安裝zookeeper

Kafka的運作依賴于Zookeeper,是以在運作Kafka之前我們需要安裝并運作Zookeeper

下載下傳zookeeper

主要是下載下傳Bin檔案的版本,apache-zookeeper-3.6.0-bin.tar.gz

基于NET Core簡單操作Kafka安裝運作kafkaNET簡單操作kafka

下載下傳完成解壓檔案

打開zookeeper-3.4.13\conf,把zoo_sample.cfg重命名成zoo.cfg

從文本編輯器裡打開zoo.cfg

把dataDir的值改成“./zookeeper-3.4.13/data”

添加環境變量:

C:\Program Files\apache-zookeeper-3.6.0\bin(你的解壓到的目錄)

進入目錄C:\Program Files\apache-zookeeper-3.6.0\bin,文本編輯zkEnv.cmd,添加JAVA_HOME變量後儲存退出。

set JAVA_HOME=C:\Program Files\Java\jdk-14(前面安裝JDK的目錄)

基于NET Core簡單操作Kafka安裝運作kafkaNET簡單操作kafka

打開cmd然後執行 zkserver,挂着不要關閉。

基于NET Core簡單操作Kafka安裝運作kafkaNET簡單操作kafka

安裝kafka

下載下傳kafka

基于NET Core簡單操作Kafka安裝運作kafkaNET簡單操作kafka

解壓檔案,注意解壓的目錄不能含有中文和空格

進入kafka的目錄config

從文本編輯器裡打開 server.properties

把 log.dirs的值改成 “./logs”

打開cmd進入kafka檔案目錄: cd C:\kafka_2.11-2.4.1(kafka目錄)

輸入并執行: .\bin\windows\kafka-server-start.bat .\config\server.properties

挂着不要關閉。

基于NET Core簡單操作Kafka安裝運作kafkaNET簡單操作kafka

運作kafka

建立topic

cmd進入kafka目錄,執行以下代碼建立一個topic:

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
           
基于NET Core簡單操作Kafka安裝運作kafkaNET簡單操作kafka

建立生産者

cmd進入kafka目錄,執行以下代碼建立一個producer

kafka-console-producer.bat --broker-list localhost:9092 --topic test
           

建立消費者

cmd進入kafka目錄,執行以下代碼建立一個consumer

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
           

最後測試如圖所示

基于NET Core簡單操作Kafka安裝運作kafkaNET簡單操作kafka

NET簡單操作kafka

建立兩個項目,一個kafka服務端用于發送消息,一個kafka用戶端用于訂閱消息

基于NET Core簡單操作Kafka安裝運作kafkaNET簡單操作kafka

打開NuGet,搜尋安裝kafka-net-core

服務端

using KafkaNet;
using KafkaNet.Model;
using KafkaNet.Protocol;
using System;

namespace KafkaServer
{
    class Program
    {
        static void Main(string[] args)
        {
            const string topicName = "test";
            var options = new KafkaOptions(new Uri("http://localhost:9092"));

            //建立一個生産者發消息
            using (var producer = new Producer(new BrokerRouter(options)){  BatchSize = 100, BatchDelayTime = TimeSpan.FromMilliseconds(2000) })
            {
                while (true)
                {
                    var message = Console.ReadLine();
                    if (message == "quit") break;

                    if (!string.IsNullOrEmpty(message))
                    {
                        producer.SendMessageAsync(topicName, new[] { new Message(message) });
                    }
                }
            }
        }
    }
}
           

用戶端

using KafkaNet;
using KafkaNet.Common;
using KafkaNet.Model;
using System;
using System.Threading.Tasks;

namespace KafkaClient
{
    class Program
    {
        static void Main(string[] args)
        {
            const string topicName = "test";
            var options = new KafkaOptions(new Uri("http://localhost:9092"));


            Task.Run(() =>
            {
                //建立一個消費者
                var consumer = new Consumer(new ConsumerOptions(topicName, new BrokerRouter(options)));
                foreach (var data in consumer.Consume())
                {
                    Console.WriteLine("Response: PartitionId={0},Offset={1} :Value={2}", data.Meta.PartitionId, data.Meta.Offset, data.Value.ToUtf8String());
                }
            });

            Console.ReadLine();
        }
    }
}
           

運作測試結果:

基于NET Core簡單操作Kafka安裝運作kafkaNET簡單操作kafka