天天看點

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別

文章目錄

  • 前言
  • 什麼是Pulsar?
  • 1 Pulsar特性
  • 2 Pulsar消息隊列
    • 2.1 生産者
      • 2.1.1 發送模式
      • 2.1.2 通路模式
      • 2.1.3 壓縮
      • 2.1.4 批量處理
      • 2.1.5 分塊(chunking)
    • 2.2 消費者
      • 2.2.1 接收模式
      • 2.2.2 确認
      • 2.2.3 取消确認
      • 2.2.4 确認逾時
      • 2.2.5 死信主題
      • 2.2.6 Retry letter topic
    • 2.3 topic
      • 2.3.1 分區topic
      • 2.3.2 消息的保留和過期
      • 2.3.3 消息去重
      • 2.3.4 消息延遲傳遞
    • 2.4 命名空間
    • 2.5 訂閱
  • 3 Pulsar的架構
    • 3.1 Broker
    • 3.2 Cluster
    • 3.3 中繼資料存儲
    • 3.4 持久化存儲
      • 3.4.1 Apache Bookkeeper
      • 3.4.2 Ledgers
      • 3.4.3 Ledger讀一緻性
      • 3.4.4 ledgers管理(managed ledger)
    • 3.5 日志存儲
    • 3.6 Pulsar協定
  • 4 Pulsar與kafka的差別
    • 4.1 性能對比
    • 4.2 對比總結

前言

最近apache pulsar出鏡率挺高的,這裡新開一篇文檔記錄一下pulsar的學習之路

參考連結:

Apache Pulsar 官網

知乎 pulsar的架構與核心概念

CSDN 什麼是Pulsar

CSDN Apache Pulsar之與Apache Kafka的異同

掘金 Pulsar從入門到實作

什麼是Pulsar?

Pulsar是由雅虎建立的開源的、 是一個用于伺服器到伺服器的消息系統,具有多租戶、高性能等優勢。現在是Apache基金會的一個孵化項目。

1 Pulsar特性

  1. Pulsar 的單個執行個體原生支援多個叢集,可跨機房在叢集間無縫地完成消息複制。
  2. 極低的釋出延遲和端到端延遲。
  3. 可無縫擴充到超過一百萬個 topic。
  4. 簡單的用戶端 API,支援 Java、Go、Python 和 C++。
  5. 支援多種 topic 訂閱模式(獨占訂閱、共享訂閱、故障轉移訂閱)。
  6. 通過 Apache BookKeeper 提供的持久化消息存儲機制保證消息傳遞 。
  7. 由輕量級的 serverless 計算架構 Pulsar Functions 實作流原生的資料處理。
  8. 基于 Pulsar Functions 的 serverless connector 架構 Pulsar IO 使得資料更易移入、移出 Apache Pulsar。
  9. 分層式存儲可在資料陳舊時,将資料從熱存儲解除安裝到冷/長期存儲(如S3、GCS)中。

Pulsar的關鍵特性:

關鍵特性 描述
Pulsar函數 使用對開發人員友好的API,可以輕松部署輕量級計算邏輯,無需運作自己的流處理引擎。
生産環境已證明 Pulsar已經在雅虎規模的生産環境中運作了3年多,每秒有數百萬條消息涉及數百萬個主題。
水準擴充 Pulsar叢集支援無縫水準擴充到數百個節點。
低延遲、支援持久存儲 Pulsar設計用于大規模的低延遲釋出(<5ms),具有強大的可用性保證。
跨域複制 專為跨多個地理區域的資料中心之間的配置資料複制而設計。
多租戶 原生支援多租戶,支援租戶間的隔離,身份驗證,授權和配額管理。
持久存儲 基于Apache BookKeeper的持久消息存儲。支援讀寫之間的IO隔離。
豐富的用戶端 Pulsar使用靈活的消息傳遞模型,支援Java,C ++,Python和Go。
可操作性 提供用于配置,管理,工具和監視的管理API,支援部署在裸機或Kubernetes上。

2 Pulsar消息隊列

2.1 生産者

