天天看點

深入解析 Apache Pulsar 雙階段删除協定的工作原理

作者:存儲矩陣

文章摘要

本文整理自 ApacheCon Asia 2022 大會上,StreamNative 工程師趙延的分享《深入探究 Apache Pulsar 雙階段删除協定的工作原理》。目前,Apache Pulsar 對于資料的删除存在兩個步驟,即删除中繼資料與實際存儲資料。由于這兩個步驟互相分開,我們無法保證明際存儲資料一定能被删除,是以可能存在中繼資料已删除而實際存儲資料依然存在的現象。現有 Pulsar 的使用者在生産環境也遇到了該問題,存在大量的髒資料無法删除。對此,我們引入雙階段删除協定來解決該問題。本文将詳細介紹兩段資料删除的工作原理。

作者介紹

趙延,StreamNative 軟體工程師,Apache Pulsar Contributor。

背景

關于 Apache Pulsar

Apache Pulsar 是雲原生的消息和流平台。雲原生的定義是“将計算與存儲分離,可與雲基礎設施和 Kubernetes 一起工作的多層方案”,Pulsar 計算存儲分離,存儲有狀态而計算無狀态,可以友善地啟動或者停止 Broker 運作在 K8s 上。

深入解析 Apache Pulsar 雙階段删除協定的工作原理

上圖是 Apache Pulsar 的架構簡介。生産者将消息發送給 Broker,而 Broker 并不負責消息的存儲,消息存儲在底層的 Apache BookKeeper 上。Broker 收到消息後通過 Pulsar 用戶端将消息發送給 Bookie。消費者将訂閱請求發送給 Broker,後者從 BookKeeper 中讀取資料并發送給消費者。可見 Broker 和 BookKeeper 層分别負責計算和存儲,是存算分離的設計。

負責存儲的 BookKeeper 是多副本協定存儲系統,具備以下特性:

  • 高資料存儲容量,可以通過增加服務端 Bookie 節點來無限擴容;
  • 高一緻性(資料重複讀取時保證一緻性);
  • 讀寫高可用性;
  • I/O 隔離。
  • Broker 收到消息後,存儲資料的操作分為以下幾步:

1. 通過 BookKeeper 用戶端,在 BookKeeper 中建立 Ledger handler 處理器(負責處理所有寫入請求);

2. 将上述 Ledger handler 的唯一辨別 ID 添加到中繼資料存儲中;

3. 通過 Ledger handler 将資料發送給 BookKeeper 服務端完成資料寫入。

上述任何一步失敗,本次資料存儲操作都會失敗。

關于 Broker 資料删除

與存儲資料相對,Broker 删除資料的操作分為以下兩步:

1. 在中繼資料存儲中删除 Ledger handler 的 ID;

2. 第一步成功後,通過 BookKeeper 用戶端删除 Ledger handler。

如果先完成第二步的操作再操作第一步,而第一步又因為 Broker 重新開機而失敗,那麼Broker 就會誤認為已經被删除的資料仍然存在。是以第一步和第二步的操作順序不能調換。

但這裡又産生了一個問題:考慮到兩步操作分别由計算和存儲端執行,就可能出現第一步的操作成功而第二步操作失敗的情況。在第二步的操作失敗後,Pulsar 會重試三次,三次重試全部失敗就不會繼續重試,資料就會永久儲存在 BookKeeper 中,占據額外的存儲空間。

目前 Apache Pulsar 提供了一種工具來探測此類狀況,通過中繼資料存儲和真實資料存儲的對比來發現未能成功删除的資料,但這一過程較為複雜,還會産生誤删資料的風險,并帶來更多運維壓力。

基于 System Topic 的雙階段資料删除協定

針對上述問題,Apache Pulsar 新引入了基于 System Topic 的雙階段資料删除協定。該協定的基本工作流程如下:

深入解析 Apache Pulsar 雙階段删除協定的工作原理

1. 删除資料時,System Topic Producer 會向 System Topic Consumer 發送一個删除事件消息。

2. 後者收到消息後給 Broker 發送删除指令,要求後者執行删除操作。

3. 如果資料的中繼資料 Ledger 已被删除,System Topic Consumer 會自己到資料存儲中執行删除操作,删除真實資料。

Pulsar 引入了以下三個新的 System Topic:

  • pulsar/system/persistent/__ledger_deletion:存儲删除 Ledger 的事件;
  • pulsar/system/persistent/__ledger_deletion-RETRY:存儲重試事件,當消費者消費消息失敗,或删除 Ledger 失敗後,Pulsar 會将删除事件發送給該 Topic,該 Topic 在一段時間後将删除事件重新投遞給消費者;
  • pulsar/system/persistent/__ledger_deletion-DLQ:當消費者重複删除某 Ledger 達到一定次數後,删除事件會發送給該死信隊列,不再消費。Pulsar 會認為該 Ledger 短時間内無法被删除。使用者可以檢視歸檔死信隊列中的資料,判斷哪些資料沒有被真正删除。

在 Broker 啟動時,Pulsar 會通過 pulsarAdmin 來建立分區的 System Topic,即上述三個 Topic。之是以是分區 Topic,是因為 Pulsar 會自動建立一些 Topic;當删除資料時,Broker 收到消息後會自動建立一個非分區 Topic。删除 Ledger 的事件都會寫入該非分區 Topic。

但當 Broker 配置變化時,例如使用者将 allowAutoTopicCreationType=non-partitioned 改為 allowAutoTopicCreationType=partitioned,Broker 就會自動建立一個分區 Topic 來存儲消息。之前非分區 Topic 中的消息不會再有消費者訂閱,對應的 Ledger 也無法删除。

