天天看點

TDengine 3.0 流式計算引擎文法規則介紹

小 T 導讀:TDengine3.0 引入了全新的流式計算引擎,既支援時間驅動的流式計算,也支援事件驅動的流式計算。本文将對新的流式計算引擎的文法規則進行詳細介紹,友善開發者及企業使用。

TDengine 是一款開源、雲原生的時序資料庫(Time Series Database,TSDB),專為物聯網、工業網際網路、金融、IT 運維監控等場景設計并優化。近期釋出的 TDengine 3.0,全新的流式計算引擎是其一大亮點。

TDengine 3.0 的流式計算引擎提供了實時處理寫入的資料流能力,使用 SQL 定義實時流變換,當資料被寫入流的源表後,資料會被以定義的方式自動處理,并根據定義的觸發模式向目的表推送結果。它提供了替代複雜流處理系統的輕量級解決方案,并能夠在高吞吐的資料寫入情況下,提供毫秒級的計算結果延遲。

流式計算可以包含資料過濾,标量函數計算(含 UDF),以及視窗聚合(支援滑動視窗、會話視窗與狀态視窗),可以以超級表、子表、普通表為源表,寫入到目的超級表。在建立流時,目的超級表将被自動建立,随後新插入的資料會被流定義的方式處理并寫入其中,通過 partition by 子句,可以以表名或标簽劃分 partition,不同的 partition 将寫入到目的超級表的不同子表。

TDengine 的流式計算能夠支援分布在多個 vnode 中的超級表聚合;還能夠處理亂序資料的寫入:它提供了 watermark 機制以度量容忍資料亂序的程度,并提供了 ignore expired 配置項以決定亂序資料的處理政策——丢棄或者重新計算。

下面我們就一起看一下 TDengine 中流式計算相關的 SQL 文法。

流式計算的建立、删除與展示

建立

CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
stream_options: {
 TRIGGER    [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
 WATERMARK   time
}
           

其中 subquery 是 select 普通查詢文法的子集:

subquery: SELECT select_list
    from_clause
    [WHERE condition]
    [PARTITION BY tag_list]
    [window_clause]
           

支援會話視窗、狀态視窗與滑動視窗,其中,會話視窗與狀态視窗搭配超級表時必須與 partition by tbname 一起使用:

window_clause: {
    SESSION(ts_col, tol_val)
  | STATE_WINDOW(col)
  | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
}
           

在上述語句中,SESSION 是會話視窗,tol_val 是時間間隔的最大範圍。在 tol_val 時間間隔範圍内的資料都屬于同一個視窗,如果有連續兩條資料的時間超過 tol_val,則自動開啟下一個視窗。視窗的定義與時序資料特色查詢中的定義完全相同,詳見 TDengine 特色查詢。

例如,使用如下語句建立流式計算,同時自動建立名為 avg_vol 的超級表,此流計算以一分鐘為時間視窗、30 秒為前向增量統計這些電表的平均電壓,并将來自 meters 表的資料的計算結果寫入 avg_vol 表,不同 partition 的資料會分别建立子表并寫入不同子表。

CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstartts, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
           

删除

DROP STREAM [IF NOT EXISTS] stream_name;
           

僅删除流式計算任務,由流式計算寫入的資料不會被删除。

展示

SHOW STREAMS;
           

若要展示更詳細的資訊,可以使用:

SELECT * from performance_schema.`perf_streams`;
           

流式計算的 partition

我們可以使用 PARTITION BY TBNAME 或 PARTITION BY tag 對一個流進行多分區的計算,每個分區的時間線與時間視窗是獨立的,會各自聚合,并寫入到目的表中的不同子表。如果不帶 PARTITION BY 選項,那所有的資料将寫入到一張子表。

流式計算建立的超級表有唯一的 tag 列 groupId,每個 partition 會被配置設定唯一 groupId。與 schemaless 寫入一緻,我們通過 MD5 計算子表名,并自動建立它。

流式計算的觸發模式

在建立流時,可以通過 TRIGGER 指令指定流式計算的觸發模式。

對于非視窗計算,流式計算的觸發是實時的;對于視窗計算,目前提供如下 3 種觸發模式:

  • AT_ONCE:寫入立即觸發
  • WINDOW_CLOSE:視窗關閉時觸發(視窗關閉由事件時間決定,可配合 watermark 使用)
  • MAX_DELAY time:若視窗關閉,則觸發計算。若視窗未關閉,且未關閉時長超過 max delay 指定的時間,則觸發計算。

由于視窗關閉是由事件時間所決定的,如果因事件流中斷、或持續延遲導緻事件時間無法更新,可能無法得到最新的計算結果。是以,流式計算提供了以事件時間結合處理時間計算的 MAX_DELAY 觸發模式,MAX_DELAY 模式在視窗關閉時會立即觸發計算。此外,當資料寫入後,計算觸發的時間超過 max delay 指定的時間,則立即觸發計算。

流式計算的視窗關閉

流式計算以事件時間(插入記錄中的時間戳主鍵)為基準計算視窗關閉,而非以 TDengine 伺服器的時間,這樣可以避免用戶端與伺服器時間不一緻帶來的問題,有效解決亂序資料寫入等難題。同時,流式計算還提供了 watermark 來定義容忍的亂序程度。

在建立流時,我們可以在 stream_option 中指定 watermark,它定義了資料亂序的容忍上界。流式計算通過 watermark 來度量對亂序資料的容忍程度,watermark 預設為 0。

T = 最新事件時間 – watermark

每次寫入的資料都會以上述公式更新視窗關閉時間,并将視窗結束時間 < T 的所有打開的視窗關閉,若觸發模式為 WINDOW_CLOSE 或 MAX_DELAY,則推送視窗聚合結果。

::: hljs-center

TDengine 3.0 流式計算引擎文法規則介紹

:::

在上圖中,縱軸表示不同時刻,對于不同時刻,我們畫出其對應的 TDengine 收到的資料,即為橫軸。已知橫軸上的資料點表示已經收到的資料,其中藍色的點表示事件時間(即資料中的時間戳主鍵)最後的資料,該資料點減去定義的 watermark 時間,就得到亂序容忍的上界 T。所有結束時間小于 T 的視窗都将被關閉(圖中以灰色方框标記)。

在 T2 時刻,亂序資料(黃色的點)到達 TDengine,由于有 watermark 的存在,這些資料進入的視窗并未被關閉,是以可以被正确處理。在 T3 時刻,最新事件到達,T 向後推移超過了第二個視窗關閉的時間,該視窗被關閉,亂序資料被正确處理。

但要注意,在 window_close 或 max_delay 模式下,視窗關閉直接影響推送結果。在 at_once 模式下,視窗關閉隻與記憶體占用有關。

流式計算的過期資料處理政策

對于已關閉的視窗,再次落入該視窗中的資料就會被标記為過期資料。TDengine 對于過期資料提供兩種處理方式,由 IGNORE EXPIRED 選項指定:

  • 重新計算,即 IGNORE EXPIRED 0:預設配置,從 TSDB 中重新查找對應視窗的所有資料并重新計算得到最新結果
  • 直接丢棄,即 IGNORE EXPIRED 1:忽略過期資料

無論在哪種模式下,watermark 都應該被妥善設定,來得到正确結果(直接丢棄模式)或避免頻繁觸發重算帶來的性能開銷(重新計算模式)。

示例

企業電表的資料經常都是成百上千億條的,想要将這些分散、淩亂的資料清洗或轉換都需要比較長的時間,很難做到高效性和實時性。在如下例子中,通過 TDengine 流計算可以将電表電壓大于 220V 的資料清洗掉,然後以 5 秒為視窗整合并計算出每個視窗中電流的最大值,最後将結果輸出到指定的資料表中。

建立 Database 和原始資料表

首先準備資料,完成建庫、建一張超級表和多張子表操作:

DROP DATABASE IF EXISTS power;
CREATE DATABASE power;
USE power;
CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);
CREATE TABLE d1001 USING meters TAGS ("California.SanFrancisco", 2);
CREATE TABLE d1002 USING meters TAGS ("California.SanFrancisco", 3);
CREATE TABLE d1003 USING meters TAGS ("California.LosAngeles", 2);
CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3);
           

建立流

create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 interval (5s);
           

寫入資料

insert into d1001 values("2018-10-03 14:38:05.000", 10.30000, 219, 0.31000);
insert into d1001 values("2018-10-03 14:38:15.000", 12.60000, 218, 0.33000);
insert into d1001 values("2018-10-03 14:38:16.800", 12.30000, 221, 0.31000);
insert into d1002 values("2018-10-03 14:38:16.650", 10.30000, 218, 0.25000);
insert into d1003 values("2018-10-03 14:38:05.500", 11.80000, 221, 0.28000);
insert into d1003 values("2018-10-03 14:38:16.600", 13.40000, 223, 0.29000);
insert into d1004 values("2018-10-03 14:38:05.000", 10.80000, 223, 0.29000);
insert into d1004 values("2018-10-03 14:38:06.500", 11.50000, 221, 0.35000);
           

查詢以觀察結果

taos> select start, end, max_current from current_stream_output_stb;
          start          |           end           |     max_current      |
===========================================================================
 2018-10-03 14:38:05.000 | 2018-10-03 14:38:10.000 |             10.30000 |
 2018-10-03 14:38:15.000 | 2018-10-03 14:38:20.000 |             12.60000 |
Query OK, 2 rows in database (0.018762s)
           

寫在最後

如果大家能夠運用好 TDengine 3.0 提供的流計算引擎,就不需要再部署其他的第三方流處理系統,這樣一來,不僅降低了系統的複雜度,還大大減少了研發和運維成本。在實際操作中應用 TDengine 流計算引擎時,上述的詳細文法會帶給你很多幫助,如果還産生了其他更為複雜的應用問題,你也可以進入 TDengine 社群向技術人員尋求幫助。