生産者是一個附加到topic并将消息釋出到 Pulsar broker的程序。 Pulsar broker 處理消息。

2.1.1 發送模式

Producer 可以以同步(sync) 或 異步(async) 的方式釋出消息到 broker

發送模式 說明
同步發送 生産者在發送每條消息後等待代理的确認。 如果沒有收到确認,生産者将發送操作視為失敗。
異步發送 Producer 将把消息放于阻塞隊列中,并立即傳回。然後,用戶端将在背景将消息發送給 broker。 如果隊列已滿(最大大小可配置),則調用 API 時,producer 可能會立即被阻止或失敗,具體取決于傳遞給 producer 的參數。

2.1.2 通路模式

對于消息生産者來說在主題上你可以有不同類型的通路模式

通路模式 說明
Shared(預設) 一個topic可以有多個生産者釋出消息
Exclusive 僅有一個生産者可以在topic上釋出消息。如果已經有生産者連接配接,其他生産者試圖在這個主題上釋出資訊會立即出錯
WaitForExclusive

如果已經連接配接了生産者,則生産者建立處于挂起狀态(而不是逾時)直到生産者獲得Exclusive權限。成功成為唯一的生産者被視為上司者。

是以,如果你想為你的應用實作leader選舉方案,你可以使用這種通路模式

2.1.3 壓縮

Pulsar 生産者目前支援以下類型的壓縮:

  • LZ4
  • ZLIB
  • ZSTD
  • SNAPPY

2.1.4 批量處理

當批量處理啟用時,producer 會在單個請求中積累并發送一批消息。 批量處理的量大小由最大消息數和最大釋出延遲定義。 是以,積壓數量是分批處理的總數,而不是資訊總數。

在 Pulsar中,批次被跟蹤并存儲為單個單元,而不是單個消息。Consumer将批量處理的消息拆分成單個消息。但即使啟用了批量處理,也始終将計劃中的消息(通過 deliverAt 或者 deliverAfter 進行配置) 作為單個消息發送。

一般來說,當consumer确認了一個批的所有消息,該批才會被認定為确認。這意味着當發生不可預料的失敗、取消确認(negative acknowledgements)或确認逾時,都可能導緻批中的所有消息都被重新發送,即使其中一些消息已經被确認了。

為了避免将确認的消息批量重新發送給消費者,Pulsar從Pulsar 2.6.0開始引入了批量索引确認。當啟用批量索引确認時,消費者過濾掉已經确認的批量索引,并向broker發送批量索引确認請求。Broker 維護批量索引的确認狀态并跟蹤每批索引的确認狀态,以避免向consumer發送已确認的消息。當某一批消息的所有索引都被确認時,該批消息将被删除。

批量索引确認預設是關閉的(acknowledgmentAtBatchIndexLevelEnabled=false)

2.1.5 分塊(chunking)

  1. 分塊有以下特性:
  • 不能同時啟用批處理和分塊
  • 分塊僅僅支援持久化topic
  • 分塊僅僅支援exclusive和failover訂閱模式

當啟用分塊時(chunkingEnabled=true) ,如果消息大小大于允許的最大釋出有效載荷大小,則 producer 将原始消息分割成分塊的消息,并将它們與塊狀的中繼資料一起單獨和按順序釋出到 broker。 在 broker 中,分塊的消息将和普通的消息以相同的方式存儲在 Managed Ledger 上。 唯一的差別是,consumer 需要緩沖分塊消息,并在收集完所有分塊消息後将其合并成真正的消息。 Managed Ledger上的分塊消息可以和普通消息交織在一起。 如果 producer 未能釋出消息的所有分塊,則當 consumer 未能在過期時間(expire time) 内接收所有分塊時,consumer可以過期未完成的分塊。預設情況下,過期時間設定為1小時。

Consumer 會緩存收到的塊狀消息,直到收到消息的所有分塊為止。 然後 consumer 将分塊的消息拼接在一起,并将它們放入接收器隊列中。 用戶端從接收器隊列中消費消息。 一旦 consumer 使用整個大消息并确認,consumer 就會在内部發送與該大消息關聯的所有分塊消息的确認。當達到門檻值(maxPendingChunkedMessage)時,consumer通過靜默确認未分塊的消息或通過将其标記為未确認,要求 broker 稍後重新發送這些消息。

