天天看點

Apache Pulsar 在能源網際網路領域的落地實踐

關于 Apache Pulsar

Apache Pulsar 是 Apache 軟體基金會***項目,是下一代雲原生分布式消息流平台,集消息、存儲、輕量化函數式計算為一體,采用計算與存儲分離架構設計,支援多租戶、持久化存儲、多機房跨區域資料複制,具有強一緻性、高吞吐、低延時及高可擴充性等流資料存儲特性。

GitHub 位址:http://github.com/apache/pulsar/

案例導讀:本案例介紹了清華大學能源網際網路創新研究院将 Apache Pulsar 落地能源網際網路方向的實踐。Pulsar 的雲原生架構、Schema、Functions 等特性滿足了相關業務需求,也減輕了他們開發和運維負擔。

閱讀本文需要大約 8 分鐘。

團隊及業務簡介

能源網際網路是電力與能源工業發展的方向。随着資訊、通信和網際網路技術的飛速發展,可擷取的資料量正以爆炸式方式迅猛增長,傳統的資料處理方法已難以應對這些海量且增長極快的資訊資産,大資料理論正是在這樣的狀态下應運而生。大資料處理技術能幫助我們透過海量資料快速分辨其運作狀态及發展趨勢,在紛繁的世界中獨具洞察力。

清華大學能源網際網路創新研究院能源大資料與開放生态研究中心彙集了國内外能源及電力大資料領域的多位專家,緻力于推動大資料基礎理論和實踐應用的全面創新。能源大資料與開放生态研究中心将大資料技術應用于能源網際網路、智能電網和智慧用能等工程場景,結合高性能優化、并行計算和人工智能等先進技術,研發适用于能源電力行業特點的大資料 / 雲計算平台,和基于資料驅動的能源電力系統的進階應用,進而實作大資料産業的發展,形成以資料為核心的新型産業鍊,推動我國能源産業的轉型與更新。

挑戰

我們團隊的業務主要是與電力相關的物聯網場景,旨在實作使用者對傳感器等裝置資料的需求開發。我們團隊規模較小,但任務繁雜,希望能更快更穩地實作客戶的需求。

在整理業務需求後,我們提出以後端即服務(BaaS)為主、基于消息的服務方案。在物聯網領域内,基于這樣的解決方案,我們可以共用更多基礎設施服務,同時可以快速應對不同需求進行業務開發。考慮到特殊的業務需求,我們的平台需要具備以下特性:

  • 多租戶:平台要實作業務分離,服務不分離,又可以確定安全稽核,滿足客戶對資料安全性的敏感需求,就必須支援多租戶。此外,還可以在通訊、資料、業務這三方面提供一些基礎服務,比如自定義資料結構的 Schema Registry,自定義資料歸屬的 ACL 權限管理(增加删改的 API 接口),以及實作各種業務的自定義函數引擎。
  • Schema Registry:滿足不同需求和應用場景下裝置多變的資料結構,提供允許自定義資料結構的 Schema Registry。
  • 通用 API:提供包含增加删改的 HTTP RESTful APIs 和相應的 WebSocket 接口,確定在通訊上提供基礎服務,并基于這一基礎服務進行擴充。
  • ACL 權限管理:可自定義資料的 ACL 權限控制服務,保障資料安全。
  • 時序資料庫:多數情況下,物聯網場景都在和時序資料打交道,是以我們選擇了基于 PostgreSQL 的開源 TimeScaleDB,并且依托 TimeScaleDB 做了一系列時序資料的聚合查詢接口。
  • 使用者自定義 functions:實作各種業務的自定義函數引擎。

之前我們使用基于 RabbitMQ 和 Celery 的方案來實作使用者自定義 functions 的函數引擎。這一方案的最初使用效果良好,但随着業務的增長,問題越來越多。我們的小團隊不得不花更多時間來解決問題和優化整體方案。當 Celery 作為任務隊列時,這些問題尤為嚴重。

我們花費大量的時間和精力處理的問題主要有兩個:

  • 需要仔細配置 Celery 的 worker 和 task,避免執行時間長的任務阻塞其他任務;
  • Worker 更新時需要中斷服務,更新時間也相對較長。

