天天看點

Apache Pulsar 調研

Apache Pulsar 調研

大家好,我是 夢想家Alex 。相信大家對于 Pulsar 一定不陌生!最近正好看到一篇關于 Apache Pulsar 調研的文章,寫的非常細緻,是以借助本期内容分享給大家 ~

Apache Pulsar 調研

List

  1. Apache Pulsar
  2. 1. Kafka 概述
  1. 1.1 現存問題
  2. 1.3 優點
  3. 1.4 缺點
  1. 2. Pulsar 架構
  2. 2.4.1 三種寫路由政策
  3. 2.4.2 四種讀下發政策
  4. 2.4.3 Pull & Push 可選請求模式
  5. 2.4.4 Consume ACK 與 unACK
  6. 2.4.5 Data Retention
  7. 2.3.1 多租戶
  8. 2.3.2 Topic 配置設定
  9. 2.3.3 Topic Lookup
  10. Broker 的 LoadManager 線程
  11. bundle 與 ownership
  12. Topic 配置設定流程
  13. 設計優點
  14. 2.1.1 資料集合
  15. 2.1.2 存儲節點
  16. 2.1.3. 一緻性保證
  17. 2.1 Pulsar VS Kafka
  18. 2.2 Pulsar 架構
  19. 2.3 多租戶與 Topic Lookup
  20. 2.4 Produce / Consume 政策
  21. 3. Bookkeeper 架構
  22. 3.4.1 讀被均攤
  23. 3.4.2 讀有預期
  24. 3.4.3 讀結果無序
  25. 3.3.1 三種檔案
  26. 3.3.2 ADD 操作
  27. 3.3.3 結論
  28. 3.1.1 特性
  29. 3.1.2 Ensemble Size / Ensembles / Write Quorum / ACK Quorum / Segment(Ledger) / Fragment
  30. 3.1.3 結論
  31. 3.1 概念
  32. 3.2 架構
  33. 3.3 寫流程
  34. 3.4 讀流程
  35. 4. 水準擴容
  36. 4.1 水準擴充 Broker
  37. 4.2 水準擴充 Bookie
  38. 5. Pulsar Consistency
  39. 5.3.1 場景
  40. 5.3.2 流程
  41. 5.3.3 結論:Broker 故障秒級恢複
  42. 5.2.1 場景
  43. 5.2.2 流程
  44. 5.2.3 結論:Bookie 故障秒級恢複
  45. 5.1.1 備援副本
  46. 5.1.2 一緻性機制
  47. 5.1 一緻性機制
  48. 5.2 Bookie Auto Recovery:Ensemble Change
  49. 5.3 Broker Recovery:Fencing
  50. 6. Distributed Log 與 Raft
  51. 6.1 概念對比
  52. 6.2 流程對比
  53. 6.3 總結
  54. 7. 總結
  55. 7.1 Pulsar 的優點
  56. 7.2 Pulsar 的缺點

簡要總結下對 Pulsar 的調研。

Apache Pulsar

内容:

  • Kafka : 優缺點。
  • Pulsar : 多租戶,Topic Lookup,生産消費模式
  • Bookkeeper : 元件概念與讀寫流程
  • Horizontal Scale : Broker 或 Bookie 的橫向擴充
  • Consistency : Broker 或 Bookie crash 後保證日志一緻性
  • Distributed Log & Raft 算法
  • 總結

1. Kafka 概述

1.1 現存問題

主要問題:

  • 負載均衡需人工介入:手動按異構配置的 broker 對應生成 assignment 執行計劃。
  • 故障恢複不可控:broker 重新開機後需複制分區新資料并重建索引,其上的讀寫請求轉移到其他 broker,流量激增場景下可能會導緻叢集雪崩。

其他問題:

  • 跨資料中心備份需維護額外元件:MirrorMaker 官方也承認雞肋,做跨機房的備援複制依賴第三方元件如 uber 的 uReplicator

注:已脫敏。

1.3 優點

  • 生态成熟,易與 Flink 等現有元件內建。
  • 可參考資料多,完善的官方文檔和書籍。
  • 模型簡單易上手:partition 有 replication,以 segment 和 index 方式存儲。

1.4 缺點

