天天看點

RocketMQ Streams:将輕量級實時計算引擎融合進消息系統

作者 | 袁小棟、程君傑

稽核校對 | 杜恒、歲月、白玙、不周

随着各行各業移動互聯和雲計算技術的普及發展,大資料計算已深入人心,最常見的比如 flink、spark 等。這些大資料架構,采用中心化的 Master-Slave 架構,依賴和部署比較重,每個任務也有較大開銷,有較大的使用成本。RocketMQ Streams 着重打造輕量計算引擎,除了消息隊列,無額外依賴,對過濾場景做了大量優化,性能提升 3-5 倍,資源節省 50%-80%。

RocketMQ Streams 适合大資料量 ->高過濾 ->輕視窗計算的場景,核心打造輕資源,高性能優勢,在資源敏感場景中有很大優勢,最低 1core,1g 可部署,建議的應用場景(安全,風控,邊緣計算,消息隊列流計算)。

RocketMQ Streams 相容 Blink(Flink 的阿裡内部版本) 的 SQL,UDF/UDTF/UDAF,多數 Blink 任務可以直接遷移成 RocketMQ Streams 任務。将來還會釋出和 Flink 的融合版本,RocketMQ Streams 可以直接釋出成 Flink 任務,既可以享有 RocketMQ Streams 帶來的高性能,輕資源,還可以和現有的 Flink 任務統一運維和管理。

本篇文章主要從五個方面來介紹 RocketMQ Streams 實時計算平台:

首先簡單先介紹一下什麼是 RocketMQ Streams;

第二部分,基于 RocketMQ Streams 的 SDK,來了解下它是怎麼去使用的;

第三部分,RocketMQ Streams 整體的架構以及它的原理實作;

第四部分,在雲安全的場景下該怎麼使用 RocketMQ Streams;

第五部分,RocketMQ Streams 的未來規劃。

1

什麼是 RocketMQ Streams?

本章節從基礎簡介、設計思路和特點三方面對 RocketMQ streams 進行整體介紹。

RocketMQ Streams 簡介

1)首先,它是一個 Lib 包,啟動即運作,和業務直接內建;

2)然後,它具備 SQL 引擎能力,相容 Blink SQL 文法,相容 Blink UDF/UDTF/UDAF;

3)其次,它包含 ETL 引擎,可以無編碼實作資料的 ETL,過濾和轉存;

4)最後,它基于資料開發 SDK,大量實用元件可直接使用,如:Source、sink、script、filter、lease、scheduler、configurable 不局限流的場景。

RocketMQ Streams 的特點

RocketMQ streams 基于上述的實作思路,可以看到它有以下幾個特點:

輕量

1 核 1g 就可以部署,依賴較輕,在測試場景下用 Jar 包直接寫個 main 方法就可以運作,在正式環境下最多依賴消息隊列和存儲(其中存儲是可選的,主要是為了分片切換時的容錯)。

高性能

實作高過濾優化器,包括前置指紋過濾,同源規則自動歸并,hyperscan 加速,表達式指紋等,比優化前性能提升 3-5 倍,資源節省 50% 以上。

維表 JOIN(千萬資料量維表支援)

設計高壓縮記憶體存儲資料,無 java 頭部和對齊的開銷,存儲接近原始資料大小,純記憶體操作,性能最大化,同時對于 Mysql 提供了多線程并發加載,提高加載維表的速度。

高擴充的能力

Source 可按需擴充,已實作:RocketMQ,File,Kafka;

Sink 可按需擴充,已實作:RocketMQ,File,Kafka,Mysql,ES;

可按 Blink 規範擴充 UDF/UDTF/UDAF;

提供了更輕的 UDF/UDTF 擴充能力,不需要任何依賴就可以完成函數的擴充。

提供了豐富的大資料的能力

包括精确計算一次靈活的視窗,雙流 join,統計,開窗,各種轉換過濾,滿足大資料開發的各種場景,支援彈性容錯的能力。

2

RocketMQ Streams 的使用

RocketMQ Streams 對外提供兩種 SDK,一種是 DSL SDK,一種是 SQL SDK,使用者可以按需選擇;DSL SDK 支援實時場景 DSL 語義;SQL SDK 相容 Blink(Flink 的阿裡内部版本) SQL 的文法,多數 Blink SQL 可以通過 RocketMQ Streams 運作;

接下來,我們詳細的介紹一下這兩種 SDK。

環境要求

JDK1.8 版本以上;

Maven 3.2 版本以上。

DSL SDK

利用 DSL SDK 開發實時任務時,需要做如下的一些準備工作:

依賴準備

準備工作完成後,就可以直接開發自己的實時程式。

代碼開發

其中:

1)Namespace 是業務隔離的,相同的業務可以寫成相同的 Namespace。相同的 Namespace 在任務排程裡可以跑在程序裡,也可以共享一些配置;