此外,在特殊場景中,如果單個消息比較大且消息處理時間長時,Celery 和 RabbitMQ 的記憶體負擔都比較大。

随着客戶數量和項目數量的增加,這些問題變得日益突出,我們決定找一個新産品替代原有方案。

為什麼選擇 Apache Pulsar?

如上所述,我們希望消息中間件可以提供以下特性:

  • 多租戶
  • 可靠性和高可用
  • 支援多協定,尤其可以很友善地轉換協定:在物聯網領域,我們需要應對不同的通信協定,把不同通信協定的資料全部導入到消息中間件中。
  • 支援多語言:我們團隊主要使用 Go 語言,但我們會和很多使用其他語言的團隊合作,是以消息中間件最好可以支援其他語言。
  • 作為輕量級計算引擎實作簡單的消息處理。

在調研不同的消息中間件時,我們很快發現了 Pulsar。通過 Pulsar 的文檔和釋出日志,我們了解到 Pulsar 有很多優秀的特性,是以決定對 Pulsar 進行測試和評估。經過深入研究、學習,我們發現 Pulsar 的雲原生架構、Schema、Functions 等非常适合我們的業務需求。

  • 雲原生:Pulsar 支援雲原生,擁有諸多優秀的特性,如計算與存儲分離,可以很好地利用雲的彈性伸縮能力,保證擴容和容錯。此外,Pulsar 對 Kubernetes 的良好支援也在一定程度上幫助我們将一部分業務輕松遷移到了 Kubernetes 上。
  • Pulsar Functions:Pulsar Functions 是一個優秀的輕量級計算引擎,可以很好地取代 Celery 方案。我們可以更多地嘗試使用 Pulsar Functions 來處理業務,這是我們選擇 Pulsar 的主要原因。
  • 分層存儲:這一特性能夠節約存儲成本。我們的使用場景會産生很多傳感器的原始資料,需要作為冷資料存儲。借助分層存儲,我們可以直接将這些冷資料存儲在價格更低的存儲服務中,也無需開發額外的服務來存儲資料。
  • MQTT/MoP:Pulsar 對各種協定的相容展示了社群的開放性。在 MoP 釋出前,我們開發了 MQTT 協定的轉發工具,把 MQTT 協定上的資料轉發到 Pulsar 中。
  • Pulsar Schema:我們的平台通過 JSON 來描述資料 schema,通過對接 Pulsar Schema 和我們自己的 Schema Registry,可以實作消息序列化的工作。目前 Pulsar 在 Go Schema 的功能仍處于起步階段,我們也會嘗試做一些實踐與貢獻。
  • 多語言:我們很看重多語言支援,尤其是 Go 語言。Pulsar 有 Go 語言相應的用戶端、Go function runtime、基于 Go 語言實作的 Pulsarctl 等。我們也希望 Pulsar 未來可以支援更多語言,因為我們不能預見客戶的需求,支援多語言能夠幫助我們更輕松地解決問題。
  • Pulsar Manager & Dashboard:Pulsar 在各個層級都啟動了接口來擷取 Metrics。Pulsar 的其他工具(如 Prometheus、Grafana、Pulsar Manager)能夠幫助我們減輕運維、優化、排錯的投入。
  • 開源:Pulsar 社群開放、活躍、友好。有 StreamNative 這樣的公司做支撐,使用者可以放心地選擇 Pulsar,把業務遷移到 Pulsar 上。

深入了解 Pulsar 後,我們決定對 Pulsar 進行測試,并嘗試遷移一個生産環境的應用。

遷移試驗:樓宇智慧用電

樓宇智慧用電是我們在用電分析和預測領域做的一次嘗試,我們希望采集到辦公室中每一個用電點的用電資訊。在研究院新辦公樓裝修初期,我們進行了技術評估,将使用 zigbee 協定的智能插座列入了裝修方案。整個部署包含三層樓,約 700 個智能插座和 50 個 zigbee 網關。插座部署在辦公場所的所有用電點,包含工位插座、牆壁插座以及中間空調風機插座。所有資料通過智能插座廠商提供的區域網路廣播方案,将廣播資料轉發到 Pulsar 中實作資料點的采集和預處理。目前用電量資料每 10 秒鐘上送一次,其他與使用者相關的操作(包括開關插座、插拔用電裝置)則實時上送。針對這些資料,我們做了一些資料可視化的嘗試,并把資料貢獻給研究院的其他團隊進行分析,或用作開發算法的參考資訊和原始資料。