計算與存儲耦合

  • 存儲節點有狀态:讀寫隻能走 Partition Leader,高負載叢集中 Broker 重新開機容易出現單點故障,甚至雪崩。
  • 手動負載均衡:叢集擴容必須手動 Assign Partitions 到新 Broker,才能分散讀寫的負載。

漫畫對比:https://jack-vanlightly.com/sketches/2018/10/2/kafka-vs-pulsar-rebalancing-sketch

2. Pulsar 架構

2.1 Pulsar VS Kafka

Pular Kafka
資料集合 Topic, Partition Topic, Partition
存儲節點及讀寫元件 Bookkeeper Bookie Broker
Pulsar Broker Client SDK
資料存儲單元 Partition -> Ledgers -> Fragments Partition -> Segments
資料一緻性保證 Ensemble Size metadata.broker.list
Write Quorum Size(QW) Replication Factor
Ack Quorum Size(QA) request.required.acks

注:(QW+1)/2 <= QA <= QW <= Ensemble Size <= Bookies Count

2.1.1 資料集合

  • Kafka:topic 切分為多個 partitions,各 partition 以目錄形式在 leader broker 及其多副本 brokers 上持久化存儲。
  • Pulsar:同樣有多個 partitions,但一個 partition 隻由一個 broker 負責讀寫(ownership),而一個 partition 又會均勻分散到多台 bookie 節點上持久化存儲。

2.1.2 存儲節點

  • Kafka:直接持久化到 broker,由 Client SDK 直接讀寫。
  • Pulsar:分散持久化到 bookie,由 broker 内嵌的 bookkeeper Client 負責讀寫。

2.1.3. 一緻性保證

  • Kafka:通過多 broker 叢集,每個 partition 多副本,producer 指定發送确認機制保證。
  • Pulsar:通過多 broker 叢集,broker Quorum Write 到 bookie,傳回 Quorum ACK 保證。

2.2 Pulsar 架構

Apache Pulsar 調研

2.3 多租戶與 Topic Lookup

2.3.1 多租戶

  • topic 分三層:​

    ​persistent://tenant/namespace/topic​

    ​,對應劃分為 ​

    ​department -> app -> topics​

    ​,以 namespace 為機關進行過期時間設定,ACL 通路鑒權控制。
  • 優點:按租戶進行 topic 資源隔離,并混部在同一叢集中,提高叢集使用率。

2.3.2 Topic 配置設定

Broker 的 LoadManager 線程
  • Leader:即 Broker Leader,類似 Kafka Controller,彙總所有 Broker 的負載,合理地配置設定 topic 分區。
  • Wroker:等待配置設定 bundle 内的所有 topic partition
bundle 與 ownership
  • 以 Namespace 為機關在 ZK 維護 bundle ring(broker 的數量 2~3 倍),topic 分區按​

    ​hash(topic_partition)%N​

    ​ 落到 bundle 中。
  • Broker 唯一綁定到 bundle,就對 bundle 内的所有 topic partition 持有 ownership,用于 Broker Recovery 保證高可用。
Apache Pulsar 調研
Topic 配置設定流程
  • 上報負載:LoadManager Worker 負責向 ZK 彙報負載名額
zk> get /loadbalance/brokers/localhost:8080
{
  "pulsarServiceUrl": "pulsar://localhost:6650",
  "cpu": {
    "usage": 23,
    "limit": 50
  },
  "memory": {
    "usage": 1,
    "limit": 10
  },
  "msgThroughputIn": 100,
  "msgThroughputOut": 100
}      
  • bundle 為機關配置設定:LoadManager Leader 彙總其他 Brokers 的負載,根據負載配置設定 bundle
zk> get /loadbalance/leader
{"serviceUrl":"http://localhost:8080","leaderReady":false}      
  • 配置設定結果:
zk> ls /namespace/public/default
[0x00000000_0x40000000, 0x40000000_0x80000000, 0x80000000_0xc0000000, 0xc0000000_0xffffffff]

zk> get /namespace/public/default/0x80000000_0xc0000000
{"nativeUrl":"pulsar://localhost:6650","httpUrl":"http://localhost:8080","disabled":false}      
設計優點