2)pipelineName 可以了解成就是 job name ,唯一區分 job;

3)DataStreamSource 主要是建立 Source,然後這個程式運作起來,最終的結果就是在原始的消息裡面會加"--",然後把它列印出來。

豐富的算子

RocketMQ streams 提供了豐富的算子, 包括:

source 算子:包括 fromFile, fromRocketMQ, fromKafka 以及可以自定義 source 來源的 from 算子;

sink 算子:包括 toFile, toRocketMQ, toKafka,toDB,toPrint, toES 以及可以自定義 sink 的 to 算子;

action 算子:包括 Filter,Expression,Script,selectFields,Union,forEach,Split,Select,Join,Window 等多個算子。

部署執行

基于 DSL SDK 完成開發,通過下面指令打成 jar 包,執行 jar,或直接執行任務的 main 方法。

SQL SDK

首先開發業務邏輯代碼, 可以儲存為檔案也可以直接使用文本;

其中

CREATE FUNCTION:引入外部的函數來支援業務邏輯, 包括 flink 以及系統函數;

CREATE Table:建立 source/sink;

CREATE VIEW:執行字段轉化,拆分,過濾;

INSERT INTO:資料寫入 sink;

函數:内置函數,udf 函數。

SQL 擴充

RocketMQ streams 支援三種 SQL 擴充能力,具體實作細節請看:https://github.com/alibaba/rsqldb

1)通過 Blink UDF/UDTF/UDAF 擴充 SQL 能力;

2)通過 RocketMQ streams 擴充 SQL 能力,隻要實作函數名是 eval 的 java bean 即可;

3)通過現有 java 代碼擴充 SQL 能力,create function 函數名就是 java 類的方法名。

SQL 執行

你可以從這裡下載下傳最新的 Rocketmq Streams 代碼并建構。

解壓 tar.gz 包, 進入目錄結構

其目錄結構如下

bin 指令目錄,包括啟動和停止指令

conf 配置目錄,包括日志配置以及應用的相關配置檔案

jobs 存放 sql,可以兩級目錄存儲

ext 存放擴充的 UDF/UDTF/UDAF/Source/Sink

lib 依賴包目錄

log 日志目錄

執行 SQL

執行多個 SQL

如果想批量執行一批 SQL,可以把 SQL 放到 jobs 目錄,最多可以有兩層,把 sql 放到對應目錄中,通過 start 指定子目錄或 sql 執行任務。

任務停止

日志檢視

目前所有的運作日志都會存儲在 log/catalina.out 檔案中。

3

架構設計及原理分析

RocketMQ Streams 設計思路

在了解完 RocketMQ streams 的基本簡介,接下來,我們看下 RocketMQ streams 的設計思路,設計思路主要從設計目标和政策兩個方面來介紹:

設計目标

依賴少,部署簡單,1 核 1g 單執行個體可部署,可随意擴充規模;

打造場景優勢,重點打造大資料量 ->高過濾 ->輕視窗計算的場景,功能覆寫度要全,實作需要的大資料特性:Exactly-ONCE、靈活的視窗(滾動、滑動、會話視窗);

要在保持低資源的前提下,對高過濾有性能突破,打造性能優勢;

相容 Blink SQL,UDF/UDTF/UDAF,讓非技術人員更容易上手。

政策(适配場景:大資料量>高過濾 /ETL>低視窗計算)

采用 shared-nothing 的分布式架構設計,依賴消息隊列做負載均衡和容錯機制,單執行個體可啟動,增加執行個體實作能力擴充,并發能力取決于分片數;

利用消息隊列的分片做 shuffle,利用消息隊列負載均衡實作容錯;

利用存儲實作狀态備份,實作 Exactly-ONCE 的語義。用結構化遠端存儲實作快速啟動,不等本地存儲恢複。

重力打造過濾優化器,通過前置指紋過濾,同源規則自動歸并,hyperscan 加速,表達式指紋提高過濾性能

RocketMQ Streams:将輕量級實時計算引擎融合進消息系統

RocketMQ Streams Source 的實作

1)Source 要求實作最少消費一次的語義,系統通過 checkpoint 系統消息實作,在送出 offset 前發送 checkpoint 消息,通知所有算子重新整理記憶體。

2)Source 支援分片的自動負載和容錯

資料源在分片移除時,發送移除系統消息,讓算子完成分片清理工作;

當有新分片時,發送新增分片消息,讓算子完成分片初始化。

3)資料源通過 start 方法,啟動 consuemr 擷取消息;

4)原始消息經過編碼,附加頭部資訊包裝成 Message 投遞給後續算子。

RocketMQ Streams:将輕量級實時計算引擎融合進消息系統

RocketMQ Streams Sink 的實作

1)Sink 是實時性和吞吐的一個結合;

2)實作一個 sink 隻要繼承 AbstractSink 類實作 batchInsert 方法即可。batchInsert 的含義是一批資料寫入存儲,需要子類調用存儲接口實作,盡量應用存儲的批處理接口,提高吞吐;