非Shared模式下,broker不需要任何更改來支援分塊。broker使用(chunkedMessageRate) 來記錄topic上的分塊消息速率。

  1. 處理一個 producer 和一個訂閱 consumer 的分塊消息

如下圖所示,當生産者向topic發送一批大的分塊消息和普通的非分塊消息時。 假設生産者發送的消息為 M1,M1 有三個分塊 M1-C1,M1-C2 和 M1-C3。 這個 broker 在其管理的ledger裡面儲存所有的三個塊消息,然後以相同的順序分發給消費者(獨占/災備模式)。 消費者将在記憶體緩存所有的塊消息,直到收到所有的消息塊。将這些消息合并成為原始的消息M1,發送給處理程序。

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別
  1. 多個生産者和一個生産者處理塊消息

當多個生産者釋出塊消息到單個topic,這個 Broker 在同一個 Ledger 裡面儲存來自不同生産者的所有塊消息。 如下所示,生産者1釋出的消息 M1,M1 由 M1-C1, M1-C2 和 M1-C3 三個塊組成。 生産者2釋出的消息 M2,M2 由 M2-C1, M2-C2 和 M2-C3 三個塊組成。 這些特定消息的所有分塊是順序排列的,但是其在 ledger 裡面可能不是連續的。 這種方式會給消費者帶來一定的記憶體負擔。因為消費者會為每個大消息在記憶體開辟一塊緩沖區,以便将所有的塊消息合并為原始的大消息。

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別

2.2 消費者

在Consumer端有一個隊列,用于接收從broker推送來的消息。通過receiverQueueSize參數配置隊列的長度 (隊列的預設長度是1000) 每當 consumer.receive() 被調用一次,就從緩沖區(buffer)擷取一條消息。

2.2.1 接收模式

可以通過同步(sync) 或者異步(async)的方式從brokers接受消息。

接收模式 說明
同步接收 同步模式,在收到消息之前都是被阻塞的。
異步接收 異步接收模式會立即傳回一個 future 值(如 Java 中的 CompletableFuture),一旦收到新的消息就立刻完成。

2.2.2 确認

當消費者成功的消費了一條消息,這個消費者會發送一個确認資訊給broker。 這個消息時是永久儲存的,隻有在收到訂閱者消費成功的消息确認後才會被删除。 如果希望消息被 Consumer 确認後仍然保留下來,可配置消息保留政策實作。

對于批消息,如果批量索引确認是打開的,broker會維護批索引确認狀态并跟蹤每個批索引的确認狀态以避免将确認消息分發給消費者。當某一批消息的所有索引都被确認時,該批消息将被删除。

可以通過以下兩種方式确認消息:

  • 消息被單獨确認。 通過單獨确認,消費者需要确認每條消息并向broker發送确認請求
  • 累積确認模式 累積确認時,消費者隻需要确認最後一條他收到的消息。 所有之前(包含此條)的消息,都不會被再次發送給那個消費者。

2.2.3 取消确認

當消費者在某個時間沒有成功的消費某條消息,消費者想重新消費到這條消息,這個消費者可以發送一條取消确認消息到broker,broker會将這條消息重新發給消費者。

消息取消确認也有單條取消模式和累積取消模式 ,這依賴于消費者使用的訂閱模式。

在exclusive模式和failover模式中,消費者僅僅隻能對收到的最後一條消息進行取消确認。

在 shared和Key_Shared模式下,可以單獨取消确認消息。

2.2.4 确認逾時

如果消息沒有被成功消費,你想去讓 broker 自動重新傳遞這個消息, 你可以采用未确認消息自動重新傳遞機制。 用戶端會跟蹤 逾時 時間範圍内所有未确認的消息。 并且在指定逾時時間後會發送一個 重發未确認的消息 請求到 broker。

2.2.5 死信主題

死信主題使您可以在消費者無法成功消費某些消息時消費新消息。 在這種機制中,消費失敗的消息存儲在一個單獨的主題中,稱為死信主題。 您可以決定如何處理死信主題中的消息。

