天天看點

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

前言:基于Windows系統下的Kafka環境搭建;以及使用.NET 6環境進行開發簡單的生産者與消費者的示範。

一、環境部署

Kafka是使用Java語言和Scala語言開發的,是以需要有對應的Java環境,以及Scala語言環境。

Java環境配置,如果不清楚的,可以檢視鄙人的另一篇部落格:

https://www.cnblogs.com/weskynet/p/14852471.html

1、Scala環境安裝,需要先下載下傳Scala語言包,下載下傳位址:

https://www.scala-lang.org/download/scala2.html

要選擇Binaries版本的環境,否則需要自己編譯:

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

2、Kafka基于Zookeeper環境運作,zookeeper提供給kafka一系列的功能支援,是以還需要安裝Zookeeper有關的環境。下載下傳zookeeper位址:

https://zookeeper.apache.org/releases.html#download

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
3、同樣,Zookeeper也需要下載下傳帶bin 的連結,沒有帶bin的連結,可能是源碼,需要自己編譯: 
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

4、接下來是下載下傳主角,Kafka了。下載下傳位址:

https://kafka.apache.org/downloads.html

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
5、同樣需要選擇下載下傳binary版本,然後根據scala的版本選擇對應的版本。
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
6、下載下傳的三個安裝包,如圖所示:
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
7、先安裝Scala語言包環境:
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

8、驗證Scala語言包是否安裝成功:

控制台視窗,輸入:scala -version

如果提示類似如下有關版本資訊,則代表安裝成功。

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

9、然後是安裝zookeeper環境。必須先啟動zookeeper,才可以使用kafka。

安裝zookeeper環境,先解壓下載下傳的包,然後在解壓後的目錄下新增data檔案夾

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
10、然後複制data檔案夾的絕對路徑,備用。在conf檔案夾下,編輯cfg檔案
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
11、在cfg檔案内,修改dataDir指定為上面建立的data檔案夾的絕對路徑。注意路徑是斜杠/,如果要使用 \ 反斜杆,需要寫雙反斜杠 \\
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
12、也要更改cfg格式的檔案名稱為 zoo.cfg 否則zookeeper無法識别配置檔案。Zoo.cfg檔案是zookeeper啟動時候自動關聯的預設配置檔案名稱。
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
13、然後建立環境變量 ZOOKEEPER_HOME:
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
14、環境變量path新增:%ZOOKEEPER_HOME%\bin
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
15、啟動zookeeper,直接任意打開控制台,輸入 zkServer
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
16、如果都沒有報錯,一般是啟動成功了的。再次驗證下,可以任意開個控制台,輸入JPS進行檢視,如下圖所示,有JPS、也有QuorumPeerMain,代表zookeeper啟動成功了。
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
17、Kafka環境安裝。先解壓,然後在解壓後的目錄下,新增logs檔案夾
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
18、然後在Config檔案夾下,修改 server.properties 檔案,修改 log.dirs 的值為 新增的logs檔案夾的絕對路徑
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
19、進入到解壓後的kafka目錄下,在路徑欄輸入cmd,快速打開目前檔案夾下的控制台視窗:
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

20、輸入指令:

.\bin\windows\kafka-server-start.bat .\config\server.properties

進行啟動Kafka服務:

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
21、啟動Kafka報錯了,可能是版本問題,kafka一般新版本對windows環境不友好,是以降級一下。此處我把kafka3.0降級為2.8:
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
22、此處我下載下傳的版本為 2.13-2.8.1,各位大佬們可以按照自己意願選擇版本。可能2.x版本和3.x版本跨度比較大,是以3.0版本沒法玩。
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
23、然後是重複以上配置kafka有關的動作,修改有關配置檔案以及新增logs檔案夾等。此處省略。
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
24、接着在低版本的kafka目錄下,快速進入目前解壓縮的目錄下,再次輸入有關指令嘗試一下:
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
25、沒有提示錯誤,根據提示資訊,代表是啟動成功了。任意打開控制台,再輸入JPS檢視下,可以看到Kafka,确認是啟動OK了。
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

26、然後是要一款Kafka可視化工具,此處我選擇使用offset explorer  (原來是叫kafka tools,如下載下傳位址所示),下載下傳位址:

https://www.kafkatool.com/download.html

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
27、安裝可視化工具,預設可以一直下一步:
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
28、可以在安裝目錄下把可執行程式發送到桌面快捷方式,友善打開。
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
29、一些配置,包括名稱、kafka版本、端口号、服務位址等
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
30、連接配接以後的效果圖,如下。Topic是空的,接下來寫點代碼。
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

 二、代碼開發與測試

31、建立類庫項目,當作kafka服務類庫

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
32、此處選擇标準庫2.1,用于可以給多種.net core版本使用,友善相容。
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
33、引用 Confluent.Kafka 包。
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
34、此處新增釋出服務類和訂閱服務類:
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
35、新增的生産者釋出服務方法代碼如下:
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
代碼:
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

/// <summary>
    /// Description: Kafka生産者釋出服務
    /// CreateTime: 2022/1/21 19:35:27
    /// Author: Wesky
    /// </summary>
    public class PublishService: IPublishService
    {
        public async Task PublishAsync<TMessage>(string broker, string topicName, TMessage message) where TMessage : class
        {
            var config = new ProducerConfig
            {
                BootstrapServers = broker, // kafka服務叢集,例如  "192.168.0.1:9092,192.168.0.2:9092"   或者單機 "192.168.0.1:9092"
                Acks = Acks.All,
                MessageSendMaxRetries = 3, // 發送失敗重試的次數
            };
            using (var producer = new ProducerBuilder<string, string>(config).Build())
            {
                try
                {
                    string data = Newtonsoft.Json.JsonConvert.SerializeObject(message);
                    var sendData = new Message<string, string> { Key = Guid.NewGuid().ToString("N"), Value = data};
                    var report =  await producer.ProduceAsync(topicName, sendData);
                    Console.WriteLine($"消息 >>>>>: {data} \r\n發送到:{report.TopicPartitionOffset}");
                }
                catch (ProduceException<string, string> ex)
                {
                    Console.WriteLine($"消息發送失敗>>>>>:\r\n Code= {ex.Error.Code} >>> \r\nError= {ex.Message}");
                }
            }
        }
    }      