3)正常的使用方式是寫 message->cache->flush->存儲的方式,系統會嚴格保證每次批次寫入存儲的量不超過 batchsize 的量,如果超過了,會拆分成多批寫入;

RocketMQ Streams:将輕量級實時計算引擎融合進消息系統

4)Sink 有一個 cache,資料預設寫 cache,批次寫入存儲,提高吞吐(一個分片一個 cache);

5)可以開啟自動重新整理,每個分片會有一個線程,定時重新整理 cache 資料到存儲,提高實時性。實作類:DataSourceAutoFlushTask;

6)通過調用 flush 方法重新整理 cache 到存儲;

7)Sink 的 cache 會有記憶體保護,當 cache 的消息條數>batchSize,會強制重新整理,釋放記憶體。

RocketMQ Streams Exactly-ONCE 實作

1)Source 確定在 commit offset 時,會發送 checkpoint 系統消息,收到消息的元件會完成存盤操作,消息至少消費一次;

2)每條消息會有消息頭部,裡面封裝了 queueld 和 offset;

2)元件在存儲資料時,會把 queueld 和處理的最大 offset 存儲下來,當有消息重複時,根據 maxoffset 去重;

3)記憶體保護,一個 checkpoint 周期可能有多次 flush(條數觸發),保障記憶體占用可控。

RocketMQ Streams:将輕量級實時計算引擎融合進消息系統

RocketMQ Streams Window

實作方式:

1)支援滾動、滑動和會話視窗,支援事件時間和自然時間(消息進入算子的時間);

2)支援 Emit 文法,可以在觸發前或觸發後,每隔 n 段時間,更新一次資料;比如 1 小時視窗,視窗觸發前希望每分鐘看到最新結果,視窗觸發後希望不丢失遲到一天内的資料,且每 10 分鐘更新資料。

3)支援高性能模式和高可靠模式,高性能模式不依賴遠端存儲,但在分片切換時,有丢失窗資料的風險;

4)快速啟動,無需等待本地存儲恢複,在發生錯誤或分片切換時,異步從遠端存儲恢複資料,同時直接通路遠端存儲計算;

5)利用消息隊列負載均衡,實作擴容縮容容,每個 queue 是一份組,一個分組同一刻隻被一台機器消費;

6)正常計算依賴本地存儲,具備 flink 相似的計算性能。

4

RocketMQ Streams 在安全場景的最佳實踐

背景

從公共雲轉戰專有雲,遇到了新的問題。因為專有雲像大資料這種 saas 服務是非必須輸出的,且最小輸出規模也比較大,使用者成本會增加很多,難落地,導緻安全能力無法快速同步到專有雲。

RocketMQ Streams:将輕量級實時計算引擎融合進消息系統

解決辦法

RocketMQ Streams 在雲安全的應用 - 流計算

基于安全場景打造輕量級計算引擎,基于安全高過濾的場景特點,可以針對高過濾場景優化,然後再做較重的統計、視窗、join 操作,因為過濾率比較高,可以用更輕的方案實作統計和 join 操作;

SQL 和引擎都可熱更新

RocketMQ Streams:将輕量級實時計算引擎融合進消息系統

業務結果

1)規則覆寫:自建引擎,覆寫 100% 規則(正則,join,統計);

2)輕資源,記憶體是公共雲引擎的 1/24,cpu 是 1/6,依賴過濾優化器,資源不随規則線性增加,新增規則無資源壓力,通過高壓縮表,支援千萬情報;

3)SQL 釋出,通過 c/s 部署模式,SQL 引擎熱釋出,尤其護網場景,可快速上線規則;

4)性能優化,對核心元件進行專題性能優化,保持高性能,每執行個體(2g,4 核,41 規則)5000qps 以上。

5

RocketMQ Streams 的未來規劃

打造 RocketMQ 一體化計算能力

1)和 RocketMQ 整合,去除 DB 依賴,融合 RocketMQ KV;

2)和 RocketMQ 混部,支援本地計算,利用本地特點,打造高性能;

3)打造邊緣計算最佳實踐

Connector 增強

1)支援 pull 消費方式,checkpoint 異步重新整理;

2)相容 blink/flink connector。

ETL 能力建設

1)增加檔案,syslog 的資料接入能力

2)相容 Grok 解析,增加常用日志的解析能力;

3)打造日志 ETL 的最佳實踐

穩定性和易用性打造

1)Window 多場景測試,提升穩定性,性能優化;

2)補充測試用例,文檔,應用場景。

6

開源位址

RocketMQ-Streams:https://github.com/apache/rocketmq-streams

RocketMQ-Streams-SQL:https://github.com/alibaba/rsqldb

以上是本次對 RocketMQ stream 的整體介紹,希望對大家有所幫助和啟發。

繼續閱讀