不同于 kafka 将所有 topic ISR 等中繼資料記錄到 zk,pulsar 隻記錄 topic 的分區數,不記錄 topic 到 broker 的映射關系,zk 中繼資料數量極少,是以支援百萬量級 topic

zk> get /admin/partitioned-topics/public/default/persistent/partitioned-topic-1
{"partitions":2}      

2.3.3 Topic Lookup

  • Client 向任一 BrokerA 發起 Lookup 請求,如​

    ​persistent://public/default/test-topic-1​

  • BrokerA 計算 default namespace 下​

    ​hash(topic_partition)%N​

    ​ 的值,得到該 topic partition 對應的 bundle,進而查出 ownership BrokerX
  • BrokerA 傳回 owner BrokerX 位址。

2.4 Produce / Consume 政策

2.4.1 三種寫路由政策

  • RoundRobinPartition(預設):以 batching 為機關,通過輪詢将消息均勻發給 brokers,以獲得最大吞吐。
  • SinglePartition
  • 有 KEY 則寫固定分區,類似​

    ​hash(key) mod len(partitions)​

    ​ 寫到指定分區。
  • 無 KEY 則随機選一個分區,寫入該 producer 的所有消息。
  • CustomPartition:使用者可自定義針對具體到消息的分區政策,如 Java 實作 ​

    ​MessageRouter​

    ​ 接口。

2.4.2 四種讀下發政策

  • Exclusive (預設):獨占消費,一對一,保證有序消費,能批量 ACK,是 Failover 特例,不保證高可用。
  • Failover:故障轉移消費,一對一,備選多,保證有序消費,消費者高可用,能批量 ACK,保證高可用。
  • Shared:共享消費,多對多
    Apache Pulsar 調研
  • Round Robin 分發消息,類似 Consumer Group 但不保證有序消費。
  • 隻能逐條 ACK:Consumer crash 時才能精确控制消息的重發。
  • 水準擴充 Consumer 直接提讀吞吐。不像 kafka 必須先擴 Partition 才能擴 Consumer
  • Key_Shared:按 KEY 共享消費,多對多,Exclusive 和 Shared 的折中模式。
  • KEY hash 相同的消息會被相同 consumer 消費,保證有序消費。
  • 隻能逐條 ACK
  • 水準擴充 Consumer 提高讀吞吐。
    Apache Pulsar 調研

2.4.3 Pull & Push 可選請求模式

  • Consumer 可以同步或異步 p Receive 消息。
  • Consumer 可以本地注冊 MessageListener 接口來等待 Broker Push 消息。

2.4.4 Consume ACK 與 unACK

  • 逐條 ACK、批量 ACK
  • 取消 ACK:consumer 消費出錯可請求重新消費,發送取消 ACK 後 broker 會重發消息。
  • exclusive, failover:隻能取消上一次送出的 ACK,單個 consumer 可控復原。
  • shared, key_shared:類比 ACK,consumers 隻能取消上一條發出的 ACK

與 ​

​__consumer_offsets​

​ 機制類似 ,Broker 收到各消費者的 ACK 後,會更新 Consumer 的消費進度 cursor,并持久化到特定的 ledger 中。

2.4.5 Data Retention

  • 預設積極保留:最慢的 subscription 堆積的消息都不能被删除,最壞的情況是某個 subscription 下線後,cursor 依舊會保留在 message streaming 中,會導緻消息過期機制失效。
  • 消息過期:時間或大小兩個次元設定限制,但隻對積極保留之前的消息生效
  • TTL:強制移動舊慢 cursor 到 TTL 時間點,若 TTL == Retention,則與 kafka 一樣強制過期
Apache Pulsar 調研

兩個名額

  • Topic Backlog:最慢的 subscription 的 cursor 到最新一條消息之間的消息數量。
  • Storage Size:topic 總空間。
  • 按 segment 粒度删除,以 Last Motify Time 是否早于 Retention 為标準過期,與 kafka 一緻
  • 注:bookie 并非同步過期,空間釋放是背景程序定期清理

3. Bookkeeper 架構

append-only 的分布式 KV 日志系統,K 是 ​

​(Ledger_id, Entry_id)​

​​ 二進制組,V 是 ​

​(MetaData, RawData)​

​ 二進制資料。

3.1 概念