以下示例顯示如何使用預設死信主題在 Java 用戶端中啟用死信主題:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
              .topic(topic)
              .subscriptionName("my-subscription")
              .subscriptionType(SubscriptionType.Shared)
              .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(maxRedeliveryCount)
                    .build())
              .subscribe();
           

預設死信主題使用以下格式:

<topicname>-<subscriptionname>-DLQ
           

如果要指定死信主題的名稱,請使用此 Java 用戶端示例:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
              .topic(topic)
              .subscriptionName("my-subscription")
              .subscriptionType(SubscriptionType.Shared)
              .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(maxRedeliveryCount)
                    .deadLetterTopic("your-topic-name")
                    .build())
              .subscribe();
           

死信主題取決于消息重新傳遞。 由于确認逾時或取消确認,消息被重新傳送。 如果打算對消息使用取消确認,請確定在确認逾時之前對其進行取消确認。

2.2.6 Retry letter topic

很多線上的業務系統,由于業務邏輯處理出現異常,消息一般需要被重新消費。 若需要允許延時重新消費失敗的消息,你可以配置生産者同時發送消息到業務主題和重試主題,并允許消費者自動重試消費。 配置了允許消費者自動重試。如果消息沒有被消費成功,它将被儲存到重試主題當中。并在指定延時時間後,自動重新消費重試主題裡面的消費失敗消息。

預設情況下,自動重試處于禁用狀态。 您可以将 enableRetry 設定為 true 以啟用對使用者的自動重試。

如下例子所示,消費者會從重試主題消費消息。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .enableRetry(true)
                .receiverQueueSize(100)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(maxRedeliveryCount)
                        .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
                        .build())
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
           

2.3 topic

2.3.1 分區topic

單個topic的消息一般是由單個broker處理,為了提高topic的消息處理能力,pulsar提供了partitioned topic的支援,與kafka和rocketmq一樣,每個partition由不同的broker處理,在消費時,單個partition可選擇exclusive, failover和shared模式

Partitioned topic實際上是由n(partition的數量)個内部的topic組成的,每個内部的topic由一個broker處理,每個broker可處理多個topic,當消息發送到broker前,在producer端會通過routing mode将消息路由到某一個partition上,消息的生産與消費示意圖如下:

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別

2.3.2 消息的保留和過期

預設情況上,當broker會立刻删除所有收到了ack的消息,沒有被ack的消息會持久化存儲,但是我們可以修改pulsar的行為,pulsar允許我們存儲已經收到ack了的消息,也可以給未收到ack的消息設定過期時間(TTL)

2.3.3 消息去重

Pulsar支援在broker端對消息做去重,當打開消息去重後,重發的消息(重試等産生的)不會被重新存儲,這個特性使得pulsar對流式計算引擎(例如flink)更加友好,流式計算引擎更容易實作exactly-once語義的計算任務,消息去重的存儲示意圖如下:

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別

2.3.4 消息延遲傳遞

延時消息功能允許你能夠過一段時間才能消費到這條消息,而不是消息釋出後,就馬上可以消費到。

延遲消息傳遞僅适用于Shared模式。 在 Exclusive 和 Failover 訂閱模式下,延遲消息會立即發送。

如下圖所示,說明了延時消息的實作機制:

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別

預設情況下啟用延遲消息傳遞。 您可以在代理配置檔案中更改它

$ Whether to enable the delayed delivery for messages.
$ If disabled, messages are immediately delivered and there is no tracking overhead.
delayedDeliveryEnabled=true

$ Control the ticking time for the retry of delayed message delivery,
$ affecting the accuracy of the delivery time compared to the scheduled time.
$ Default is 1 second.
delayedDeliveryTickTimeMillis=1000
           

下面是 Java 當中生産延時消息一個例子

// message to be delivered at the configured delay interval
producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello Pulsar!").send();
           

2.4 命名空間