基于智能插座裝置廠商提供的 MQTT 方案,我們嘗試将 MQTT 協定的資料都轉發到 Pulsar 中。在轉發過程中,我們遇到的主要問題是 MQTT topic 和 Pulsar topic 的映射。我們的解決方案是直接把所有的 MQTT 資料轉發到同一個 Pulsar topic 中,同時把部分中繼資料包裝在轉發的消息中,再通過 Pulsar Functions 做消息路由,把消息轉發到不同的業務 topic 中。下圖展示了如何将傳感器産生的資料傳送至平台并最終入庫。

Apache Pulsar 在能源網際網路領域的落地實踐

在從 MQTT 轉發資料到 Pulsar 的過程中,我們預設把所有裝置的資料都轉發到同一個 topic 中,并通過 verificate function 進行驗證(包括解密和内容檢查),保障資料的合法性。合法的資料會被轉發到一個中間 topic 等待消息路由分發,消息分發的 function 會從資料中解析出裝置類型和消息類型,再轉發到對應業務 topic 中,等待被對應業務 topic 綁定的 ETL function 做處理。在使用 ETL function 處理時,我們也會根據裝置類型提取不同的資料,對網關裝置提取網關狀态、裝置資訊,對插座提取用電資料和插座的狀态資訊。這些資訊會比對我們平台的 Schema Registry 資料結構,我們再把生成的資料做 Schema Mapping(通過 Functions 實作),最後統一轉發這些結構化的資料到 sink topic 中,由 sink function 寫入到資料庫。

樓宇智慧用電的遷移測試有力驗證了 Pulsar 符合我們的需求。在遷移過程中,我們查閱了 Pulsar 文檔,從社群獲得了大力支援和幫助,遷移過程高效、順利。借助 Functions 的開放與便利,我們很快完成了流程圖中所有 function 的開發和調試,上線了整個業務系統。

在業務遷移過程中,Pulsar 運作狀态良好,團隊一緻認為 Pulsar 可以幫助我們減輕開發和運維負擔,是以我們選擇 Pulsar 作為研究中心唯一的消息中間件服務,我們的小團隊也開始跟随 Pulsar 一起進行一系列雲原生遷移和優化工作。

決定方案後,我們将 Apache Pulsar 進一步應用到電網智能傳感和智能變電所的場景,這些場景都與物聯網、能源和電力相關。下文将詳細介紹我們如何使用 Pulsar 和 Pulsar Functions,以及如何通過 Pulsar Functions 簡化傳感器資料流的相關處理。

Pulsar x 電網智能傳感

電網智能傳感場景主要基于清華大學能源網際網路創新研究院與電網公司合作的輸電線路智能多參數傳感器內建研究項目。該項目的傳感器來自不同的廠家,分布在輸電線路的各個位置,傳感器類型是以也不盡相同,包括杆塔、杆塔上、輸電線路側等十多種。整個系統目前接入總長度約六百公裡,包含六百多個杆塔的輸電線路傳感器。這一場景主要負責對各種傳感器的資料進行線上監測和告警,同時,我們也單獨針對電壓傳感器做了暫态電壓分析。

這個應用場景有兩個難點:一是來自不同廠商的傳感器沒有統一的通信協定,有的使用電力相關的 IEC104 規約,有的使用 protobuf 或其他廠商自定義協定;二是項目資料量比較大,有些傳感器可能會單次産生 20 MB 甚至更大的消息,有些傳感器則每秒上傳一次資料。