3.1.1 特性

  • 高效寫:append-only 磁盤順序寫。
  • 高容錯:通過 bookie ensemble 對日志進行備援複制。
  • 高吞吐:直接水準擴充 bookie 提高讀寫吞吐。

3.1.2 Ensemble Size / Ensembles / Write Quorum / ACK Quorum / Segment(Ledger) / Fragment

  • Ensemble Size:指定一段日志要寫的 bookies 數量。
  • Ensembles:指定寫一段日志的目标 bookies 集合。
  • Write Quorum:指定一條日志要寫的 bookie 數量。
  • ACK Quorum:指定一條日志要确認已寫入的 bookie 數量。
  • Segment / Ledger:要寫入的一段日志。
  • Fragment:寫入的一條日志。
Apache Pulsar 調研

3.1.3 結論

  • Client 會以 Round Robin 的政策挑選出 bookie,依次順延寫 entry
  • Client 隻等待 ACK Quorum 個 broker 傳回 Append ACK 就認為寫成功。
  • 一個 Segment / Ledger 包含多個 Fragment
  • Fragment 内的 entry 呈帶狀連續分布在 Ensembles Bookies 上。
  • 一個周期内,一台 Bookie 會存儲不連續的 ​

    ​(EnsembleSize - WriteQuorum)​

    ​ 條 Entry

3.2 架構

三個元件

  • zk / etcd:強一緻性中繼資料存儲
  • 中繼資料存儲:ledger 中繼資料。
  • 服務發現:bookie 的注冊中心,bookie 互相發現,client 讀取叢集全部 bookie 位址。
  • Bookie:存儲節點,隻允許​

    ​ADD​

    ​ / ​

    ​READ​

    ​ 兩個操作,不保證一緻性,不保證可用性,功能簡單。
  • Client:實作備援複制的邏輯,保證資料的一緻性,實作複雜且最重要。
Apache Pulsar 調研

3.3 寫流程

3.3.1 三種檔案

  • Journal WAL
  • 概念:用于持久化存儲 bookie 操作 ledger 的事務日志,接收來自多個 Ledger Client 寫入的不同 ledger entries,直接 高效地 append 到記憶體,随後 fsync 順序寫磁盤,延遲低。
  • 清理:當 Write Cache 完成 Flush 落盤後自動删除。
  • Entry Logs
  • 概念:真正落盤的日志檔案,有序儲存不同 ledger 的 entries,并維護 Write Cache 加速熱日志的查找。
  • 清理:bookie 背景 GC 線程定期檢查其關聯的 ledgers 是否在 zk 上已删除,若已删除則自動清理。
  • Index Files
  • 概念:高效順序寫的副作用是,必須在外圍維護 ​

    ​(ledger_id, entry_id)​

    ​ 到 ​

    ​Entry_Log​

    ​ 的映射索引,才能實作高效讀,故 Flush Cache 時會分離出索引檔案。
  • 實作:可選 RocksDB 和檔案存儲索引。

3.3.2 ADD 操作

  • Clients 混亂地給 Bookie 發來不同 ledger 的日志。
  • Bookie 往追加寫 Journal,同時向 Write Cache 有序寫(Write Cache 内部使用 SkipList 實作動态有序,同時保證讀寫都高效)
  • WriteCache 寫滿後 Flush 分離出 index 檔案和落盤的日志檔案。
  • 删除舊 Journal,建立新 Journal 繼續追加寫,如此循環。
Apache Pulsar 調研

3.3.3 結論

broker 内部為每個 ledger 持久化了其存儲的 entry logs,并建立索引提高讀效率。

3.4 讀流程

Client 發來 ​

​(ledger_id, entry_id)​

​ 的 KEY

  • 熱 KEY:在 Write Cache 中則直接傳回。
  • 冷 KEY:讀取 ledger_id 對應的 index 檔案,根據 index 找出 entry_id 對應的 entry log 再傳回。

3.4.1 讀被均攤

如同輪詢寫,Cleint 也會輪詢 Ensembles 均攤讀取,同樣不存在 leader 讀瓶頸。

3.4.2 讀有預期

若某個 Bookie 讀響應确實很慢,Client 會向其他副本 Bookie 發起讀請求,同時等待,進而保證讀延時低。

