天天看點

【重新發現PostgreSQL之美】- 7 垂簾聽政 異步消息

背景

場景:

  • 重要資料在寫入、更新、删除時實時告警或轉存
  • 流式資料(公務車電子圍欄、刑偵資料探針、股票資料規則探針、伺服器運作情況) 實時預警或事件觸發
  • 危險操作(DDL) 異步監控

規則+ 異步消息的優勢:

1、通過規則過濾掉不需要寫入的正常資料, 由于業務正常資料通常占比在99%以上, 進而大幅減輕寫入量.

2、傳統的利用定時器查詢所有資料去發現問題, 還需要在時間、VAL、SID等層面去建立索引, 消耗大量存儲, 同時索引增加寫入RT,性能下降. 規則+異步完全規避這個問題.

3、可以實時發現并預警或觸發其他動作

文法

postgres=# \h create rule  

Command:     CREATE RULE  

Description: define a new rewrite rule  

Syntax:  

CREATE [ OR REPLACE ] RULE name AS ON event  

    TO table_name [ WHERE condition ]  

    DO [ ALSO | INSTEAD ] { NOTHING | command | ( command ; command ... ) }  

where event can be one of:  

    SELECT | INSERT | UPDATE | DELETE  

URL:

https://www.postgresql.org/docs/14/sql-createrule.html

postgres=# \h listen  

Command:     LISTEN 

Description: listen for a notification  

LISTEN channel  

https://www.postgresql.org/docs/14/sql-listen.html

postgres=# \h notify  

Command:     NOTIFY 

Description: generate a notification  

NOTIFY channel [ , payload ]  

https://www.postgresql.org/docs/14/sql-notify.html

postgres=# \df *.*channel*  

                                 List of functions  

   Schema  |         Name          | Result data type | Argument data types | Type   

------------+-----------------------+------------------+---------------------+------  

 pg_catalog | pg_listening_channels | SETOF text       |                     | func  

(1 row)  

postgres=# \df *.*notify*  

                           List of functions  

   Schema  |   Name    | Result data type | Argument data types | Type   

------------+-----------+------------------+---------------------+------  

 pg_catalog | pg_notify | void             | text, text          | func  

例子

機房傳感器

create table tbl_sensor_log (  

id serial8 primary key,  

sid int,  

val jsonb,  

crt_time timestamp  

);  

定義規則, 發現異常資料向alert通道發送消息

create or replace rule r1 as on insert  

to tbl_sensor_log  

where coalesce(val['temp']::float4,0) >= 60  

or coalesce(val['cpu_perct']::float4,0) >= 80 

or coalesce(val['mem_perct']::float4,0) >= 80 

or coalesce(val['io_perct']::float4,0) >= 80 

do also  

select pg_notify('alert', format('sensor: %s, ts:%s, val:%s', NEW.sid, NEW.crt_time, NEW.val));   

定義規則(可選), 正常資料不寫入

create or replace rule r2 as on insert  

where not (coalesce(val['temp']::float4,0) >= 60  

or coalesce(val['io_perct']::float4,0) >= 80) 

do instead NOTHING;   

postgres=# \d+ tbl_sensor_log;  

                                                                 Table "public.tbl_sensor_log" 

  Column |            Type             | Collation | Nullable |                  Default                   | Storage  | Compression | Stats target | Description   

----------+-----------------------------+-----------+----------+--------------------------------------------+----------+-------------+--------------+-------------  

 id      | bigint                     |           | not null | nextval('tbl_sensor_log_id_seq'::regclass) | plain    |             |              |   

 sid     | integer                    |           |          |                                            | plain   |             |              |   

 val     | jsonb                      |           |          |                                            | extended | pglz        |              |   

 crt_time | timestamp without time zone |           |         |                                            | plain    |             |              |   

Indexes:  

    "tbl_sensor_log_pkey" PRIMARY KEY, btree (id)  

Rules:  

    r1 AS 

    ON INSERT TO tbl_sensor_log  

   WHERE COALESCE(new.val['temp'::text]::real, 0::real) >= 60::double precision OR COALESCE(new.val['cpu_perct'::text]::real, 0::real) >= 80::double precision OR COALESCE(new.val['mem_perct'::text]::real, 0::real) >= 80::double precision OR COALESCE(new.val['io_perct'::text]::real, 0::real) >= 80::double precision DO  SELECT pg_notify('alert'::text, format('sensor: %s, val:%s'::text, new.sid, new.val)) AS pg_notify  

Access method: heap  

壓測

CREATE TYPE sensor_js AS (temp float4, cpu_perct float4, mem_perct float4, io_perct float4);   

insert into tbl_sensor_log (sid,val,crt_time)  