借助 Pulsar,我們選擇在 producer 端不做任何資料處理,直接将資料轉發到 Pulsar 中,再通過 Pulsar Functions 做進一步的資料預處理和其他業務操作。以電壓傳感器為例,電壓傳感器會産生三類資料,分别是心跳資料、穩态波形資料和暫态波形資料。其中心跳資料和穩态波形資料通過 protobuf 協定傳輸,暫态資料則通過 zip 壓縮檔案的形式傳輸。接收到 protobuf 的資料後,借助 Pulsar Functions 進行一系列的資料處理,包括通過解密 function 完成資料解密和 protobuf 的反序列化,再對資料進行路由,通過對應的 ETL function 做資料處理和解析,最後通過 Schema Mapping 将資料入庫。我們把這個流程的每一步都封裝成獨立的 Pulsar function,這樣做出于三點考慮:

  • 我們希望監控到整個資料流過程中每一個環節的狀态,采集每個過程的 metrics,并且觀測一些重點名額,比如是否存在 backlog 積壓。狀态監測友善我們調整每個環節 function 的并行數量。
  • 使整個資料流更加靈活,便于我們在不同流程中新增和删除 function。
  • 更大程度地保障了我們可以重用自己維護的 function。

這個方案也遇到了一些小困難,比如由于 function 比較多,我們需要花更多時間部署、維護每一個過程的中間 topic。目前,我們的解決方案是直接寫對應的代碼一次性完成部署和維護。雖然需要投入更多精力,但我們認為這種 function 的開發和部署模式是值得的。上文提到電壓傳感器除了會産生 protobuf 的兩種資料外,還會産生一種暫态資料。暫态資料一般在電網發生故障或異常時産生,類似電力系統的快照,記錄故障發生前到發生時,再到發生後的波形狀态。在電力系統中,暫态資料通常有标準的存儲方案和特定的解析接口。相對于傳感器産生的其他資料來說,這類資料的特點是比較大,動辄幾十兆。我們應對暫态資料的方案是先解壓縮這些資料,再分析資料檔案。這裡我們借助了 Pulsar Functions 多語言支援的特性,流程圖中的藍色部分使用 Go function 實作,黃色部分使用 Python 實作,Python 有一個解析電網暫态資料的庫,可以調用,就免去了我們自己花時間實作一套 Go 版本解析接口的工作。

Apache Pulsar 在能源網際網路領域的落地實踐

Pulsar x 智能變電所

智能變電所是我們在變電系統中變電環節的一些嘗試,這個項目基于我們合作的智能輸變電裝置廠商,希望基于開關櫃等變電所裝置實作變電所的資料接入。這個項目的主要目标是實作實時監測、故障診斷和異常監測這三大功能。

Apache Pulsar 在能源網際網路領域的落地實踐

在智能變電所的場景中,通常由裝置生産廠商提供裝置的故障診斷算法或診斷應用,我們需要将不同性質的算法或應用內建到現有方案中。客戶提供的算法可能直接在 Pulsar Functions 中調用,也可能是已經編譯好的可執行檔案,甚至可能是其他語言的實作,比如 R 語言。針對這一系列問題,我們先把客戶提供的實作封裝在 Docker 容器中,在容器中實作一個最小的 Pulsar function runtime,再通過 Docker proxy function 和 Docker endpoint 溝通,在觸發 function 時建立對應算法的容器實作計算,最後将結果回傳到 Pulsar 對應的 topic 中。

另外,在這一場景中我們也遇到了一些應用層面的需求,比如消息推送。我們借助 Pulsar Functions 實作了一些業務功能,在 Functions 中可以很友善地調用不同服務商的接口,實作消息推送,比如短信、郵件、應用程式的推送服務。此外,通過 Pulsar Functions,我們得以把消息推送的業務需求從平台中解藕出來,把服務做成 function,便于後續在有同樣需求的場景中直接使用。

使用 Pulsar 遇到的問題及解決方案

我們在使用 Pulsar 的過程中遇到了一些問題,下文會分享解決這些問題的一些經驗,希望可以對準備或者已經在使用 Pulsar 的同學提供一些幫助。

第一個是關于 Pulsar 預設消息大小的問題。在預設配置下,Pulsar 支援的最大消息是 5 MB,在上文提到的智慧電網案例中,單條消息有時會超過 20 MB。我們根據文檔修改了 broker 配置檔案中的

MaxMessageSize

參數,但修改的配置并沒有生效,超過 5 MB 的消息依然不能正常傳遞到 Pulsar 中。于是我們在 Pulsar 社群尋求幫助,得到了社群的迅速回應。這個問題的主要原因是 Pulsar 2.4.0 中

MaxMessageSize