View Code

36、新增的消費者接收服務方法代碼如下:

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
/// <summary>
    /// Description: kafka 消費者訂閱服務
    /// CreateTime: 2022/1/21 19:36:25
    /// Author: Wesky
    /// </summary>
    public class SubscribeService: ISubscribeService
    {
     
        /// <summary>
        /// 消費者服務核心代碼
        /// </summary>
        /// <typeparam name="TMessage"></typeparam>
        /// <param name="config">消費者配置資訊</param>
        /// <param name="topics">主題集合</param>
        /// <param name="func"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task SubscribeAsync<TMessage>(ConsumerConfig config, IEnumerable<string> topics,  Action<TMessage> func, CancellationToken cancellationToken) where TMessage : class
        {
            const int commitPeriod = 1;
            using (var consumer = new ConsumerBuilder<Ignore, string>(config)
             .SetErrorHandler((_, e) =>
             {
                 Console.WriteLine($"消費錯誤 >>>>>: {e.Reason}");
             })
             .SetStatisticsHandler((_, json) =>
             {
                 Console.WriteLine($"************************************************");
             })
             .SetPartitionsAssignedHandler((c, partitionList) =>
             {
                 string partitions = string.Join(", ", partitionList);
                 Console.WriteLine($"配置設定的分區 >>>>> : {partitions}");
             })
             .SetPartitionsRevokedHandler((c, partitionList) =>
             {
                 string partitions = string.Join(", ", partitionList);
                 Console.WriteLine($"回收的分區 >>>>> : {partitions}");
             })
             .Build())
            {
                consumer.Subscribe(topics);
                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cancellationToken);
                            if (consumeResult.IsPartitionEOF)
                            {
                                continue;
                            }
                            if(consumeResult?.Offset % commitPeriod == 0){
                                try
                                {
                                    var result = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message?.Value);
                                    func(result); // 消費消息
                                }
                                catch (Exception ex)
                                {
                                    Console.WriteLine($"消費業務處理失敗: {ex.Message}");
                                }
                                try
                                {
                                    consumer.Commit(consumeResult);  // 手動送出

                                    Console.WriteLine($"消費者消費完成,已送出 ");
                                }
                                catch (KafkaException e)
                                {
                                    Console.WriteLine($"送出錯誤 >>>>> : {e.Error.Reason}");
                                }
                            }
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"消費錯誤>>>>> : {e.Error.Reason}");
                        }
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine($"其他錯誤 >>>>> :{e.Message}");
                    consumer.Close();
                }
            }
            await Task.CompletedTask;
        }
    }      

37、并且提供對應的接口服務,用于開放給外部調用,或者提供依賴注入使用:

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

38、建立一個控制台項目,用來當作消費者端的測試,并且新增一個方法,用來當作消費者接收到消息以後的業務處理方法體。此處控制台環境版本為.NET 6

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

39、消費用戶端代碼如下。其中,BootstrapServers也可以提供叢集位址,例如 ip1:port,ip2:port…… 服務之間以半形逗號隔開。

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

40、再新增一個webapi項目,用來當作生産者的用戶端進行發送資料。以及對kafka服務類部分進行依賴注入注冊,此處使用單例。該webapi此處使用.NET 6環境,帶有控制器的模式。

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

41、新增的控制器裡面,進行生産者的注入與實作。注意:topicName參數對應上邊的topic-wesky,通過主題綁定,否則消費者不認識就沒辦法消費到了。

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

控制器代碼:

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範
[Route("api/[controller]/[action]")]
    [ApiController]
    public class ProducerController : ControllerBase
    {
        IPublishService _service = null;
        public ProducerController(IPublishService publishService)
        {
            _service = publishService;
        }
        [HttpPost]
        public IActionResult SendMessage(string broker,string topicName,string message)
        {
            _service.PublishAsync(broker, topicName, message);
            return Ok();
        }
    }      

42、接下來是一些測試,如圖所示:

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

43、最後,使用可視化管理工具Offset進行檢視,可以看到對應的主題。選中主題,可以設定資料類型,這裡我設定為字元串,就可以檢視到對應的消息内容了。如果沒有設定,預設是16進制的資料。

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

44、檢視剛剛測試時候收發的消息隊列裡面的資料,如下所示:

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範

45、一些額外補充:

Kafka也是消息隊列的一種,用于在高吞吐量場景下使用比較适合。如果是輕量級的,隻需要用于削峰,可以使用RabbitMQ。

以上隻是簡單的操作示範,至于要用得溜,觀衆朋友們可以自行補充所需的相關理論知識。

可視化工具還有一款yahoo提供的開源的工具,叫kafka-manager,有興趣的大佬們可以自行玩玩,開源位址:

https://github.com/yahoo/CMAK

還有一款滴滴平台做的開源的kafka運維管理平台,有興趣的大佬們也可以自行了解,位址:

https://github.com/didi/LogiKM

以上就是該部落格的全部内容,感謝各位大佬們的觀看~

歡迎加入QQ群:

群号:1079830632

【Kafka】基于Windows環境的Kafka有關環境(scala+zookeeper+kafka+可視化工具)搭建、以及使用.NET環境開發的案例代碼與示範