大家好,我是 夢想家Alex 。相信大家對于 Pulsar 一定不陌生!最近正好看到一篇關于 Apache Pulsar 調研的文章,寫的非常細緻,是以借助本期内容分享給大家 ~
List
- Apache Pulsar
- 1. Kafka 概述
- 1.1 現存問題
- 1.3 優點
- 1.4 缺點
- 2. Pulsar 架構
- 2.4.1 三種寫路由政策
- 2.4.2 四種讀下發政策
- 2.4.3 Pull & Push 可選請求模式
- 2.4.4 Consume ACK 與 unACK
- 2.4.5 Data Retention
- 2.3.1 多租戶
- 2.3.2 Topic 配置設定
- 2.3.3 Topic Lookup
- Broker 的 LoadManager 線程
- bundle 與 ownership
- Topic 配置設定流程
- 設計優點
- 2.1.1 資料集合
- 2.1.2 存儲節點
- 2.1.3. 一緻性保證
- 2.1 Pulsar VS Kafka
- 2.2 Pulsar 架構
- 2.3 多租戶與 Topic Lookup
- 2.4 Produce / Consume 政策
- 3. Bookkeeper 架構
- 3.4.1 讀被均攤
- 3.4.2 讀有預期
- 3.4.3 讀結果無序
- 3.3.1 三種檔案
- 3.3.2 ADD 操作
- 3.3.3 結論
- 3.1.1 特性
- 3.1.2 Ensemble Size / Ensembles / Write Quorum / ACK Quorum / Segment(Ledger) / Fragment
- 3.1.3 結論
- 3.1 概念
- 3.2 架構
- 3.3 寫流程
- 3.4 讀流程
- 4. 水準擴容
- 4.1 水準擴充 Broker
- 4.2 水準擴充 Bookie
- 5. Pulsar Consistency
- 5.3.1 場景
- 5.3.2 流程
- 5.3.3 結論:Broker 故障秒級恢複
- 5.2.1 場景
- 5.2.2 流程
- 5.2.3 結論:Bookie 故障秒級恢複
- 5.1.1 備援副本
- 5.1.2 一緻性機制
- 5.1 一緻性機制
- 5.2 Bookie Auto Recovery:Ensemble Change
- 5.3 Broker Recovery:Fencing
- 6. Distributed Log 與 Raft
- 6.1 概念對比
- 6.2 流程對比
- 6.3 總結
- 7. 總結
- 7.1 Pulsar 的優點
- 7.2 Pulsar 的缺點
簡要總結下對 Pulsar 的調研。
Apache Pulsar
内容:
- Kafka : 優缺點。
- Pulsar : 多租戶,Topic Lookup,生産消費模式
- Bookkeeper : 元件概念與讀寫流程
- Horizontal Scale : Broker 或 Bookie 的橫向擴充
- Consistency : Broker 或 Bookie crash 後保證日志一緻性
- Distributed Log & Raft 算法
- 總結
1. Kafka 概述
1.1 現存問題
主要問題:
- 負載均衡需人工介入:手動按異構配置的 broker 對應生成 assignment 執行計劃。
- 故障恢複不可控:broker 重新開機後需複制分區新資料并重建索引,其上的讀寫請求轉移到其他 broker,流量激增場景下可能會導緻叢集雪崩。
其他問題:
- 跨資料中心備份需維護額外元件:MirrorMaker 官方也承認雞肋,做跨機房的備援複制依賴第三方元件如 uber 的 uReplicator
注:已脫敏。
1.3 優點
- 生态成熟,易與 Flink 等現有元件內建。
- 可參考資料多,完善的官方文檔和書籍。
- 模型簡單易上手:partition 有 replication,以 segment 和 index 方式存儲。
1.4 缺點
計算與存儲耦合
- 存儲節點有狀态:讀寫隻能走 Partition Leader,高負載叢集中 Broker 重新開機容易出現單點故障,甚至雪崩。
- 手動負載均衡:叢集擴容必須手動 Assign Partitions 到新 Broker,才能分散讀寫的負載。
漫畫對比:https://jack-vanlightly.com/sketches/2018/10/2/kafka-vs-pulsar-rebalancing-sketch
2. Pulsar 架構
2.1 Pulsar VS Kafka
Pular | Kafka | |
資料集合 | Topic, Partition | Topic, Partition |
存儲節點及讀寫元件 | Bookkeeper Bookie | Broker |
Pulsar Broker | Client SDK | |
資料存儲單元 | Partition -> Ledgers -> Fragments | Partition -> Segments |
資料一緻性保證 | Ensemble Size | metadata.broker.list |
Write Quorum Size(QW) | Replication Factor | |
Ack Quorum Size(QA) | request.required.acks |
注:(QW+1)/2 <= QA <= QW <= Ensemble Size <= Bookies Count
2.1.1 資料集合
- Kafka:topic 切分為多個 partitions,各 partition 以目錄形式在 leader broker 及其多副本 brokers 上持久化存儲。
- Pulsar:同樣有多個 partitions,但一個 partition 隻由一個 broker 負責讀寫(ownership),而一個 partition 又會均勻分散到多台 bookie 節點上持久化存儲。
2.1.2 存儲節點
- Kafka:直接持久化到 broker,由 Client SDK 直接讀寫。
- Pulsar:分散持久化到 bookie,由 broker 内嵌的 bookkeeper Client 負責讀寫。
2.1.3. 一緻性保證
- Kafka:通過多 broker 叢集,每個 partition 多副本,producer 指定發送确認機制保證。
- Pulsar:通過多 broker 叢集,broker Quorum Write 到 bookie,傳回 Quorum ACK 保證。
2.2 Pulsar 架構
2.3 多租戶與 Topic Lookup
2.3.1 多租戶
- topic 分三層:
,對應劃分為 persistent://tenant/namespace/topic
,以 namespace 為機關進行過期時間設定,ACL 通路鑒權控制。department -> app -> topics
- 優點:按租戶進行 topic 資源隔離,并混部在同一叢集中,提高叢集使用率。
2.3.2 Topic 配置設定
Broker 的 LoadManager 線程
- Leader:即 Broker Leader,類似 Kafka Controller,彙總所有 Broker 的負載,合理地配置設定 topic 分區。
- Wroker:等待配置設定 bundle 内的所有 topic partition
bundle 與 ownership
- 以 Namespace 為機關在 ZK 維護 bundle ring(broker 的數量 2~3 倍),topic 分區按
落到 bundle 中。hash(topic_partition)%N
- Broker 唯一綁定到 bundle,就對 bundle 内的所有 topic partition 持有 ownership,用于 Broker Recovery 保證高可用。
Topic 配置設定流程
- 上報負載:LoadManager Worker 負責向 ZK 彙報負載名額
|
- bundle 為機關配置設定:LoadManager Leader 彙總其他 Brokers 的負載,根據負載配置設定 bundle
|
- 配置設定結果:
|
設計優點
不同于 kafka 将所有 topic ISR 等中繼資料記錄到 zk,pulsar 隻記錄 topic 的分區數,不記錄 topic 到 broker 的映射關系,zk 中繼資料數量極少,是以支援百萬量級 topic
|
2.3.3 Topic Lookup
- Client 向任一 BrokerA 發起 Lookup 請求,如
persistent://public/default/test-topic-1
- BrokerA 計算 default namespace 下
的值,得到該 topic partition 對應的 bundle,進而查出 ownership BrokerXhash(topic_partition)%N
- BrokerA 傳回 owner BrokerX 位址。
2.4 Produce / Consume 政策
2.4.1 三種寫路由政策
- RoundRobinPartition(預設):以 batching 為機關,通過輪詢将消息均勻發給 brokers,以獲得最大吞吐。
- SinglePartition
- 有 KEY 則寫固定分區,類似
寫到指定分區。hash(key) mod len(partitions)
- 無 KEY 則随機選一個分區,寫入該 producer 的所有消息。
- CustomPartition:使用者可自定義針對具體到消息的分區政策,如 Java 實作
接口。MessageRouter
2.4.2 四種讀下發政策
- Exclusive (預設):獨占消費,一對一,保證有序消費,能批量 ACK,是 Failover 特例,不保證高可用。
- Failover:故障轉移消費,一對一,備選多,保證有序消費,消費者高可用,能批量 ACK,保證高可用。
- Shared:共享消費,多對多
- Round Robin 分發消息,類似 Consumer Group 但不保證有序消費。
- 隻能逐條 ACK:Consumer crash 時才能精确控制消息的重發。
- 水準擴充 Consumer 直接提讀吞吐。不像 kafka 必須先擴 Partition 才能擴 Consumer
- Key_Shared:按 KEY 共享消費,多對多,Exclusive 和 Shared 的折中模式。
- KEY hash 相同的消息會被相同 consumer 消費,保證有序消費。
- 隻能逐條 ACK
- 水準擴充 Consumer 提高讀吞吐。
2.4.3 Pull & Push 可選請求模式
- Consumer 可以同步或異步 p Receive 消息。
- Consumer 可以本地注冊 MessageListener 接口來等待 Broker Push 消息。
2.4.4 Consume ACK 與 unACK
- 逐條 ACK、批量 ACK
- 取消 ACK:consumer 消費出錯可請求重新消費,發送取消 ACK 後 broker 會重發消息。
- exclusive, failover:隻能取消上一次送出的 ACK,單個 consumer 可控復原。
- shared, key_shared:類比 ACK,consumers 隻能取消上一條發出的 ACK
與
__consumer_offsets
機制類似 ,Broker 收到各消費者的 ACK 後,會更新 Consumer 的消費進度 cursor,并持久化到特定的 ledger 中。
2.4.5 Data Retention
- 預設積極保留:最慢的 subscription 堆積的消息都不能被删除,最壞的情況是某個 subscription 下線後,cursor 依舊會保留在 message streaming 中,會導緻消息過期機制失效。
- 消息過期:時間或大小兩個次元設定限制,但隻對積極保留之前的消息生效
- TTL:強制移動舊慢 cursor 到 TTL 時間點,若 TTL == Retention,則與 kafka 一樣強制過期
兩個名額
- Topic Backlog:最慢的 subscription 的 cursor 到最新一條消息之間的消息數量。
- Storage Size:topic 總空間。
- 按 segment 粒度删除,以 Last Motify Time 是否早于 Retention 為标準過期,與 kafka 一緻
- 注:bookie 并非同步過期,空間釋放是背景程序定期清理
3. Bookkeeper 架構
append-only 的分布式 KV 日志系統,K 是
(Ledger_id, Entry_id)
二進制組,V 是
(MetaData, RawData)
二進制資料。
3.1 概念
3.1.1 特性
- 高效寫:append-only 磁盤順序寫。
- 高容錯:通過 bookie ensemble 對日志進行備援複制。
- 高吞吐:直接水準擴充 bookie 提高讀寫吞吐。
3.1.2 Ensemble Size / Ensembles / Write Quorum / ACK Quorum / Segment(Ledger) / Fragment
- Ensemble Size:指定一段日志要寫的 bookies 數量。
- Ensembles:指定寫一段日志的目标 bookies 集合。
- Write Quorum:指定一條日志要寫的 bookie 數量。
- ACK Quorum:指定一條日志要确認已寫入的 bookie 數量。
- Segment / Ledger:要寫入的一段日志。
- Fragment:寫入的一條日志。
3.1.3 結論
- Client 會以 Round Robin 的政策挑選出 bookie,依次順延寫 entry
- Client 隻等待 ACK Quorum 個 broker 傳回 Append ACK 就認為寫成功。
- 一個 Segment / Ledger 包含多個 Fragment
- Fragment 内的 entry 呈帶狀連續分布在 Ensembles Bookies 上。
- 一個周期内,一台 Bookie 會存儲不連續的
條 Entry(EnsembleSize - WriteQuorum)
3.2 架構
三個元件
- zk / etcd:強一緻性中繼資料存儲
- 中繼資料存儲:ledger 中繼資料。
- 服務發現:bookie 的注冊中心,bookie 互相發現,client 讀取叢集全部 bookie 位址。
- Bookie:存儲節點,隻允許
/ ADD
兩個操作,不保證一緻性,不保證可用性,功能簡單。READ
- Client:實作備援複制的邏輯,保證資料的一緻性,實作複雜且最重要。
3.3 寫流程
3.3.1 三種檔案
- Journal WAL
- 概念:用于持久化存儲 bookie 操作 ledger 的事務日志,接收來自多個 Ledger Client 寫入的不同 ledger entries,直接 高效地 append 到記憶體,随後 fsync 順序寫磁盤,延遲低。
- 清理:當 Write Cache 完成 Flush 落盤後自動删除。
- Entry Logs
- 概念:真正落盤的日志檔案,有序儲存不同 ledger 的 entries,并維護 Write Cache 加速熱日志的查找。
- 清理:bookie 背景 GC 線程定期檢查其關聯的 ledgers 是否在 zk 上已删除,若已删除則自動清理。
- Index Files
- 概念:高效順序寫的副作用是,必須在外圍維護
到 (ledger_id, entry_id)
的映射索引,才能實作高效讀,故 Flush Cache 時會分離出索引檔案。Entry_Log
- 實作:可選 RocksDB 和檔案存儲索引。
3.3.2 ADD 操作
- Clients 混亂地給 Bookie 發來不同 ledger 的日志。
- Bookie 往追加寫 Journal,同時向 Write Cache 有序寫(Write Cache 内部使用 SkipList 實作動态有序,同時保證讀寫都高效)
- WriteCache 寫滿後 Flush 分離出 index 檔案和落盤的日志檔案。
- 删除舊 Journal,建立新 Journal 繼續追加寫,如此循環。
3.3.3 結論
broker 内部為每個 ledger 持久化了其存儲的 entry logs,并建立索引提高讀效率。
3.4 讀流程
Client 發來
(ledger_id, entry_id)
的 KEY
- 熱 KEY:在 Write Cache 中則直接傳回。
- 冷 KEY:讀取 ledger_id 對應的 index 檔案,根據 index 找出 entry_id 對應的 entry log 再傳回。
3.4.1 讀被均攤
如同輪詢寫,Cleint 也會輪詢 Ensembles 均攤讀取,同樣不存在 leader 讀瓶頸。
3.4.2 讀有預期
若某個 Bookie 讀響應确實很慢,Client 會向其他副本 Bookie 發起讀請求,同時等待,進而保證讀延時低。
3.4.3 讀結果無序
Client 往 bookie 寫是輪詢無序地寫,故從 Ensembles 中讀到是消息是無序的,需在 Client 端自行按 entry_id 重新排序,以保證有序響應。
4. 水準擴容
4.1 水準擴充 Broker
新 Broker 加入叢集後,Broker Leader 會将高負載 Broker 的部分 topic ownership 轉移給新 Broker,進而分攤讀寫壓力。
4.2 水準擴充 Bookie
新 Bookie 加入叢集後,Broker 通過 ZK 感覺到,并将 ledger 的新 entry log 寫到新 Bookie,提高存儲層的讀寫吞吐、存儲容量。
5. Pulsar Consistency
5.1 一緻性機制
日志的備援複制、一緻性保證均由 Bookkeeper Client 實作。
5.1.1 備援副本
由如上的 Eensembles 的 QW 和 QA 的多副本寫,保證每條日志确實持久化到了 bookie 中。
5.1.2 一緻性機制
滑動視窗:
[0, ..., READABLE ... LAC], [LAC+1, ... WAIT_QUOROM ..., LAP]
- LAP(Last Add Pushed):Client 發出的最後一條 entry_id(從 0 自增的正整數)
- LAC(Last Add Confirmed):Client 收到的最後一條 ACK 的 entry_id,是一緻性的邊界。
實作一緻性的三個前置條件:
- 寫 ledger 隻能以 Append-Only 方式追加寫,寫滿後變為 Read-Only
- 一個 Ledger 同一時間隻會有一個 Client 在寫。
- LAC 必須按照 LAP 的順序,依次進行 ACK 确認:保證 LAC 作為一緻性邊界,前邊的日志可讀,後邊的日志等待多副本複制。
5.2 Bookie Auto Recovery:Ensemble Change
5.2.1 場景
bookie crash 下線後,需恢複副本數量。
5.2.2 流程
- 存在 Leader Bookie 5 作為 Daemon Auditor,不斷向其他 Bookies 發送心跳保活。
- Auditor 發現 Bookie 4 逾時,讀取 zk 發現 ledger x 的
entry_id 區間需要從 4 轉移到新 Bookie[0, 7)
- 找出負載較小的 Bookie 6,并根據 Ensembles 發現備援資料分布在
{B1, B2, B3, B5}
- 按輪詢均攤複制讀壓力的方式,将 entry log 逐一複制到 Bookie 6
- 複制完畢後修改 ZK 中繼資料,将 LAC0 的副本 4 替換為 6
5.2.3 結論:Bookie 故障秒級恢複
-
寫請求快速轉移:
Bookie 6 加入 Ensembles 後,直接代替 Bookie 4 繼續 Append 日志。因為副本數恢複是各個 Ensembles 内部各節點的 Auditor 線程背景異步複制,不會導緻 Client 的寫中斷,整個 Recovery 過程對 Client 幾乎透明。
-
LAC 分界線記錄 Ensemble Change 曆史:
在 ZK 的 ledger metadata 中,會記錄每次 Recovery 導緻的 ensembles 更新,即記錄了 ledger 各 entry log 區間的分布情況。
如下中繼資料記錄了 ledger16 在 LAC46 處,Bookie 3183 下線,随後 Bookie 3182 上線從 LAC47 處繼續處理請求:
|
注意:右上可看出 ZK 中各 ledger 的中繼資料寫死了 Bookie 的 IP,容器部署時若 Bookie 重新開機後 IP 變化,會導緻舊 Ledger 的該副本廢棄,故在 k8s 上部署時應選擇 DaemonSet 或 StatefulSet
5.3 Broker Recovery:Fencing
5.3.1 場景
Broker crash,或 Broker 與 ZK 出現網絡分區導緻腦裂,需進行 partition ownership 轉移。
5.3.2 流程
- Broker1 心跳逾時後,ZK 将 topic partition 的 ownership 轉移到 Broker2
- Broker2 向 Ensemble 發起 Fencing ledger_X 請求,Bookies 紛紛将 ledger_X 置為 Fencing 不可寫狀态。
- Broker1 寫資料失敗收到 FenceException,說明該 partition 已被 Broker 接管,主動放棄 ownership
- Client 收到異常後與 Broker1 斷開連接配接,進行 Topic Lookup 與 Broker2 建立長連接配接。
- 同時,Broker2 對 ledger_X LAC1 之後的 entry log 依次逐一進行 Forwarding Recovery(若 unknow 狀态的 entry 副本數實際上已達到 WQ,則認為該 entry 寫成功,LAC1 自增為 LAC2)
- Broker2 更新 ledger_X 的 metadata,将其置為 CLOSE 狀态,再建立新 ledger,繼續處理 Client 的寫請求。
5.3.3 結論:Broker 故障秒級恢複
-
不複用舊 ledger,降低複雜度
若複用舊 ledger_X,必須保證所有 ensemble 的 LAC 一緻,同時涉及尾部 entry 的強一緻複制,邏輯複雜。直接 CLOSE 能保證舊 ledger 不會再被寫入。
-
Recovery 邏輯簡單,耗時短
在 Client 的視角,隻需等待兩個過程:
等待結束後,直接往新 Broker 的新 ledger 上追加寫資料,Broker 不參與任何資料備援複制的流程,是以是無狀态的,可以直接水準擴充提升以提升吞吐。
- ZK 進行 partition ownership 的轉移。
- 新 Broker 對 UNKNOWN 狀态的尾部 entry 進行 Forwarding Recovery
6. Distributed Log 與 Raft
6.1 概念對比
概念 | Raft | DL |
role | Leader 與 Followers | Writer (broker) 與 Bookies |
failover | term | ledger_id |
replication | Majority AppendEntries RPC | Quorum Write |
consistency | Last Committed Index | Last Add Confirmed(LAC) |
brain split | Majority Vote | Broker Fencing |
6.2 流程對比
6.3 總結
- LAC 與 LAP 的存在,使 entry 能以内嵌順序中繼資料的方式,均勻分散存儲到各台 bookie 中。
- DL 與 Raft 不同之處在于:
各 bookie 節點的資料不是從單個節點異步複制而來,而是由 Client 直接輪詢分發。
- 為保證 bookie 能快速 append 日志,bookkeeper 設計了 Journal Append-only 順序寫日志機制。
- 為保證 bookie 能快速根據
讀取消息(lid, eid)
,bookkeeper 設計了 Ledger Store(entry)
是以,各 bookie 存儲節點的身份是平等的,沒有傳統一緻性算法的 Leader 和 Follower 的概念,完美避開了讀寫隻能走 Leader 導緻 Leader 容易成為單點瓶頸的問題。
同時,能直接添加新 Bookie 提升讀寫吞吐,并降低其他舊 Bookie 的負載。
7. 總結
7.1 Pulsar 的優點
直接解決 Kafka 容器平台現有的手工擴容、故障恢複慢的問題。
- 穩定性可用性高:秒級 Broker / Bookie 的快速故障恢複。
- 水準線性擴容:存儲與計算分離,可對 Broker 擴容提升讀寫吞吐,可對 Bookie 擴容降低叢集負載并提升存儲容量。
- 擴容負載均衡:Bookie 擴容後新的 ledger 會在新 Bookie 上建立,自動均攤負載。
7.2 Pulsar 的缺點
- 概念多,系統複雜,隐藏 bug 修複門檻高。
- 背書少,國内僅騰訊金融和智聯招聘在使用。