命名空間是租戶内部邏輯上的命名術語。 可以通過admin API在租戶下建立多個命名空間。 例如,包含多個應用程式的租戶可以為每個應用程式建立單獨的命名空間。 Namespace使得程式可以以層級的方式建立和管理topic Topicmy-tenant/app1 ,它的namespace是app1這個應用,對應的租戶是 my-tenant。 你可以在namespace下建立任意數量的topic。

2.5 訂閱

Pulsar支援exclusive、shared和failover三種消息訂閱模式,這三種模式的示意圖如下:

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別
  1. Exclusive模式(獨占模式)

pulsar預設的消息訂閱模式,在這種模式下,中能有一個consumer消息消息,一個訂閱關系中隻能有一台機器消費每個topic,如果有多于一個consumer消費此topic則會出錯,消費示意圖如下:

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別
  1. Failover模式

一個topic也是隻有單個消費消費一個訂閱關系的消息,與exclusive模式不同之處在于,failover模式下,每個消費者會被排序,目前面的消費者無法連接配接上broker後,消息會由下一個消費者消費,消費示意圖如下:

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別

3. Shared模式(共享模式)

消息可被多個consumer同時消費,這種模式下,無法保證消息的順序,并且無法使用one by one和cumulative的ack模式,消息通過roundrobin的方式投遞到每一個消費者,消費示意圖如下:

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別

key_shared模式是shared模式的一種,不同的是它按key對消息做投遞,相同的key的消息會被投遞到同一個消費者上,消費示意圖如下:

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別

3 Pulsar的架構

單個 Pulsar 叢集由以下三部分組成:

  • 一個或者多個 broker 負責處理和負載均衡 producer 發出的消息,并将這些消息分派給 consumer;Broker 與 Pulsar 配置存儲互動來處理相應的任務,并将消息存儲在 BookKeeper 執行個體中(又稱 bookies);Broker 依賴 ZooKeeper 叢集處理特定的任務,等等。
  • 包含一個或多個 bookie 的 BookKeeper 叢集負責消息的持久化存儲。
  • 一個Zookeeper叢集,用來處理多個Pulsar叢集之間的協調任務。

下圖為一個 Pulsar 叢集:

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別

叢集通過ZooKeeper來進行協處理,比如異地災備、異地複制。

3.1 Broker

Pulsar的broker是一個無狀态元件, 主要負責運作另外的兩個元件:

  • 一個 HTTP 伺服器,它為生産者和消費者公開管理任務和topic查找的 REST API。 生産者連接配接到broker釋出消息,消費者連接配接到broler來消費消息。
  • 一個排程分發器, 它是異步的TCP伺服器,通過自定義 二進制協定應用于所有相關的資料傳輸。

    出于性能考慮,消息通常從managed ledger緩存中分派,除非積壓超過緩存大小。如果積壓的消息對于緩存來說太大了, 則Broker将開始從BookKeeper那裡讀取Entries(Entry同樣是BookKeeper中的概念,相當于一條記錄)。

最後,為了支援全局Topic異地複制,Broker會控制Replicators追蹤本地釋出的條目,并把這些條目用Java 用戶端重新釋出到其他區域。

3.2 Cluster

一個 Pulsar 執行個體由一個或多個 Pulsar 叢集組成。 反過來,叢集包括:

  • 一個或者多個Pulsar brokers
  • 一個ZooKeeper協調器,用于叢集級别的配置和協調
  • 一組BookKeeper的Bookies用于消息的 持久化存儲

叢集間可以通過異地複制進行消息同步。

3.3 中繼資料存儲

Pulsar 中繼資料存儲維護一個 Pulsar 叢集的所有中繼資料,例如topic中繼資料、schema、broker加載資料等。Pulsar 使用 Apache ZooKeeper 進行中繼資料存儲、叢集配置和協調。Pulsar 中繼資料存儲可以部署在單獨的 ZooKeeper 叢集上,也可以部署在現有的 ZooKeeper 叢集上。您可以将一個 ZooKeeper 叢集同時用于 Pulsar 中繼資料存儲和 BookKeeper 中繼資料存儲。如果要部署連接配接到現有 BookKeeper 叢集的 Pulsar broker,則需要分别為 Pulsar 中繼資料存儲和 BookKeeper 中繼資料存儲部署單獨的 ZooKeeper 叢集。