沒有同步到 BookKeeper,是以即使 broker 可以接收更大的消息,broker 仍然不能把消息傳遞到負責存儲的 BookKeeper 中。是以除了修改

MaxMessageSize

值外,還需要修改 broker 和 BookKeeper 中

nettyFrameSizeBytes

相關配置,這些配置保持一緻,Pulsar 就可以處理更大的單條消息。

第二個問題是我們在使用 Pulsar Functions 處理資料時,topic 中可能會出現 backlog 積壓越來越多的情況。Backlog 包括沒有發送給 Functions(consumer)的資料,也包括已發送但未被 Functions(consumer)ack 的資料。根據我們的經驗,在 Functions 場景下,消息積壓可能是因為 function 處理單條消息的速度慢,處理時間長,或者 function 崩潰。如果是因為 function 處理消息慢,一種解決方案是增加 function 的并行數量,再具體分析執行速度慢的原因并進行優化;另一種方案是把複雜的 function 分成多個簡單的 function,也就是在智能電網場景中提到的把一個複雜的 function 拆成多個 function,通過 function 的鍊式模式把整個流程連結起來。這樣我們可以很友善地觀測每一個 function 的狀态,也可以針對某個 function 做進一步的優化。如果由于 function 崩潰造成 backlog 積壓,則需要保障 function 的穩定性,并借助 function 的 log topic 進行調試。

第三個問題是當 producer 數量增加時,很難統一管理和觀測每個 producer 的狀态,即 producer 與 broker 之間的通信狀态和 producer 與資料源之間的通信狀态。針對這個問題,我們目前的解決方案是給 producer 增加心跳消息到對應的心跳 topic 做整體監控,同時,監控 producer 和 broker 的狀态連接配接。通過這些改動,我們可以較好地聚合觀測 producer 的運作狀态。我們注意到 GitHub 上也在讨論類似問題,期待和社群一起提出更優秀的解決方案。

期待

我們期待 Pulsar 能改善或增加以下功能。

  • Pulsar Functions Mesh 實作了對 function 進行類似于 Kubernetes 的服務編排,我們期待該功能的釋出。上文提到我們實作了鍊式 function 的解決方案,但這種方式在維護上遇到很大挑戰,希望 Functions mesh 可以解決這個問題。
  • 希望 Pulsar functions 支援更多語言的 runtime。我們用 function 做 Docker proxy function,這個方案雖然可行,但希望有更優秀的解決方案。
  • IoT 場景很注重邊緣計算,我們希望 Pulsar 可以在邊緣計算上做一些嘗試。我們關注到 Pulsar 允許将 Functions 的消息推送到另一個 Pulsar 叢集中,允許 Functions 與外部 Pulsar 叢集通訊。通過這一改動,可以嘗試将 Pulsar 部署到邊緣裝置上,并使用 Pulsar Functions 在這些裝置上進行計算。部署 Pulsar 對記憶體的需求較大,在一些運算能力較弱的邊緣裝置上部署 Pulsar 比較困難,希望 Pulsar 能在後續版本中優化或提供其他方案解決這一困擾。

結語

作為一個開源項目,Pulsar 正在快速發展,文檔更新迅速,社群響應及時,社群規模不斷壯大。我們希望深入了解 Pulsar,參與 Pulsar 開發貢獻,和社群分享我們的實踐經驗,與 Pulsar 社群共同發展。

在使用 Pulsar 的過程中,我們遇到一些困惑,感謝 StreamNative 團隊小夥伴們的大力支援,幫助我們順利将 Pulsar 應用到上述業務場景中。未來,我們會積極嘗試 Pulsar 的各種新功能,并将 Pulsar 應用于更多的能源網際網路場景中。

作者簡介

胡軍,清華大學電機系副教授,清華大學能源網際網路創新研究院能源大資料與開放生态研究中心執行主任,IEEE Member,CIGRE Member。

相關閱讀

  • 案例 | Apache Pulsar 助力江蘇移動重塑 5G 時代計費支撐系統
  • 案例 | Apache Pulsar 在騰訊 Angel PowerFL 聯邦學習平台上的實踐
  • Apache Pulsar 在 BIGO 的性能調優實戰(上)
  • 新功能詳解:Pulsar Function Mesh