天天看點

Pulsar與Kafka消費模型對比

kafka 屬于 Stream 的消費模型,為了支援多 partition 的消費關系,引入了 consumer group 的概念,同時支援在消費端動态的 reblance 操作,當多個 Consumer 訂閱了同一個 Topic 時,會根據分區政策進行消費者訂閱分區的重配置設定。隻要 consumer-group 與 topic 之間的關系發生變更,就會動态觸發 reblance 操作,諸如:

增加或減少 topic 中 partition 的數目

consumer-group 中的 consumer 數減少

consumer-group 與 topic 之間的訂閱關系發生變更

等等

引入 reblance 的好處在于,當訂閱關系發生變更時,使用者無需重新啟動系統,就可以實作訂閱關系的變更,相當于 kafka 将這種配置設定的權利從服務端下放到用戶端中的 consumer 來管理,這樣使用者就可以自定義自己的配置設定方案。

類似 kafka 這樣的 Stream MQ,更多時候适合做離線業務的處理與分析,很多線上業務會使用 Active MQ 這樣 Queue 的 MQ。為了同時相容這兩種消費模型,pulsar 做了一層消費層的抽象,統一了 Queue 和 Stream 這兩種消費模型,具體如下圖所示:

其中,Exclusive 和 Failover 屬于 Stream 的消費模型,Share 屬于 Queue 的消費模型。在寫此文章時,pulsar 最新版本為 2.3.1,Key_Shared 屬于pulsar 新增加的一種訂閱模型,在之後的文章中,我們會單獨對 Key_shared 訂閱模型做單獨的分享,這裡不在贅述。

由于 kafka 不支援 Queue 類型的消費模型,是以 Share 這種形式在這裡不做對比。下面,和大家一起讨論以下在 Stream 下 pulsar 與 kafka 的消費模型。

如下圖所示,左邊為 pulsar 在 Failover 和 Exclusive 下的消費情況,右邊為 kafka 的消費模型。

假設目前有一個 topic,topic name 為 topic1,有 5 個partition,分别為:topic1-p1,topic1-p2,topic1-p3,topic1-p4,topic1-p5,在 kafka 中,使用了 consumer-group 且該 group 下有三個 consumer,上文中提到,kafka 支援 reblance 機制,是以當 consumer-2 與 consumer-3 加入 consumer-group 的過程中,會動态分攤之前 consumer-1 的消費壓力,表現為如上圖右半部分所示,cousumer-1 消費 topic1-p1 和 ropic1-p2,consumer-2 消費 topic1-p3 和 topic1-p4,consumer-3 消費 topic1-p5 。是以當使用者不斷的往 consumer-group 中添加 consumer 時,利用 kafka 的 reblance 機制,是可以讓使用者動态指定具體哪一個 consumer 來消費 topic1 中的哪些 partition。

在 pulsar 中,你可以将 subscribe 了解為 kafka 中的 consumer-group,如果使用者在啟動 consumer 時,指定的 subscribe-name 是相同的,說明這兩個 consumer 屬于同一個訂閱組,代碼示例如下:

<code></code>

如上圖示例所示,在同一個訂閱組下,啟動三個 consumer,在 pulsar 中,每一個 consumer 都會去訂閱 topic1 中的 5 個 partition,是以每個 consumer 都會去啟動 5 個 sub-consumer,在 failover 的訂閱模型下,會使用 hashcode 的形式,将 5 個 partition 配置設定給三個 consumer 來消費,pulsar 将目前正在消費的 sub-consumer 看作是處于 leader 狀态的 consumer,剩餘未工作的 sub-consumer 作為從節點,當 leader 狀态的 consumer 由于某些原因無法工作時,處于從狀态的 sub-consumer 會去接替 leader 的 consumer,并繼續工作。可以發現,kafka 加入 reblance 的機制,允許使用者自己指定哪些 consumer 來消費 哪些 partition,在 pulsar 中,這個工作由 failover 的機制來完成,它通過 hash 的形式,将 consumer 配置設定到不同的 sub-consumer 中來執行。

現在,驗證一下上述所描述的内容。

1. 以 standalone 的形式啟 pulsar

2. 建立一個 topic,partition 的數目為 4

以 failover 的訂閱類型,啟動 3 個 consumer,并指定他們為同一個訂閱組,即-s sub-1

3. 啟動 producer,發送 10 條資料到 <code>mytopic1</code>

結果如下所示:

可以看到,consumer1 接收到 2 條消息,consumer2 接收到 5 條消息,consumer3 接收到 3 條消息。效果和我們所預期的是一緻的。

上述情況是因為在 producer 發送之前,就已經啟動好三個 consumer 來消費消息,是以 pulsar 會以 hash 的形式将消息分發到三個 consumer 中來消費。

以 <code>Exclusive</code> 的訂閱形式啟動兩個 consumer,效果如下:

可以看到,當啟動 consumer2 時,會報錯 <code>Exclusive consumer is already connected</code>,這是因為,<code>Failover</code> 的訂閱模式下,其它的 consumer 會以 “從” consumer 的形态存在,但是 <code>Exclusive</code> 隻允許一個 consumer 訂閱一個 topic。

繼續閱讀