在一個Pulsar執行個體中:

  • 配置存儲仲裁存儲租戶、命名空間和其他需要全局一緻的實體的配置
  • 每個叢集都有自己的本地 ZooKeeper 內建,用于存儲特定于叢集的配置和協調,例如哪些代理負責哪些topic以及所有權中繼資料、broker加載報告、BookKeeper ledger中繼資料等。

存儲配置

存儲配置維護一個 Pulsar 執行個體的所有配置,例如叢集、租戶、命名空間、分區topic相關配置等。一個Pulsar執行個體可以有一個本地叢集、多個本地叢集或多個跨區域叢集。是以,配置存儲可以在Pulsar執行個體下的多個叢集之間共享配置。配置存儲可以部署在單獨的 ZooKeeper 叢集上,也可以部署在現有的 ZooKeeper 叢集上。

3.4 持久化存儲

Pulsar 為應用程式提供有保證的消息傳遞。 如果消息成功到達 Pulsar broker,它将被傳遞到其預期目标。

為了提供這種保證,未确認送達的消息需要持久化存儲直到它們被确認送達。這種消息傳遞模式通常稱為持久消息傳遞。所有消息都被儲存并同步N份,例如,2個伺服器儲存四份,每個伺服器上面都有鏡像的RAID存儲。

3.4.1 Apache Bookkeeper

Pulsar用 Apache BookKeeper作為持久化存儲。 BookKeeper是一個分布式的預寫日志(WAL)系統,有如下幾個特性特别适合Pulsar的應用場景:

  • 它使Pulsar能夠利用許多獨立的日志,稱為Ledgers。随着時間的推移,可以為topic建立多個ledgers。
  • 為按條目複制的順序資料提供了非常高效的存儲。
  • 保證了多系統挂掉時ledgers的讀取一緻性。
  • 提供不同的Bookies之間均勻的IO分布的特性。
  • 它在容量和吞吐量方面都具有水準可擴充性。 通過向叢集添加更多bookies,可以立即增加容量。
  • Bookies被設計成可以承載數千的并發讀寫的ledgers。 使用多個磁盤裝置,一個用于日志,另一個用于一般存儲,這樣Bookies可以将讀操作的影響和對于寫操作的延遲分隔開。

除了消息資料,cursors也持久存儲在 BookKeeper 中。Cursors是消費端訂閱消費的位置。 BookKeeper讓Pulsar可以用一種可擴充的方式存儲消費位置。

目前,Pulsar 支援持久化消息存儲。 這說明了所有topic名稱中的持久性。下面是一個示例:

persistent://tenant/namespace/topic
           

Pulsar也支援臨時消息( (non-persistent) )存儲

下圖展示了brokers和bookies是如何互動的:

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別

3.4.2 Ledgers

Ledger是一個隻追加的資料結構,并且隻有一個寫入器,這個寫入器負責多個BookKeeper存儲節點(就是Bookies)的寫入。 Ledger的條目會被複制到多個bookies。 Ledgers本身有着非常簡單的語義:

  • Pulsar Broker可以建立ledeger,添加内容到ledger和關閉ledger。
  • 當一個ledger被關閉後,除非明确的要寫資料或者是因為寫入器挂掉導緻ledger關閉,這個ledger隻會以隻讀模式打開。
  • 最後,當ledger中的條目不再有用的時候,整個legder可以被删除(ledger分布是跨Bookies的)。

3.4.3 Ledger讀一緻性

BookKeeper的主要優勢在于他能在有系統故障時保證讀的一緻性。 由于Ledger隻能被一個程序寫入(之前提的寫入器程序),這樣這個程序在寫入時不會有沖突,進而寫入會非常高效。 在一次故障之後,ledger會啟動一個恢複程序來确定ledger的最終狀态并确認最後送出到日志的是哪一個條目。 在這之後,能保證所有的ledger讀程序讀取到相同的内容。

3.4.4 ledgers管理(managed ledger)

