天天看点

Greenplum, PostgreSQL 数据实时订阅的几种方式

PostgreSQL , Greenplum , 异步消息队列 , skytools , PGQ , 数据订阅 , 触发器 , 时间戳 , BINLOG , 逻辑复制 , 逻辑复制slot , 逻辑decode

通常在一个企业中,对一份数据可能更有多个业务系统需要对其进行处理。

<a href="https://github.com/digoal/blog/blob/master/201707/20170706_01.md">《从人类河流文明 洞察 数据流动的重要性》</a>

因此数据是流动的,通常会通过消息队列来完成这样的工作。不过呢,这样要求消息队列是最上游。

当无法将消息队列放到最上游时,例如数据先到了数据库,再订阅给其他业务线,怎么办呢?

就比如这里的RDS PG的部分。

<a href="https://github.com/digoal/blog/blob/master/201707/20170728_01_pic_001.jpg" target="_blank"></a>

下面来探讨一下有多少种方法来实现这个需求:将数据库的变更实时订阅到其他业务线(例如Kafka)。

PostgreSQL,在阿里云对应RDS for PostgreSQL这个产品。

PostgreSQL的数据变更,如何订阅给下游?下面是可选的方法:

1、触发器,将表的变更记录到一个流水表,然后业务通过读取流水表进行订阅。

<a href="https://github.com/digoal/blog/blob/master/201206/20120625_01.md">《USE hstore store table's trace record》</a>

2、规则,在需要订阅的表上,创建RULE,将表的变更记录到一个流水表,然后业务通过读取流水表进行订阅。

<a href="https://www.postgresql.org/docs/10/static/sql-createrule.html">https://www.postgresql.org/docs/10/static/sql-createrule.html</a>

3、异步消息,使用触发器或RULE,将表的变更写入CHANNEL。(数据库的异步消息通道功能)。

订阅端通过监听CHANNEL,实现对数据的订阅。

<a href="https://www.postgresql.org/docs/10/static/sql-notify.html">https://www.postgresql.org/docs/10/static/sql-notify.html</a>

<a href="https://www.postgresql.org/docs/10/static/sql-listen.html">https://www.postgresql.org/docs/10/static/sql-listen.html</a>

4、WAL 逻辑 decode。

从9.4的版本开始,PostgreSQL支持逻辑复制,将数据变更写入WAL,(类似MySQL的binlog复制)。客户端通过从WAL翻译REDO来实现订阅。

alidecode 是一个翻译wal的插件,用户也可以自己写翻译WAL的插件。

<a href="https://github.com/digoal/blog/blob/master/201605/20160526_01.md">《PostgreSQL 最佳实践 - 逻辑增量复制(MySQL &lt;-&gt; PgSQL &lt;-&gt; PgSQL)》</a>

5、PGQ ,是SKYTOOLS的一个基础功能,在PostgreSQL内部实现了一个异步的队列。用户可以对需要复制的表,创建PGQ,然后写PGQ的消费者来实现订阅。

<a href="https://wiki.postgresql.org/wiki/SkyTools">https://wiki.postgresql.org/wiki/SkyTools</a>

londiste3 就是一个用于复制的PGQ消费者代表程序。

6、confluentinc bottledwater-pg , 基于PG的WAL以及逻辑复制功能,实现的一个主动消费者,将数据自动从WAL翻译,并写入KAFKA队列,实现消息订阅。

<a href="https://github.com/digoal/blog/blob/master/201612/20161205_02_pic_003.png" target="_blank"></a>

<a href="https://github.com/confluentinc/bottledwater-pg">https://github.com/confluentinc/bottledwater-pg</a>

<a href="https://github.com/digoal/blog/blob/master/201612/20161205_02.md">《实时数据交换平台 - BottledWater-pg with confluent》</a>

7、时间戳,最传统的方法,用户在写入、删除、更新时,记录数据的写入时间、修改时间。

删除时,逻辑删除(标记字段),并记录删除时间。

通过这个时间戳的推移来订阅数据。

优先使用逻辑DECODE的方法(6、4)。

其次是时间戳(7)。

然后可以考虑PGQ。

然后再考虑异步消息的方法。

最后考虑触发器和规则。

Greenplum,在阿里云对应HybridDB for PostgreSQL这个产品。

也有若干种订阅方法:

推荐方法:

使用appendonly表,以及时间戳的方法。

阿里云推出了metascan技术,可以在不建索引的情况下,极度高效的实现时间戳的推移订阅。

1、datax,配置推移字段,推移订阅。

2、cdp

3、d2

4、dts,通过类似binlog的解析来订阅。