3.4.3 讀結果無序

Client 往 bookie 寫是輪詢無序地寫,故從 Ensembles 中讀到是消息是無序的,需在 Client 端自行按 entry_id 重新排序,以保證有序響應。

4. 水準擴容

4.1 水準擴充 Broker

新 Broker 加入叢集後,Broker Leader 會将高負載 Broker 的部分 topic ownership 轉移給新 Broker,進而分攤讀寫壓力。

4.2 水準擴充 Bookie

新 Bookie 加入叢集後,Broker 通過 ZK 感覺到,并将 ledger 的新 entry log 寫到新 Bookie,提高存儲層的讀寫吞吐、存儲容量。

Apache Pulsar 調研

5. Pulsar Consistency

5.1 一緻性機制

日志的備援複制、一緻性保證均由 Bookkeeper Client 實作。

5.1.1 備援副本

由如上的 Eensembles 的 QW 和 QA 的多副本寫,保證每條日志确實持久化到了 bookie 中。

5.1.2 一緻性機制

滑動視窗:​

​[0, ..., READABLE ... LAC], [LAC+1, ... WAIT_QUOROM ..., LAP]​

  • LAP(Last Add Pushed):Client 發出的最後一條 entry_id(從 0 自增的正整數)
  • LAC(Last Add Confirmed):Client 收到的最後一條 ACK 的 entry_id,是一緻性的邊界。

實作一緻性的三個前置條件:

  • 寫 ledger 隻能以 Append-Only 方式追加寫,寫滿後變為 Read-Only
  • 一個 Ledger 同一時間隻會有一個 Client 在寫。
  • LAC 必須按照 LAP 的順序,依次進行 ACK 确認:保證 LAC 作為一緻性邊界,前邊的日志可讀,後邊的日志等待多副本複制。
Apache Pulsar 調研

5.2 Bookie Auto Recovery:Ensemble Change

5.2.1 場景

bookie crash 下線後,需恢複副本數量。

5.2.2 流程

  • 存在 Leader Bookie 5 作為 Daemon Auditor,不斷向其他 Bookies 發送心跳保活。
  • Auditor 發現 Bookie 4 逾時,讀取 zk 發現 ledger x 的​

    ​[0, 7)​

    ​ entry_id 區間需要從 4 轉移到新 Bookie
  • 找出負載較小的 Bookie 6,并根據 Ensembles 發現備援資料分布在​

    ​{B1, B2, B3, B5}​

  • 按輪詢均攤複制讀壓力的方式,将 entry log 逐一複制到 Bookie 6
  • 複制完畢後修改 ZK 中繼資料,将 LAC0 的副本 4 替換為 6

Apache Pulsar 調研

5.2.3 結論:Bookie 故障秒級恢複

  • 寫請求快速轉移:

    Bookie 6 加入 Ensembles 後,直接代替 Bookie 4 繼續 Append 日志。因為副本數恢複是各個 Ensembles 内部各節點的 Auditor 線程背景異步複制,不會導緻 Client 的寫中斷,整個 Recovery 過程對 Client 幾乎透明。

  • LAC 分界線記錄 Ensemble Change 曆史:

    在 ZK 的 ledger metadata 中,會記錄每次 Recovery 導緻的 ensembles 更新,即記錄了 ledger 各 entry log 區間的分布情況。

    如下中繼資料記錄了 ledger16 在 LAC46 處,Bookie 3183 下線,随後 Bookie 3182 上線從 LAC47 處繼續處理請求:

> get /ledgers/00/0000/L0016
ensembleSize: 3
quorumSize: 2
ackQuorumSize: 2
lastEntryId: -1
state: OPEN
segment {
  ensembleMember: "10.13.48.57:3185"
  ensembleMember: "10.13.48.57:3184"
  ensembleMember: "10.13.48.57:3183"
  firstEntryId: 0
}
segment {
  ensembleMember: "10.13.48.57:3185"
  ensembleMember: "10.13.48.57:3184"
  ensembleMember: "10.13.48.57:3182"
  firstEntryId: 47
}      

