天天看點

譯文 | 深度剖析 Pulsar Functions

原文作者 Sanjeev Kulkarni,翻譯 Sijia@StreamNative

關于 Apache Pulsar

Apache Pulsar 是 Apache 軟體基金會頂級項目,是下一代雲原生分布式消息流平台,集消息、存儲、輕量化函數式計算為一體,采用計算與存儲分離架構設計,支援多租戶、持久化存儲、多機房跨區域資料複制,具有強一緻性、高吞吐、低延時及高可擴充性等流資料存儲特性。

GitHub 位址:http://github.com/apache/pulsar/

Pulsar Functions 是開源資料技術架構 Apache Pulsar[1] 為輕量級計算提供的内置流處理器。在 2020 年 Pulsar Summit 會議[2]上,我發表了一次關于 Pulsar Functions 的演講。本文将深入讨論 Functions 的架構和實作細節。

Pulsar Functions 簡介

Pulsar Functions 是 Pulsar 消息系統的核心計算基礎結構。使用 Pulsar Functions,無需部署單獨的系統(如 Apache Storm、Apache Heron),即可基于單條消息建立複雜的處理邏輯,簡化事件流并引入無服務架構。

輕量級計算 function 從一個或多個 Pulsar topic 消費消息,将使用者提供的處理邏輯應用于每條消息,并釋出計算結果到其他 topic。由于不需要外部處理系統,Pulsar Functions 不僅使應用程式開發更便捷,還簡化了故障排除操作,減輕了運維負擔。

另外,開發人員可以直接使用 Pulsar Functions 的 API。了解 Java 語言的程式員可以直接使用 Java SDK 編寫 function。示例如下:

import java.util.function.Function; public class ExclamationFunction implements Function<String, String>  @Override   public String apply(String input) {       return input + "!";   }}           

Pulsar Functions 旨在借助簡單的 API 和執行架構處理常見的流使用場景(如過濾、路由、擴充),而不是替換重量級流處理引擎(如 Spark、Flink)。

使用者可以自行編寫 Pulsar function,送出到 Pulsar 叢集後,即可與 Pulsar Functions 的内置管理部件一起使用。使用基于 CRUD 的 REST API,使用者可以從任一工作流程送出 function,送出後即可運作。

送出流程

送出 function 的流程稱為 Function Representation。送出流程的結構稱為 FunctionConfig,包括租戶、命名空間和名稱。Function 通過送出 JAR 或 Python 檔案消費輸入和輸出資料、使用者配置、私密管理支援等。另外,使用者可以同時運作多個 function。

public class FunctionConfig {        private String tenant;        private String namespace;        private String name;        private String className;        private Collection<String> inputs;        private String output;        private ProcessingGuarantees processingGuarantees;        private Map<String, Object> userConfig;        private Map<String, Object> secrets;        private Integer parallelism;        private Resources resources;        ... }           

使用者送出 function 後,系統将會對 function 進行“送出檢查”或“驗證”,確定使用者有權限送出此 function 到特定的命名空間和租戶。如果使用 Java 語言,送出時就會加載這些類,確定指定的類在 JAR 檔案中。所有的檢查都會在送出時立刻進行,是以出現錯誤時,使用者可以很快收到提示消息,而不用自行檢視錯誤日志。

下一步是複制代碼到 BookKeeper。将送出代碼中的所有參數以協定緩沖結構表示為 FunctionMetaData,示例如下:

message FunctionMetaData {    FunctionDetails functionDetails ;    PackageLocationMetaData packageLocation;    uint64 version ;    uint64 createTime;    map<int32 , FunctionState> instanceStates ;    FunctionAuthenticationSpec functionAuthSpec ;}           

Function MetaData Manager 負責管理 FunctionMetaData 結構。從 worker 的角度來看,Function MetaData Manager 負責維護系統記錄。Function MetaData Manager 将完全限定 Function 名稱(Fully Qualified Function Name,FQFN)映射到由 Pulsar topic 命名空間和 function 資訊構成的 Function MetaData;基于送出内容更新、管理計算機狀态,當送出多個 worker 時,檢查沖突并将中繼資料寫入 topic。

排程流程

系統接收 function 送出後,使用可插拔排程程式進行排程。如果 function 在送出後僅由 leader 執行,就會激活排程程式。通過災備方式訂閱協調 topic 即可确定 leader,即 topic 中的 consumer。

Leader 将任務寫入到任務 Topic(Assignment Topic)。任務 Topic 儲存在 Pulsar 特定的命名空間中,被配置設定給各個 worker。Worker 有權限檢視所有已壓縮并包含所有系統邏輯的任務,如任務 Table(Assignment Table)中的 FQFN、執行個體 ID。

執行流程

更改任務 Table 即可觸發執行流程。Worker 的部件(Function RunTime Manager)負責管理 function 周期任務,如使用 Spawner 啟動或停止消息。

Java 執行個體和 Pulsar IO

Pulsar Java 執行個體被封裝為 Source、function(實際邏輯),也可以說是 Sink ensemble。Source 從 input topic 讀資料,而 Sink 從 topic 讀資料。

在使用内置 function 時,“Source”即為 Pulsar Source,從 Pulsar 讀資料;“Sink”即為 Pulsar Sink,向 Pulsar topic 寫資料。

但如果送出類似于 Google 釋出/訂閱的 Source(非 Pulsar Source),借助 Pulsar IO 成為 connector,且實際功能類似于 Pulsar Function,則此 function 為特性 Function(Identity Function),可用于處理資料。送出後,Pulsar Sink 向 topic 寫入此 function;非 Pulsar Sink 則向外部系統寫入。正因為 Pulsar IO 基于 Pulsar Functions 編寫,Pulsar 才得以從外部系統讀資料。

開始使用 Pulsar Functions

相關閱讀

引用連結

繼續閱讀