天天看點

RocketMQ、Kafka 和 Pulsar的事務消息有何異同消息隊列演化背景知識RocketMQ、Kafka、Pulsar 事務消息結論

目錄

  • 消息隊列演化
    • kafak
    • RocketMQ
    • Pulsar
  • 背景知識
    • At-least-once語義
    • At-most-once語義
    • Exactly-once語義
  • RocketMQ、Kafka、Pulsar 事務消息
    • RocketMQ 的事務消息
      • 補償流程
    • kafka的事務消息
      • 流處理的需求
      • Exactly-once語義實作
      • 幂等性Producer的作用範圍
      • 事務型Producer
    • Pulsar 的事務消息
      • Pulsar 事務具有以下語義
      • Pulsar 事務消息由以下幾個關鍵點構成
      • 處理流程一般分為以下幾個步驟
  • 結論

RocketMQ、Kafka 和 Pulsar 都是當今業界應用十分廣泛的開源消息隊列(MQ)元件。筆者在工作中遇到關于 MQ 選型相關的内容,了解到關于“事務消息”這個概念在不同的 MQ 元件裡有不同内涵。故借此文,試着淺析一番這三種消息隊列(MQ)的事務消息有何異同,目的是形成關于消息隊列事務消息的全景視圖,給有類似業務需求的同學提供一些參考和借鑒。

消息隊列演化

消息隊列(Message Queue,簡稱 MQ),是指在消息的傳輸中儲存消息的容器或服務,是一種異步的服務間通信方式,适用于無伺服器和微服務架構,是分布式系統實作高性能、高可用、可伸縮等進階特效的重要元件。

常見的主流消息隊列有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ、Pulsar 等。

RocketMQ、Kafka 和 Pulsar的事務消息有何異同消息隊列演化背景知識RocketMQ、Kafka、Pulsar 事務消息結論

kafak

Apache Kafka 是由 Apache 軟體基金會開發的一個開源消息系統項目,由 Scala 寫成。

Kafka 最初是由 LinkedIn 開發,并于 2011 年初開源。2012 年 10 月從 Apache Incubator 畢業。該項目的目标是為處理實時資料提供一個統一、高通量、低等待的平台。

Kafka 是一個分布式的、分區的、多複本的日志送出服務。它通過一種獨一無二的設計提供了一個消息系統的功能。

其整體架構圖如下所示:

RocketMQ、Kafka 和 Pulsar的事務消息有何異同消息隊列演化背景知識RocketMQ、Kafka、Pulsar 事務消息結論

RocketMQ

Apache RocketMQ 是一個分布式消息和流媒體平台,具有低延遲、強一緻、高性能和可靠性、萬億級容量和靈活的可擴充性。它有借鑒 Kafka 的設計思想,但不是 Kafka 的拷貝。

其整體架構圖如下所示:

RocketMQ、Kafka 和 Pulsar的事務消息有何異同消息隊列演化背景知識RocketMQ、Kafka、Pulsar 事務消息結論

Pulsar

Apache Pulsar 是 Apache 軟體基金會頂級項目,是下一代雲原生分布式消息流平台。

它集消息、存儲、輕量化函數式計算為一體,采用計算與存儲分離架構設計,支援多租戶、持久化存儲、多機房跨區域資料複制,具有強一緻性、高吞吐、低延時及高可擴充性等流資料存儲特性,被看作是雲原生時代實時消息流傳輸、存儲和計算最佳解決方案。

其整體架構圖如下所示:

RocketMQ、Kafka 和 Pulsar的事務消息有何異同消息隊列演化背景知識RocketMQ、Kafka、Pulsar 事務消息結論

背景知識

在分布式系統中,任何節點都有可能出現異常甚至當機。在消息隊列中也一樣,當 Producer 在生産消息時,可能會發生 Broker 當機不可用,或者網絡突然中斷等異常情況。

根據在發生異常時 Producer 處理消息的方式,系統可以具備以下三種消息語義。

At-least-once語義

Producer 通過接收 Broker 的 ACK(消息确認)通知來確定消息成功寫入 Topic。

然而,當 Producer 接收 ACK 通知逾時,或者收到 Broker 出錯資訊時,會嘗試重新發送消息。

如果 Broker 正好在成功把消息寫入到 Topic,但還沒有給 Producer 發送 ACK 時當機,Producer 重新發送的消息會被再次寫入到 Topic,最終導緻消息被重複分發至 Consumer。即:消息不會丢失,但有可能被重複發送。