鑒于 Bookkeeper Ledger提供單一日志抽象,在ledger之上開發了一個庫,稱為managed ledger,代表單個主題的存儲層。managed ledger即消息流的抽象,有一個寫入器程序不斷在流結尾添加消息,并且有多個cursors 消費這個流,每個cursor有自己的消費位置。

在内部,單個managed ledger使用多個 BookKeeper ledgers來存儲資料。 有多個ledgers的原因有兩個:

  • 在故障之後,原有的某個ledger不能再寫了,需要建立一個新的。
  • 當所有cursors都消耗了它包含的消息時,可以删除ledger。 這允許ledger的定期滾動。

3.5 日志存儲

在 BookKeeper 中,日志檔案包含 BookKeeper 事務日志。在更新到 ledger之前,bookie需要確定描述這個更新的事務被寫到持久(非易失)存儲上面。 在bookie啟動和舊的日志檔案大小達到上限(由 journalMaxSizeMB 參數配置)的時候,新的日志檔案會被建立。

3.6 Pulsar協定

Pulsar用戶端和Pulsar叢集互動的一種方式就是直連Pulsar brokers 。 然而,在某些情況下,這種直連既不可行也不可取,因為用戶端并不知道broker的位址。 例如在雲環境或者 Kubernetes 以及其他類似的系統上面運作Pulsar,直連brokers就基本上不可能了。

Pulsar 協定通過充當叢集中所有broker的單一網關來解決此問題。如果你選擇運作Pulsar Proxy(這是可選的),所有的用戶端連接配接将會通過這個代理而不是直接與brokers通信。

架構上來看,Pulsar Proxy從ZooKeeper上面讀取他所需要的所有資訊。 當啟動代理時,你隻需要提供用于叢集獨有和執行個體範圍的配置存儲的ZooKeeper連接配接串。 下面是一個示例:

$ bin/pulsar proxy \
  --zookeeper-servers zk-0,zk-1,zk-2 \
  --configuration-store-servers zk-0,zk-1,zk-2
           

關于Pulsar proxy有一些比較重要的注意點:

  • 連接配接用戶端不需要提供任何特定的配置來使用 Pulsar 協定。除了更新用于服務URL的IP之外,你不需要為現有的應用更新用戶端配置(例如你在Pulsar proxy上層架設運作了負載均衡器)。
  • Pulsar proxy支援TLS 加密 和 認證。

4 Pulsar與kafka的差別

4.1 性能對比

Pulsar 表現最出色的就是性能,Pulsar 的速度比 Kafka 快得多,與 Kafka 相比,Pulsar 的速度提升了 2.5 倍,延遲降低了 40%。

Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別
Apache Pulsar基本理論前言什麼是Pulsar?1 Pulsar特性2 Pulsar消息隊列3 Pulsar的架構4 Pulsar與kafka的差別

注:對比是針對 1 個分區的 1 個主題,其中包含 100 位元組消息,Pulsar 每秒可發送 220,000+ 條消息。

4.2 對比總結

Kafka Pulsar
概念 生産者-主題-消費者/消費者組 生産者-主題-訂閱-消費者
消費 關注分區上的流和獨占消息傳遞。沒有共同的消費

統一消息模型和API。

·通過獨占故障轉移訂閱進行流式傳輸

·通過共享訂閱隊列

ACK

簡單的offset管理

·在Kafka 0.8之前,偏移量存儲在ZooKeeper中

·Kafka 0.8之後,偏移量存儲在偏移量主題上

統一消息模型和API。

·通過獨占故障轉移訂閱進行流式傳輸

·通過共享訂閱隊列

Retention 基于保留删除消息。如果使用者在保留期之前沒有讀取消息,則會丢失資料。 隻有在所有訂閱使用消息之後才會删除它們。沒有資料丢失,甚至訂閱的消費者也長時間處于下降狀态。即使在所有訂閱都使用消息之後,也允許将消息儲存一段已配置的保留期間。
TTL 支援 不支援

TTL是訂閱的消費時間限制。如果在配置的TTL時間段内沒有任何消費者使用消息,則消息将自動标記為已确認。

繼續閱讀