values (  

  1,  

 row_to_json(row(1,80.1,2,99.11)::sensor_js)::jsonb,  

  now()  

vi test.sql  

\set sid random(1,1000000)  

\set v1 random(1,61)  

\set v2 random(1,81)  

\set v3 random(1,81)  

\set v4 random(1,81)  

values (:sid, row_to_json(row(:v1,:v2,:v3,:v4)::sensor_js)::jsonb,now());  

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 5 -j 5 -T 120  

開啟其他會話, 監聽alert這個通道的異步消息.

PG 的異步消息為廣播模式. 可以在多個會話監聽同一個通道, 如果有多個業務希望接收同一類異步消息, 則可以這麼做.

listen alter;  

Asynchronous notification "alert" with payload "sensor: 459294, val:{"temp": 32, "io_perct": 81, "cpu_perct": 76, "mem_perct": 39}" received from server process with PID 1715.  

Asynchronous notification "alert" with payload "sensor: 788337, val:{"temp": 60, "io_perct": 34, "cpu_perct": 12, "mem_perct": 53}" received from server process with PID 1714.  

Asynchronous notification "alert" with payload "sensor: 421071, val:{"temp": 7, "io_perct": 81, "cpu_perct": 12, "mem_perct": 14}" received from server process with PID 1716.  

Asynchronous notification "alert" with payload "sensor: 523366, val:{"temp": 13, "io_perct": 45, "cpu_perct": 70, "mem_perct": 80}" received from server process with PID 1713.  

Asynchronous notification "alert" with payload "sensor: 94909, val:{"temp": 57, "io_perct": 1, "cpu_perct": 32, "mem_perct": 81}" received from server process with PID 1713.  

Asynchronous notification "alert" with payload "sensor: 13910, val:{"temp": 61, "io_perct": 39, "cpu_perct": 39, "mem_perct": 2}" received from server process with PID 1714.  

Asynchronous notification "alert" with payload "sensor: 252342, val:{"temp": 7, "io_perct": 31, "cpu_perct": 80, "mem_perct": 13}" received from server process with PID 1714.  

Asynchronous notification "alert" with payload "sensor: 222983, val:{"temp": 56, "io_perct": 76, "cpu_perct": 80, "mem_perct": 25}" received from server process with PID 1715.  

Asynchronous notification "alert" with payload "sensor: 913661, val:{"temp": 60, "io_perct": 23, "cpu_perct": 80, "mem_perct": 9}" received from server process with PID 1716.  

壓測資料分析:

1、在不開啟rule時, 寫入速度比開啟rule快, 因為rule裡面有CPU運算. 增加了RT.

但是這是純計算, 沒有IO, 記憶體等開銷. 總體效率絕對比定時器後查詢快很多.

progress: 1.0 s, 63373.9 tps, lat 0.078 ms stddev 0.066 

progress: 2.0 s, 67591.2 tps, lat 0.074 ms stddev 0.044 

progress: 3.0 s, 66330.3 tps, lat 0.075 ms stddev 0.039 

progress: 4.0 s, 65786.8 tps, lat 0.076 ms stddev 0.038 

progress: 5.0 s, 65436.3 tps, lat 0.076 ms stddev 0.043 

progress: 6.0 s, 64276.1 tps, lat 0.077 ms stddev 0.042 

progress: 7.0 s, 59162.6 tps, lat 0.084 ms stddev 0.045 

progress: 8.0 s, 53887.5 tps, lat 0.092 ms stddev 0.048 

progress: 1.0 s, 43413.8 tps, lat 0.114 ms stddev 0.084 

progress: 2.0 s, 42803.5 tps, lat 0.116 ms stddev 0.040 

progress: 3.0 s, 40092.0 tps, lat 0.124 ms stddev 0.176 

progress: 4.0 s, 41419.0 tps, lat 0.120 ms stddev 0.046 

progress: 5.0 s, 41637.6 tps, lat 0.120 ms stddev 0.040 

progress: 6.0 s, 41918.2 tps, lat 0.119 ms stddev 0.040 

progress: 7.0 s, 41753.3 tps, lat 0.119 ms stddev 0.038 

progress: 8.0 s, 35983.6 tps, lat 0.139 ms stddev 0.042 

在mac book pro上資料輕松破百萬

postgres=# select count(*) from tbl_sensor_log;  

  count   

---------  

 2624221 

其他異步消息應用

202103/20210311_03.md 

《Postgres Notify for Real Time Dashboards》

201807/20180716_01.md 

《PostgreSQL 異步消息(LISTEN/NOTIFY)緩存多大?》

201807/20180713_03.md 

《PostgreSQL 流式處理應用實踐- 二手商品實時歸類(異步消息notify/listen、閱後即焚)》

201711/20171111_01.md 

《PostgreSQL 異步消息實踐- Feed系統實時監測與響應(如電商主動服務) - 分鐘級到毫秒級的實作》

201710/20171018_03.md 

《[未完待續] PGQ 異步消息隊列的使用》

201709/20170925_02.md 

《PostgreSQL 事件觸發器應用- DDL審計記錄+ 異步通知(notify)》

201701/20170116_01.md 

《從電波表到資料庫小程式之- 資料庫異步廣播(notify/listen)》

201111/20111122_01.md 

《PostgreSQL Notify/Listen Like ESB》

201701/20170113_03.md 

《從微信小程式到資料庫"小程式" , 鬼知道我經曆了什麼》