At-most-once語義

當 Producer 在接收 ACK 逾時,或者收到 Broker 出錯資訊時不重發消息,那就有可能導緻這條消息丢失,沒有寫入到 Topic 中,也不會被 Consumer 消費到。

在某些場景下,為了避免發生重複消費,我們可以容許消息丢失的發生。即:消息可能會丢失,但絕不會被重複發送。

Exactly-once語義

Exactly-once 語義保證了即使 Producer 多次發送同一條消息到服務端,服務端也僅僅會記錄一次。

Exactly-once 語義是最可靠的,同時也是最難了解的。Exactly-once 語義需要消息隊列服務端,消息生産端和消費端應用三者的協同才能實作。

RocketMQ、Kafka、Pulsar 事務消息

RocketMQ 的事務消息

RocketMQ 在 4.3.0 版中已經支援分布式事務消息,這裡 RocketMQ 采用了 2PC 的思想來實作了送出事務消息,同時增加一個補償邏輯來處理二階段逾時或者失敗的消息。

流程如下圖所示:

RocketMQ、Kafka 和 Pulsar的事務消息有何異同消息隊列演化背景知識RocketMQ、Kafka、Pulsar 事務消息結論

其具體工作流程分為正常事務消息的發送及送出和不正常情況下事務消息的補償流程:

  • 在消息隊列上開啟一個事務主題;
  • 事務中第一個執行的服務發送一條“半消息”(半消息和普通消息的唯一差別是,在事務送出之前,對于消費者來說,這個消息是不可見的)給消息隊列;
  • 半消息發送成功後,發送半消息的服務就會開始執行本地事務,根據本地事務執行結果來決定事務消息送出或者復原;
  • 本地事務成功後會讓這個“半消息”變成正常消息,供分布式事務後面的步驟執行自己的本地事務;

這裡的事務消息,Producer 不會因為 Consumer 消費失敗而做復原,采用事務消息的應用,其所追求的是高可用和最終一緻性,消息消費失敗的話,RocketMQ 自己會負責重推消息,直到消費成功。

補償流程

RocketMQ 提供事務反查來解決異常情況,如果 RocketMQ 沒有收到送出或者復原的請求,Broker 會定時到生産者上去反查本地事務的狀态,然後根據生産者本地事務的狀态來處理這個“半消息”是送出還是復原。

值得注意的是我們需要根據自己的業務邏輯來實作反查邏輯接口,然後根據傳回值 Broker 決定是送出還是復原。

而且這個反查接口需要是無狀态的,請求到任意一個生産者節點都會傳回正确的資料。

其中,補償流程用于解決消息 Commit 或者 Rollback 發生逾時或者失敗的情況。在 RocketMQ 事務消息的主要流程中,一階段的消息如何對使用者不可見。

其中,事務消息相對普通消息最大的特點就是一階段發送的消息對使用者是不可見的。

那麼,如何做到寫入消息但是對使用者不可見呢?

RocketMQ 事務消息的做法是:如果消息是“半消息”,将備份原消息的主題與消息消費隊列,然後改變主題為 RMQ_SYS_TRANS_HALF_TOPIC。

由于消費組未訂閱該主題,故消費端無法消費“半消息”的消息,然後 RocketMQ 會開啟一個定時任務,從 Topic 為

RMQ_SYS_TRANS_HALF_TOPIC

中拉取消息進行消費。

根據生産者組擷取一個服務提供者發送回查事務狀态請求,根據事務狀态來決定是送出或復原消息。

講到這裡大家就明白了,這裡說的就是上文提到分布式事務中的消息事務,目的是在分布式事務中實作系統的最終一緻性。

kafka的事務消息

與 RocketMQ 的事務消息用途不同,Kafka 的事務基本上是配合其幂等機制來實作 Exactly-once語義的。

開發此功能的原因可以總結如下:

流處理的需求

随着流處理的興起,對具有更強處理保證的流處理應用的需求也在增長。

例如,在金融行業,金融機構使用流處理引擎為使用者處理借款和信貸。這種類型的用例要求每條消息都隻處理一次,無一例外。

換句話說,如果流處理應用程式消費消息 A 并将結果作為消息 B(B = f(A)),那麼恰好一次處理保證意味着當且僅當 B 被成功生産後 A 才能被标記為消費,反之亦然。