雙階段資料删除詳解

第一階段

Broker 啟動時會啟動一個生産者,為之後的資料删除做準備。

client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
    .topic("pulsar/system/persistent/__ledger deletion")
    .enableBatching(false)
    .createAsync();           

需要注意這裡禁止了 Batching,因為如果删除多個 Ledger 的消息事件 Batch 到同一消息中,當消費者收到消息後會很難 ack 消息。

第一階段中,删除一個 Ledger 分為以下步驟:

1. 向 System Topic 發送删除 Ledger 事件消息;

2. 從中繼資料存儲中删除 Ledgerid。 如果第一步失敗,第二步則不能進行,這樣才能保證消費者可以收到消息删除 Ledger。隻有中繼資料 Ledgerid 删除成功,第一階段才完成。

第二階段

在 Broker 啟動時,會啟動一個消費者來消費删除 Ledger 事件消息。

client.newConsumer(Schema-AVRO(PendingDeleteLedgerInfo.class))
    .topic("pulsar/system/persistent/_ledger_deletion")
    .subscriptionName("ledger-deletion-worker")
    .subscriptionType(SubscriptionType.Shared)
    .enableRetry(true)
    .deadLetterPolicy(DeadLetterPolicy.builder()
        .retryLetterTopic("pulsar/system/persistent/_ledger_deletion-RETRY")
        .deadLetterTopic("pulsar/system/persistent/_ledger_deletion-DLQ")
        .maxRedeliverCount(10).build())
    .subscribeAsync()           

上述配置中,ledger-deletion-worker 是統一的消息訂閱名稱,有多個 Broker 時會共享同一訂閱,消費進度一緻。一個消費事件消費失敗 10 次後進入死信隊列。

第二階段流程如下:

深入解析 Apache Pulsar 雙階段删除協定的工作原理

System Topic Consumer 啟動完成後,會收到生産者發送的删除事件。然後檢查 Ledger 所屬的 Topic 是否存在,如果存在就将删除操作交給 Broker 處理,否則在 Consumer 一側直接删除資料。

在 Broker 删除 Ledger 時會檢查 Ledger 是否仍在使用中,Ledgerid 是否存在于中繼資料中。如果存在,這個 Ledger 可能還在被使用,是以不能删除,需要等待 10 分鐘後再重新消費删除事件。如果不存在,就正常删除資料并 ack 消息。

安全性

使用 System Topic 來存儲要删除 Ledger 的消息會帶來安全性問題。因為使用者可以通過生産者來向 System Topic 直接發送消息,是以可以被惡意利用,導緻資料誤删除。是以在删除資料時會做中繼資料校驗。

目前存儲在 BookKeeper 中的 Ledger 儲存了 Ledger 所屬的 Topic 資訊。删除某個 Ledger 時會拉取 Ledger 對應的 Topic,和本次要删除的 Ledger Topic 參數進行校驗,通過後才會删除。

但上述操作會有一定性能損耗,是以在 Broker 側引入了一個删除緩存,避免每次删除都拉取中繼資料。生産者發送删除事件消息後,待删除 Ledgerid 會被緩存到記憶體中。當 Broker 收到删除指令,會檢查 Ledgerid 是否在緩存中,如果是,會認為本次删除活動是受信任的;如果不是,則拉取中繼資料進行校驗。Broker 重新開機後緩存才會被清空。

如何删除 Topic

引入雙階段删除後,要删除一個 Topic 時,需要通過雙階段删除操作逐個删除所有 Ledger 才能删除 Topic。

一些中間狀态

  • 删除 Ledger 事件消息成功發送,中繼資料卻因為 Broker 當機而未删除。消費者收到事件消息後,會将删除事件重發給 Broker。Broker 會檢查對應的 Ledgerid 是否存在于中繼資料中,如果還存在就不會删除資料。
  • 删除 Ledger 事件消息成功發送,中繼資料卻因為 Broker 當機而未删除。片刻後,生産者重發消息給 System Topic,且這一次删除成功。這相當于給 System Topic 發送了兩次删除同一 Ledger 的消息,消費者也會給 Broker 發送兩次删除指令。第一次過程中 Broker 已經删除了資料,第二次過程中 BookKeeper 會發現要删除的資料已不存在,則會發送一個特殊狀态碼給用戶端,此時認為第二次删除操作也是成功的。
  • 消費者收到消息并成功删除 Ledger,但未 ack 消息就當機。這時删除事件消息會重新投遞給消費者,再次執行删除操作。由于删除操作是幂等的,第二次操作會被認為成功并 ack 成功。

總結

目前,Apache Pulsar 對于資料的删除的兩個步驟,删除中繼資料與實際存儲資料互相分開,可能導緻存在大量的髒資料無法删除的情況。對此,我們引入 [PIP-186][1] 雙階段删除協定來解決該問題。

本文詳細介紹了雙階段資料删除的工作原理。在第一階段,Broker 啟動時會啟動一個生産者,如果第一步 Ledger 删除失敗,第二步則不能進行,以此保證消費者可以收到消息删除 Ledger。在第二階段,啟動 Broker 時,會啟動一個消費者來消費删除 Ledger 事件消息。該特性相關 PR 正在稽核中,功能還未上線,關注 Apache Pulsar 郵件清單以及後續的版本釋出獲得最新資訊。

引用連結

[1] [PIP-186]: https://github.com/apache/pulsar/issues/165