天天看点

初学 PipelineDBPipelineDB

PipelineDB

PipelineDB 是利用SQL查询语句对流式数据进行处理。输出的结果存储在表格中。其优点在于仅仅在硬盘上存储连续查询(continuous queries)的输出结果,进而可降低流式数据集的基数。原始数据(raw data)一旦被连续查询读取消费后,就会被丢弃。因此,PipelineDB不是用来存储数据的数据仓库。

通过 PipelineDB 的数据被看作是虚拟数据。PipelineDB的目的在于降低数据流对 ETL 过程的依赖性。原始数据可以直接流入PipelineDB,同时,可以通过连续查询实时定义、提取已声明的信息。

Continuous Views

Continuous view 是PipelineDB的基本构成,可以看做是一个普通视图(regular view)。此外,它可以选取流和表中的组合值作为输入,并且实时增加的数据都可以被写入当作输入值。

流一旦被 CV 读取以后,它就会被丢弃,不存储在任何地方。当有SELECT * FROM that_view,数据才可以存储。可以将一个CV看作是一个高通量、实时的实体化视图。

  1. 创建 CV

    CREATE CONTINUOUS VIEW name AS query

    其中,query 是 PostgreSQL的SELECT描述。

  2. 丢弃 CV

    DROP CONTINUOUS VIEW name

  3. 截断 CV

    SELECT truncate_continuous_view(‘name’);

    这句话用来溢出 cv 的数据,而不是cv本身,即清空表格。类似于 PostgreSQL的truncate: https://www.postgresql.org/docs/9.5/static/sql-truncate.html

  4. 查看 CV

    SELECT * FROM pipeline_views();

  5. 数据检索

    数据减速最简单的就是运用 SELECT语句,如下

    SELECT * FROM some_continuous_view

    也可以进一步进行分析再输出。

  6. 生存期限(Time-to-Live Expiration)

    包含两个参数: ttl和ttl_column。

    当数组的ttl_column值(时间戳)超过ttl时间段的值时,Expiration通过控制一个或多个"reaper"进程来删除“老”的数据 。例如:

    CREATE CONTINUOUS VIEW v_ttl WITHm (ttl = ‘1 month’, ttl_column = ‘minute’) AS<>

    SELECT minute(arrival_timestamp), COUNT(*) FROM some_stream GROUP BY minute;

  1. 设置 TTLs

    设置 TTL 可以通过set_ttl函数来设置:

    set_ttl(cv_name, ttl, ttl_conlumn)

  2. 激活和停止

    CV连续不断地处理输入流,因而有一个开始、停止机制,而不完全关闭 PipelineDB 非常有用。

    SELECT activate(‘continuous_view_or_transform’);

    SELECT deactivate(‘continuous_view_or_transform’);

    当 cv 没有激活时,事件可以写入流,但不能读取。

Continuous Transforms

CT用来对输入的数据进行连续转换,但不保存,因而也不支持聚合计算。转换的结果可以转入另外的流或者写入其他的。

  1. 创建 CT

    CREATE CONTINUOUS TRANSFORMS name AS query [THEN EXECUTE PROCEDURE function_name ( arguments )]

  2. 丢弃 CT

    DROP CONTINUOUS TRANSFORM name

  3. 查看 CT

    SELECT * FROM pipeline_transforms( );

  4. CT的输出流

    为了方便其他转换或者 CV 读取,CT 都有输出流与其对接。例如,将转换后的写入table:

    CREATE CONTINUOUS TRANSFORM t AS

    SELECT t.y FROM some_stream s JOIN some_table t ON s.x = t.x;

  5. 内置转换触发器

    PipelineDBT 提供了一个内置接口,用触发函数接收转换的数据。触发函数亦可将转换后的数据写入其他流。用pipeline_stream_insert函数来操作。例如:

    CREATE CONTINUOUS TTRANSFORM t AS

    SELECT x::int, y::int FROM stream WHERE mod(x,2) = 0

    THEN EXECUTE PROCEDURE pipeline_stream_insert(‘even_stream’)

    上述例子是将所有(x, y)数组中当 x 为偶数时,将其插入even_stream。

Streams

流中的每列数据,可以简单的看成一个事件。事件存在流中,被 CV 消费、读取。流可唯一的被作为 CV 的输入。建立流:

CREATE STREAM stream_name([

{ column_name data_type[ COLLATE collation] | LIKE parent_stream}[, …]])

  1. 写入流

    用INSERT。

    例如:INSERT INTO stream (x,y,z) VALUES (0,1,2);

INSERT配合PREPARE可以降低网络负载。

可用COPY将数据从文件写入流,例如,

COPY stream (data) FROM ‘/some/file.csv’

  1. 输出流

    对于 CV 的每一行输出的数据,通常都包含两个属性: old 和new。如果change与 CV insert 相关,则old元组为Null; 如果 change 与delete相关,则new元组为 Null。

    例如:CREATE CONTINUOUS VIEW v_deltas AS SELECT abs((new).sum - (old).sum) AS delta

    FROM output_of(‘v_sum’)

    WHERE abs((new).sum - (old).sum) > 10;

    old 和new两个元组必须用圆括号包起。 :)

  2. Delta Streams

    delta 元组用来保存old 和new之间的区别。

滑动窗口

连续视图会随着时间的推移,而不断更新,所以 P 要在更新连续视图的时候要考虑当前时间。包含where与当前时间相关的时间分量的子句的查询称为滑动窗口查询。

一个滑动的where 子句由两个重要的部分组成:clock_timestamp( ),arrival_timestamp( )。但是在 PipelindDB中不用显示的添加这些值,可以通过参数sw,在连续视图中做出定义。例如:

CREATE CONTINUOUS VIEW recent_users WITH (sw = ‘1 minute’) AS

SELECT user_id :: interger FROM stream;

在PipelineDB,会将其重写为以下:

CREATE CONTINUOUS VIEW recent_users AS

SELECT user_id :: integer FROM stream

WHERE (arrival_timestamp > clock_timestamp() - interval ‘1minute’);

数据类型、函数

  1. Bloom Filter Aggreates