RocketMQ、Kafka 和 Pulsar的事務消息有何異同消息隊列演化背景知識RocketMQ、Kafka、Pulsar 事務消息結論

事務 API 使流處理應用程式能夠在一個原子操作中使用、處理和生成消息。這意味着,事務中的一批消息可以從許多主題分區接收、生成和确認。一個事務涉及的所有操作都作為整體成功或失敗。

目前,Kafka 預設提供的傳遞可靠性保障是 At-least-once。如果消息成功“送出”,但 Broker 的應答沒有成功發送回 Producer 端(比如網絡出現瞬時抖動),那麼 Producer 就無法确定消息是否真的送出成功了。是以,它隻能選擇重試,這就是 Kafka 預設提供 At-least-once 保障的原因,不過這會導緻消息重複發送。

Exactly-once語義實作

簡單來說,這是通過兩種機制:

  • 幂等性(Idempotence)
  • 事務(Transaction)

幂等性 Producer:“幂等”這個詞原是數學領域中的概念,指的是某些操作或函數能夠被執行多次,但每次得到的結果都是不變的。

幂等性有很多好處,其最大的優勢在于我們可以安全地重試任何幂等性操作,反正它們也不會破壞我們的系統狀态。

如果是非幂等性操作,我們還需要擔心某些操作執行多次對狀态的影響,但對于幂等性操作而言,我們根本無需擔心此事。

在 Kafka 中,Producer 預設不是幂等性的,但我們可以建立幂等性 Producer。它其實是 0.11.0.0 版本引入的新功能。

enable.idempotence 被設定成 true 後,Producer 自動更新成幂等性 Producer,其他所有的代碼邏輯都不需要改變。

Kafka 自動幫你做消息的重複去重。Kafka 為了實作幂等性,它在底層設計架構中引入了 ProducerID 和 SequenceNumber。

ProducerID:在每個新的 Producer 初始化時,會被配置設定一個唯一的 ProducerID,用來辨別本次會話

SequenceNumber:對于每個 ProducerID,Producer 發送資料的每個 Topic 和 Partition 都對應一個從 0 開始單調遞增的 SequenceNumber 值

Broker 在記憶體維護(pid,seq)映射,收到消息後檢查 seq,Producer 在收到明确的的消息丢失 ack,或者逾時後未收到 ack,要進行重試。

  • new_seq=old_seq+1:正常消息
  • new_seq<=old_seq:重複消息
  • new_seq>old_seq+1:消息丢失

幂等性Producer的作用範圍

首先,它隻能保證單分區上的幂等性,即一個幂等性 Producer 能夠保證某個主題的一個分區上不出現重複消息,它無法實作多個分區的幂等性。

其次,它隻能實作單會話上的幂等性,不能實作跨會話的幂等性。這裡的會話,你可以了解為 Producer 程序的一次運作。當你重新開機了 Producer 程序之後,這種幂等性保證就喪失了。

如果想實作多分區以及多會話上的消息無重複,應該怎麼做呢?答案就是事務(transaction)或者依賴事務型 Producer。這也是幂等性 Producer 和事務型 Producer 的最大差別。

事務型Producer

事務型producer能夠保證将消息原子性地寫入到多個分區中。這批消息要麼全部寫入成功,要麼全部失敗。另外,事務型 Producer 也不受程序的重新開機影響。Producer 重新開機後,Kafka 依然保證它們發送消息的 Exactly-once 處理。

和普通 Producer 代碼相比,事務型 Producer 的顯著特點是調用了一些事務 API。如

initTransaction

beginTransaction

commitTransaction

abortTransaction

,它們分别對應事務的初始化、事務開始、事務送出以及事務終止。

Kafka 事務消息是由 Producer、事務協調器、Broker、組協調器、Consumer 等共同參與實作的,如下:

producer

  • transactionalId

    為 Producer 指定固定的

    TransactionalId

    (事務 id),可以穿越 Producer 的多次會話(Producer 重新開機/斷線重連)中,持續辨別 Producer 的身份;
  • epoch

    每個生産者增加一個 epoch。用于辨別同一個 TransactionalId 在一次事務中的 epoch,每次初始化事務時會遞增,進而讓服務端可以知道生産者請求是否舊的請求。使用 epoch 辨別 Producer 的每一次“重生”,可以防止同一 Producer 存在多個會話。

  • 幂等性

    消息行為上Producer 遵從幂等性執行

事務協調器(Transaction Coordinator)

