eKuiper 項目填補了邊緣端實時計算的空白,在工業物聯網、車聯網等領域得到了越來越廣泛的應用。根據 GitHub、微信群、論壇等多個管道收集到的大量使用者回報,我們對 eKuiper 的易用性和可靠性進行了持續提升,并于近日正式釋出了 1.5.0 版本。
本次更新主要亮點有:
- SQL 改進:用于定義資料流和分析規則的 eKuiper 核心功能 SQL 提供了包括變化監測函數以及 object_construct 函數等在内的更多内置函數,提升了表達能力;
- 生态連接配接:提供内置的 Neuron 連接配接支援,可以輕松處理 Neuron 生态中的資料;同時通用的 SQL 插件可以接入多種傳統 SQL 資料庫,一定程度上實作批資料的流式處理。
- 運維和文檔改進:規則運作時穩定性提升,支援按需編譯。文檔的導航結構重構,閱讀體驗和查詢效果提升。
社群站網址:https://ekuiper.org/zh
GitHub 倉庫:https://github.com/lf-edge/ekuiper
Docker
鏡像位址:https://hub.docker.com/r/lfedge/ekuiper
生态連接配接
各種 source/sink 是 eKuiper 連接配接資料處理生态的途徑。新版本中,eKuiper 添加了更多的連接配接類型 Neuron 和 SQL。同時我們也改進了原有連接配接,例如 TDEngine sink 中添加了超級表的支援。
Neuron 整合
Neuron 是一個EMQ 發起并開源的工業物聯網(IIoT)邊緣工業協定網關軟體,用于現代大資料技術,以發揮工業 4.0 的力量。它支援對多種工業協定的一站式通路,并将其轉換為标準 MQTT 協定以通路工業物聯網平台。Neuron 和 eKuiper 整合使用,可以友善地進行 IIoT 邊緣資料采集和計算。
在之前的版本中,Neuron 與 eKuiper 之間需要采用 MQTT 作為中轉。二者協同時,需要額外部署 MQTT Broker。同時,使用者需要自行處理資料格式,包括讀入和輸出時的解碼編碼工作。Neruon 2.0 版本與 eKuiper 1.5.0 版本将無縫整合,使用者無需配置即可在 eKuiper 中接入 Neruon 中采集到的資料,進行計算;也可以友善地從 eKuiper 中反控 Neuron 。兩個産品的整合,可以顯著降低邊緣計算解決方案的部署成本,簡化使用門檻。使用 NNG 協定,使用程序間通信,也可顯著降低網絡通信消耗,提高性能。
使用者接入 Neuron 時,隻需要在 eKuiper 中建立一個類型為 Neuron 的流:
CREATE STREAM demo() WITH (TYPE="neuron",SHARED="TRUE")
反控 Neuron 時,需要在規則的動作裡添加 Neuron 動作,指定需要寫入的組名,節點名和 tag 名(均可為動态屬性)。eKuiper 将會自動轉換格式,适配 Neuron 的輸入格式。
"neuron": {
"nodeName": "{{.node}}",
"groupName": "grp",
"tags": [
"tag0"
]
}
詳細資訊,請參考文檔 Neuron 源。
SQL 拉取和寫入
SQL 拉取源提供了一種批量資料轉為流式資料的方式,使得 eKuiper 支援初步的流批一體處理的方式。
在舊的系統更新改造過程中,我們往往還需要考慮對原有的系統的相容。大量的老舊系統采用傳統關系資料庫存儲采集的資料。在新的系統中,可能也有儲存在資料庫中,不友善提供流式接入的資料卻需要進行實時計算的資料。還有更多的場景需要接入形形色色數量龐大的支援 SQL 的資料庫或其他外部系統。
eKuiper 提供了統一的\多資料庫通用的 SQL 拉取 source,可定時拉取支援 SQL 的資料源的資料,并提供基礎的去重能力,形成流式資料進行統一的流式計算處理。該插件的預編譯版本支援 MySQL、PostgresSQL 等常見資料庫的接入;同時插件中搭載了幾乎所有常見資料庫的連接配接能力,使用者隻需要在編譯時提供所需支援的資料庫的參數,即可自行編譯支援自定義資料庫類型的插件。
除了資料拉取,我們也提供了資料寫入的通用 SQL 插件。值得注意的是,eKuiper 本身已經提供了針對 InfluxDB、TDengine 等時序資料庫的專用插件。通用 SQL 插件同樣可以支援連接配接這些資料庫,但提供的是 insert 功能,不支援特定資料庫的非标準概念,例如 TDengine 的超級表隻能使用 TDengine 插件進行寫入。
更多資訊以及支援的資料庫清單,請參見 SQL source 插件。
eKuiper SQL 改進
内置函數是 SQL 完成各種計算的主要組織形式,也是 SQL 表達能力的重要來源。新版本中的 SQL 改進主要通過添加新的函數來實作。
變化監測函數
新的版本中添加了三個通用的變化檢測相關函數: CHANGED_COLS,CHANGED_COL 和 HAD_CHANGED 。
CHANGED_COLS 函數的作用是檢測指定的列是否發生變化,如果發生變化,則傳回變化的列的值,否則不傳回。在變化檢測的場景中,使用者經常需要監測多個列/表達式,而且數量不固定。是以,該函數可接收不定數量的參數,同時其傳回值為多個列。相比于普通的标量函數固定傳回單一結果列(多列結果會被包含在 map 中),這是第一個傳回多列的函數,我們對函數的實作進行了重構以實作多列函數的支援。該函數的參數個數是可變的,同時列的參數也可以是别的表達式。列參數也支援 * 号,表示檢測所有列,例如 SELECT CHANGED\*COLS("c\*", true,* ) FROM demo。
多列函數僅可在 Select 子句中使用,其選出的值不能用于 WHERE 或其他子句中。若需要根據變化值做過濾,可以使用 CHANGED_COL 函數擷取變化後的值做為過濾條件;或者使用 HAD_CHANGED 函數擷取多個列的變化狀态作為過濾條件。詳細資訊和使用示例,請參考文檔。
對選擇的列進行分組
規則的 SQL 語句中 select 選擇出的所有列會組成一個對象,供 sink 插件和下遊的應用進行處理。在有些場景中,下遊應用需要對選擇的列進行分組,然後靈活地對每個分組進行處理。例如,把選擇出來的結果分成多個 key/value 集合,其中 key 為檔案名,這樣可以動态地把結果寫入到多個檔案中。
新的内置方法 object_construct 可以輕松實作列的分組和命名。其文法為 object_construct(key1, col, ...),可支援多個參數,并傳回由參數建構的對象 。參數為一系列的鍵值對,是以必須為偶數個。鍵必須為 string 類型,值可以為任意類型。例如,使用者需要把列 1,2,3 寫入到檔案 1;而列 4,5 寫入到檔案 2 中。則可使用一條 SQL 規則對列進行分組,并對組名進行指派:
SELECT object_construct("key1", col1, "key2", col2, "key3", col3) AS file1, object_construct("key4", col4, "key5", col5) AS file2 FROM demoStream
其輸出結果形如下列 JSON 對象:
{
"file1":{"key1":"aValue","key2":23,"key3":2.5},
"file2":{"key4":"bValue","key5":90}
}
運維更容易
新的版本在運維方面的主要改進包括提升了運作時的穩定性,提供了友善的編譯參數友善使用者根據需求進行軟體功能的裁剪以适應更小算力的裝置。
規則隔離
新的版本中,我們對規則運作和生命周期進行了優化和重構,增加了規則運作的穩定性,提高規則之間的隔離性。主要表現在以下幾個方面:
規則錯誤隔離:即使是使用共享源的規則,某個規則的運作時錯誤也不會影響另外的相關規則。同時,新版本的規則系統級的 panic 錯誤也會在規則級别進行處理,不再導緻整個 eKuiper 程序崩潰。
規則負載隔離:使用共享源或者記憶體源的兄弟規則之間,在保持消息順序的同時,消息流入吞吐量不受其他規則的影響。
按需編譯
作為邊緣流式處理引擎,需要部署的異構目标系統很多,既有算力較好的邊緣端的機房、網關等,也有出于成本以及業務的特殊要求考慮而采用成本更便宜或是定制化的軟硬體方案。随着功能的逐漸增強,全功能的 eKuiper 在極端資源受限的裝置上,例如記憶體少于 50MB 的終端上,可能會稍顯臃腫。新的版本中,我們将 eKuiper 的核心功能和其他功能通過 go 語言的編譯标簽進行剝離。使用者在使用的時候,可以通過設定編譯參數的方式,按需編譯部分功能,進而得到更小的運作檔案。例如,僅編譯核心功能,可使用 make build_core 得到一個隻包含核心功能的運作檔案。進一步資訊,請參考 按需編譯。
文檔更易用
在 4 月上線的官網(https://ekuiper.org )中,eKuiper 文檔進行了目錄結構的重構,并編譯到文檔網站上。新的文檔網站增加了概念介紹、教程等子產品,調整了導航樹,希望能幫助使用者更友善地找到有用的資訊。
更新說明
eKuiper 的版本疊代會盡量保持新舊版本的相容,新的版本也不例外。更新到 1.5.0 版本,大部分功能無需改動即可平穩更新,但是有兩處改動需要使用者手動更改:
- Mqtt source 的伺服器配置項由 servers 改成 server,配置值由數組改為字元串。使用者的配置檔案 etc/mqtt_source.yaml 中需要進行更改。若使用環境變量,例如Docker 啟動和 docker compose 檔案啟動等,需要更改環境變量:MQTT_SOURCEDEFAULTSERVERS =》 MQTT_SOURCEDEFAULTSERVER 。Docker 啟動的指令更改為 docker run -p 9081:9081 -d --name ekuiper MQTT_SOURCEDEFAULTSERVER="$MQTT_BROKER_ADDRESS" lfedge/ekuiper:$tag。
- 若使用 Tdengine sink, 其屬性名 ip 改為 host , 屬性值必須為域名。