注意:右上可看出 ZK 中各 ledger 的中繼資料寫死了 Bookie 的 IP,容器部署時若 Bookie 重新開機後 IP 變化,會導緻舊 Ledger 的該副本廢棄,故在 k8s 上部署時應選擇 DaemonSet 或 StatefulSet

5.3 Broker Recovery:Fencing

5.3.1 場景

Broker crash,或 Broker 與 ZK 出現網絡分區導緻腦裂,需進行 partition ownership 轉移。

5.3.2 流程

  • Broker1 心跳逾時後,ZK 将 topic partition 的 ownership 轉移到 Broker2
  • Broker2 向 Ensemble 發起 Fencing ledger_X 請求,Bookies 紛紛将 ledger_X 置為 Fencing 不可寫狀态。
  • Broker1 寫資料失敗收到 FenceException,說明該 partition 已被 Broker 接管,主動放棄 ownership
  • Client 收到異常後與 Broker1 斷開連接配接,進行 Topic Lookup 與 Broker2 建立長連接配接。
  • 同時,Broker2 對 ledger_X LAC1 之後的 entry log 依次逐一進行 Forwarding Recovery(若 unknow 狀态的 entry 副本數實際上已達到 WQ,則認為該 entry 寫成功,LAC1 自增為 LAC2)
  • Broker2 更新 ledger_X 的 metadata,将其置為 CLOSE 狀态,再建立新 ledger,繼續處理 Client 的寫請求。

Apache Pulsar 調研

5.3.3 結論:Broker 故障秒級恢複

  • 不複用舊 ledger,降低複雜度

    若複用舊 ledger_X,必須保證所有 ensemble 的 LAC 一緻,同時涉及尾部 entry 的強一緻複制,邏輯複雜。直接 CLOSE 能保證舊 ledger 不會再被寫入。

  • Recovery 邏輯簡單,耗時短

    在 Client 的視角,隻需等待兩個過程:

    等待結束後,直接往新 Broker 的新 ledger 上追加寫資料,Broker 不參與任何資料備援複制的流程,是以是無狀态的,可以直接水準擴充提升以提升吞吐。

  • ZK 進行 partition ownership 的轉移。
  • 新 Broker 對 UNKNOWN 狀态的尾部 entry 進行 Forwarding Recovery

6. Distributed Log 與 Raft

6.1 概念對比

概念 Raft DL
role Leader 與 Followers Writer (broker) 與 Bookies
failover term ledger_id
replication Majority AppendEntries RPC Quorum Write
consistency Last Committed Index Last Add Confirmed(LAC)
brain split Majority Vote Broker Fencing

6.2 流程對比

Apache Pulsar 調研

6.3 總結

  • LAC 與 LAP 的存在,使 entry 能以内嵌順序中繼資料的方式,均勻分散存儲到各台 bookie 中。
  • DL 與 Raft 不同之處在于:
各 bookie 節點的資料不是從單個節點異步複制而來,而是由 Client 直接輪詢分發。
  • 為保證 bookie 能快速 append 日志,bookkeeper 設計了 Journal Append-only 順序寫日志機制。
  • 為保證 bookie 能快速根據​

    ​(lid, eid)​

    ​ 讀取消息​

    ​(entry)​

    ​,bookkeeper 設計了 Ledger Store

是以,各 bookie 存儲節點的身份是平等的,沒有傳統一緻性算法的 Leader 和 Follower 的概念,完美避開了讀寫隻能走 Leader 導緻 Leader 容易成為單點瓶頸的問題。

同時,能直接添加新 Bookie 提升讀寫吞吐,并降低其他舊 Bookie 的負載。

7. 總結

7.1 Pulsar 的優點

直接解決 Kafka 容器平台現有的手工擴容、故障恢複慢的問題。

  • 穩定性可用性高:秒級 Broker / Bookie 的快速故障恢複。
  • 水準線性擴容:存儲與計算分離,可對 Broker 擴容提升讀寫吞吐,可對 Bookie 擴容降低叢集負載并提升存儲容量。
  • 擴容負載均衡:Bookie 擴容後新的 ledger 會在新 Bookie 上建立,自動均攤負載。

7.2 Pulsar 的缺點

  • 概念多,系統複雜,隐藏 bug 修複門檻高。
  • 背書少,國内僅騰訊金融和智聯招聘在使用。