引入事務協調器,類似于消費組負載均衡的協調者,每一個實作事務的生産端都被配置設定到一個事務協調者。以兩階段送出的方式,實作消息的事務送出。

事務協調器使用一個特殊的 Topic:即事務 Topic,事務 Topic 本身也是持久化的,日志資訊記錄事務狀态資訊,由事務協調者寫入。

事務協調器通過 RPC 調用,協調 Broker 和 Consumer 實作事務的兩階段送出。

每一個 Broker 都會啟動一個事務協調器,使用 hash(TransactionalId)确定 Producer 對應的事務協調器,使得整個叢集的負載均衡。

Broker

引入控制消息(

Control Messages

):這些消息是用戶端産生的并寫入到主題的特殊消息,但對于使用者來說不可見。它們是用來讓 Broker 告知消費者之前拉取的消息是否被原子性送出。

Broker 處理事務協調器的 commit/abort 控制消息,把控制消息向正常消息一樣寫入 Topic(圖中标 c 的消息,和正常消息交織在一起,用來确認事務送出的日志偏移),并向前推進消息送出偏移 hw。

RocketMQ、Kafka 和 Pulsar的事務消息有何異同消息隊列演化背景知識RocketMQ、Kafka、Pulsar 事務消息結論

組協調器

如果在事務過程中,送出了消費偏移,組協調器在 offset log 中寫入事務消費偏移。當事務送出時,在 offset log 中寫入事務 offset 确認消息。

Consumer

Consumer 過濾未送出消息和事務控制消息,使這些消息對使用者不可見。有兩種實作方式:

  • Consumer 緩存方式:設定 isolation.level=read_uncommitted,此時 topic 的所有消息對 Consumer 都可見,Consumer 緩存這些消息,直到收到事務控制消息。若事務 commit,則對外釋出這些消息;若事務 abort,則丢棄這些消息。
  • Broker 過濾方式:設定 isolation.level=read_committed,此時 topic 中未送出的消息對 Consumer 不可見,隻有在事務結束後,消息才對 Consumer 可見。Broker 給 Consumer 的 BatchRecord 消息中,會包含一清單,指明哪些是“abort”事務,Consumer 丢棄 abort 事務的消息即可。

因為事務機制會影響消費者所能看到的消息的範圍,它不隻是簡單依賴高水位來判斷。它依靠一個名為

LSO(Log Stable Offset)

的位移值來判斷事務型消費者的可見性。

Pulsar 的事務消息

Apache Pulsar 在 2.8.0 正式支援了事務相關的功能,Pulsar 這裡提供的事務差別于 RocketMQ 中 2PC 那種事務的實作方式,沒有本地事務回查的機制,更類似于 Kafka 的事務實作機制。

Apache Pulsar 中的事務主要用來保證類似 Pulsar Functions 這種流計算場景中 Exactly-once 語義的實作。

這也符合 Apache Pulsar 本身 Event Streaming 的定位,即保證端到端(End-to-End)的事務實作的語義。

在 Pulsar 中,對于事務語義是這樣定義的:允許事件流應用将消費、處理、生産消息整個過程定義為一個原子操作,即生産者或消費者能夠處理跨多個主題和分區的消息,并確定這些消息作為一個單元被處理。

Pulsar 事務具有以下語義

  • 事務中的所有操作都作為一個單元送出。要麼送出所有消息,要麼都不送出。
  • 每條消息隻寫入或處理一次,不會丢失資料或重複(即使發生故障)。
  • 如果事務中止,則此事務中的所有寫入和确認都将復原。

事務中的批量消息可以被以多分區接收、生産和确認:

  • 消費者隻能讀取已送出(确認)的消息。換句話說,Broker 不傳遞屬于打開事務的事務消息或屬于中止事務的消息。
  • 跨多個分區的消息寫入是原子性的。
  • 跨多個訂閱的消息确認是原子性的。訂閱下的消費者在确認帶有事務 ID 的消息時,隻會成功确認一次消息。

Pulsar 事務消息由以下幾個關鍵點構成

事務 ID(TxnID)

辨別 Pulsar 中的唯一事務。事務 ID 長度是 128-bit。最高 16 位保留給事務協調器的 ID,其餘位用于每個事務協調器中單調遞增的數字。

事務協調器(TC)

是運作在 Pulsar Broker 中的一個子產品。它維護事務的整個生命周期,并防止事務進入錯誤狀态;它處理事務逾時,并確定事務在事務逾時後中止。

