天天看點

消息隊列 Kafka 的基本知識及 .NET Core 用戶端

前言

最新項目中要用到消息隊列來做消息的傳輸,之是以選着 Kafka 是因為要配合其他 java 項目中,是以就對 Kafka 了解了一下,也算是做個筆記吧。

本篇不談論 Kafka 和其他的一些消息隊列的差別,包括性能及其使用方式。

簡介

Kafka 是一個實作了分布式的、具有分區、以及複制的日志的一個服務。它通過一套獨特的設計提供了消息系統中間件的功能。它是一種釋出訂閱功能的消息系統。

一些名詞

如果要使用 Kafka ,那麼在 Kafka 中有一些名詞需要知道,文本不讨論這些名詞是否在其他消息隊列中具有相同的含義。所有名詞均是針對于 Kafka。

Message

消息,就是要發送的内容,一般包裝成一個消息對象。

Topic

通俗來講的話,就是放置“消息”的地方,也就是說消息投遞的一個容器。假如把消息看作是信封的話,那麼 Topic 就是一個郵筒,如下圖所示:

消息隊列 Kafka 的基本知識及 .NET Core 用戶端

Partition && Log

Partition 分區,可以了解為一個邏輯上的分區,像是我們電腦的磁盤 C:, D:, E: 盤一樣,

Kafka 為每個分區維護着一份日志Log檔案。

每個分區是一個有序的,不可修改的,消息組成的隊列。 當消息過來的時候,會被追加到日志檔案中,這個追加是根據 commit 指令來執行的。

分區中的每一條消息都有一個編号,叫做 offset id,這個 id 在目前分區中是唯一的,并且是遞增的。

日志,就是用來記錄分區中接收到的消息,因為每一個 Topic 可以同時向一個或者多個分區投遞消息,是以實際在存儲日志的時候,每個分區會對應一個日志目錄,其命名規則一般為 

<topic_name>-<partition_id>

, 目錄中就是一個分區的一份 commit log 日志檔案。

消息隊列 Kafka 的基本知識及 .NET Core 用戶端
Kafka 叢集會儲存一個時間段内所有被釋出出來的資訊,無論這個消息是否已經被消費過,這個時間段是可以配置的。比如日志儲存時間段被設定為2天,那麼2天以内釋出的消息都是可以消費的;而之前的消息為了釋放空間将會抛棄掉。Kafka的性能與資料量不相幹,是以儲存大量的消息資料不會造成性能問題。
對日志進行分區主要是為了以下幾個目的:第一、這可以讓log的伸縮能力超過單台伺服器上線,每個獨立的partition的大小受限于單台伺服器的容積,但是一個topic可以有很多partition進而使得它有能力處理任意大小的資料。第二、在并行處理方面這可以作為一個獨立的單元。

生産者 Producers

和其他消息隊列一樣,生産者通常都是消息的産生方。

在 Kafka 中它決定消息發送到指定Topic的哪個分區上。

消息隊列 Kafka 的基本知識及 .NET Core 用戶端

消費者 Consumers

消費者就是消息的使用者,在消費者端也有幾個名詞需要區分一下。

一般消息隊列有兩種模式的消費方式,分别是 隊列模式 和 訂閱模式。

隊列模式:一對一,就是一個消息隻能被一個消費者消費,不能重複消費。一般情況隊列支援存在多個消費者,但是對于一個消息,隻會有一個消費者可以消費它。

訂閱模式:一對多,一個消息可能被多次消費,消息生産者将消息釋出到Topic中,隻要是訂閱改Topic的消費者都可以消費。

Consumer && Subscriber

Group: 組,是一個消費者的集合,每一組都有一個或者多個消費者,Kafka 中在一個組内,消息隻能被消費一次。

在釋出訂閱模式中,消費者是以組的方式進行訂閱的,就是Consumer Group,他們的關系如下圖:

消息隊列 Kafka 的基本知識及 .NET Core 用戶端

每個釋出到Topic上的消息都會被投遞到每個訂閱了此Topic的消費者組中的某一個消費者,也就是每個組都會被投遞,但是每個組都隻會有一個消費者消費這個消息。

開頭介紹了Kafka 是 釋出-訂閱 功能的消息隊列,是以在Kafka中,隊列模式是通過單個消費者組實作的,也就是整個結構中隻有一個消費者組,消費者之間負載均衡。

Kafka 叢集

Borker: Kafka 叢集有多個伺服器組成,每個伺服器稱做一個 Broker。同一個Topic的消息按照一定的key和算法被分區存儲在不同的Broker上。

消息隊列 Kafka 的基本知識及 .NET Core 用戶端

上圖引用自:http://blog.csdn.net/lizhitao

因為 Kafka 的叢集它是通過将分區散布到各個Server的實作的,也就是說叢集中每個伺服器他們都是彼此共享分區的資料和請求,每個分區的日志檔案被複制成指定分數,分散在各個叢集機器,這樣來實作的故障轉移。

對于每一個分區都會有一個伺服器作為它的 "leader" 并且有零個或者多個伺服器作為"followers" 。leader 伺服器負責處理關于這個 partition 所有的讀寫請求, followers 伺服器則被動的複制 leader 伺服器。如果有 leader 伺服器失效,那麼 followers 伺服器将有一台被自動選舉成為新的 leader 。每個伺服器作為某些 partition 的 leader 的同時也作為其它伺服器的 follower ,進而實作了叢集的負載均衡。

.NET Core Kafka 用戶端

在 .NET Core 中,有相對應的開源 kafka sdk 項目,就是 Rdkafka。它同時支援 .NET 4.5,并且支援跨平台,可以運作于Linux,macOS 和 Windows。

RdKafka Github :https://github.com/ah-/rdkafka-dotnet

生産者 API

// Producer 接受一個或多個 BrokerList
using (Producer producer = new Producer("127.0.0.1:9092"))
//發送到一個名為 testtopic 的Topic,如果沒有就會建立一個
using (Topic topic = producer.Topic("testtopic")) {
    //将message轉為一個 byte[]
    byte[] data = Encoding.UTF8.GetBytes("Hello RdKafka");
    DeliveryReport deliveryReport = await topic.Produce(data);
    
    Console.WriteLine($"發送到分區:{deliveryReport.Partition}, Offset 為: {deliveryReport.Offset}");
}      

消費者 API

//配置消費者組
var config = new Config() { GroupId = "example-csharp-consumer" };
using (var consumer = new EventConsumer(config, "127.0.0.1:9092")) {

    //注冊一個事件
    consumer.OnMessage += (obj, msg) =>
    {
        string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
        Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
    };

    //訂閱一個或者多個Topic
    consumer.Subscribe(new[] { "testtopic" });
    
    //啟動
    consumer.Start();

    Console.WriteLine("Started consumer, press enter to stop consuming");
    Console.ReadLine();      

繼續閱讀