事務日志

所有事務中繼資料都儲存在事務日志中。事務日志由 Pulsar 主題記錄。如果事務協調器崩潰,它可以從事務日志恢複事務中繼資料。

事務日志存儲事務狀态,而不是事務中的實際消息(實際消息存儲在實際的主題分區中)。

事務緩存

向事務内的主題分區生成的消息存儲在該主題分區的事務緩沖區(TB)中。

在送出事務之前,事務緩沖區中的消息對消費者不可見。當事務中止時,事務緩沖區中的消息将被丢棄。

事務緩沖區将所有正在進行和中止的事務存儲在記憶體中。所有消息都發送到實際的分區 Pulsar 主題。

送出事務後,事務緩沖區中的消息對消費者具體化(可見)。事務中止時,事務緩沖區中的消息将被丢棄。

待确認狀态

挂起确認狀态在事務完成之前維護事務中的消息确認。如果消息處于挂起确認狀态,則在該消息從挂起确認狀态中移除之前,其他事務無法确認該消息。

挂起的确認狀态被保留到挂起的确認日志中(cursor ledger)。新啟動的 broker 可以從挂起的确認日志中恢複狀态,以確定狀态确認不會丢失。

處理流程一般分為以下幾個步驟

  • 開啟事務。
  • 使用事務釋出消息。
  • 使用事務确認消息。
  • 結束事務。

Pulsar 的事務處理流程與 Kafka 的事務處理思路大緻上保持一緻,大家都有一個 TC 以及對應的一個用于持久化 TC 所有操作的 Topic 來記錄所有事務狀态變更的請求。

同樣的在事務開始階段也都有一個專門的 Topic 來去查詢 TC 對應的 Owner Broker 的位置在哪裡。

不同的是:

  • Kafka 中對于未确認的消息是維護在 Broker 端的,但是 Pulsar 的是維護在 Client 端的,通過 Transaction Timeout 來決定這個事務是否執行成功,是以有了 Transaction Timeout 的存在之後,就可以確定 Client 和 Broker 側事務處理的一緻性。
  • 由于 Kafka 本身沒有單條消息的 Ack,是以 Kafka 的事務處理隻能是順序執行的,當一個事務請求被阻塞之後,會阻塞後續所有的事務請求,但是 Pulsar 是可以對消息進行單條 Ack 的,是以在這裡每一個事務的 Ack 動作是獨立的,不會出現事務阻塞的情況。

結論

RocketMQ 和 Kafka/Pulsar 的事務消息實用的場景是不一樣的。

  • RocketMQ

RocketMQ 中的事務,它解決的問題是,確定執行本地事務和發消息這兩個操作,要麼都成功,要麼都失敗。并且 RocketMQ 增加了一個事務反查的機制,來盡量提高事務執行的成功率和資料一緻性。

  • Kafka

Kafka 中的事務,它解決的問題是,確定在一個事務中發送的多條消息,要麼都成功,要麼都失敗。這裡面的多條消息不一定要在同一個主題和分區中,可以是發往多個主題和分區的消息。

當然也可以在 Kafka 事務執行過程中開啟本地事務來實作類似 RocketMQ 事務消息的效果。但是 Kafka 是沒有事務消息反查機制的,它是直接抛出異常的,使用者可以根據異常來實作自己的重試等方法保證事務正常運作。

  • 比較

它們的共同點就是:都是通過兩階段送出來實作事務的,事務消息都儲存在單獨的主題上。

不同的地方就是 RocketMQ 是通過“半消息”來實作的,Kafka 是直接将消息發送給對應的 topic,通過用戶端來過濾實作的。

同時它們兩個使用的場景差別是非常之大的,RockteMQ 主要解決的是基于本地事務和消息的資料一緻性,而 Kafka 的事務則是用于實作它的 Exactly-once 機制,應用于實時流計算的場景中。

  • Pulsar

Pulsar 的事務消息和 Kafka 應用場景和語義類似,隻是由于底層實作機制有差别,在一些細節上有差別。

相信看到這裡就非常清楚了,對于事務消息如何選型和應用,首先要明白你的業務需求是什麼。

是要實作分布式事務的最終一緻性,還是要實作 Exactly-once (精确一次)語義?明白之後需求,選擇什麼元件就十分明确了。

轉自公衆号 51CTO技術棧:https://mp.weixin.qq.com/s/GfDmb2ss1fkvRJ3jlSAYnA