标簽
PostgreSQL , rule , trigger , 歸類 , json , udf , 異步消息 , listen , notify
https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E8%83%8C%E6%99%AF 背景
因為二手商品沒有太多的活動、硬性分類,廣告等活動,是以購買或者銷售速度沒有新商品那麼快。為了提高二手商品的銷售效率,需要提供一套歸類政策。
當商品新增或商品内容發生變化時,需要根據商品屬性,以及定義的規則,實時進行商品歸類(魚塘,圈子等)友善使用者查詢。
https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E7%BB%93%E6%9E%84%E8%AE%BE%E8%AE%A1 結構設計
1、商品ID,屬性
create table a (
id int8 primary key, -- 商品ID
att jsonb -- 商品屬性
);
屬性設計為JSON,JSON裡面是K-V的屬性對,V裡面是數組,包含K的值以及這對屬性的最後更新時間。
更新時間用于merge insert,當屬性發生變化時才更新,沒有發生變化時,不更新。
是以json需要周遊,并做合并處理。
https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E5%90%88%E5%B9%B6json%E5%B1%9E%E6%80%A7%E7%9A%84udf 合并JSON屬性的UDF
create or replace function merge_json(jsonb, jsonb) returns jsonb as $$
select jsonb_object_agg(key,value) from (
select
coalesce(a.key, b.key) as key,
case
when
coalesce(jsonb_array_element(a.value,1)::text::timestamp, '1970-01-01'::timestamp)
>
coalesce(jsonb_array_element(b.value,1)::text::timestamp, '1970-01-01'::timestamp)
then a.value
else b.value
end
from jsonb_each($1) a full outer join jsonb_each($2) b using (key)
) t;
$$ language sql strict ;
postgres=# select merge_json('{"price":[10000, "2018-01-01 10:10:11"], "newatt":[120, "2017-01-01 12:22:00"]}', '{"price":[8880, "2018-01-04 10:10:12"], "count":[100, "2017-01-01 10:10:00"]}');
merge_json
-------------------------------------------------------------------------------------------------------------------------
{"count": [100, "2017-01-01 10:10:00"], "price": [8880, "2018-01-04 10:10:12"], "newatt": [120, "2017-01-01 12:22:00"]}
(1 row)
https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E8%A7%A6%E5%8F%91%E5%99%A8%E8%AE%BE%E8%AE%A1 觸發器設計
觸發器裡面定義分類規則,例如這裡對價格大于100的商品,吐出消息.
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then -- 規則1, 價格大于100,推送異步消息
perform pg_notify(
'a', -- 異步消息通道名字
format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att) -- 消息内容
);
-- elsif ... then 其他規則
-- else 其他規則
end if;
return null;
end;
$function$ language plpgsql strict;
建立after insert or update觸發器
create trigger tg1 after insert or update on a for each row execute procedure notify1();
https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E5%85%B6%E4%BB%96%E8%A7%A6%E5%8F%91%E5%99%A8%E8%A7%84%E5%88%99%E8%AE%BE%E8%AE%A1%E6%96%B9%E6%B3%95 其他觸發器(規則設計方法)
本文未使用
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att) -- 解析一次JSONB
loop
-- 規則處理
-- if key='price' then ...; end if;
-- if key='count' then ...; end if;
end loop;
return null;
end;
$function$ language plpgsql strict;
-- 動态規則表
create table tbl_rule (
key text, -- key值
exp text, -- value 代入的表達式
class text, -- 滿足exp時,指向這個歸類
)
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att) -- 解析一次JSONB
loop
-- 使用tbl_rule生成規則處理邏輯,動态
end loop;
return null;
end;
$function$ language plpgsql strict;
https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E8%A7%84%E5%88%99%E6%8F%8F%E8%BF%B0 規則描述
json屬性對中,value的類型可能很多,對應不同的規則語義。
1、文本 LIKE
2、數組 IN
3、等值
4、數值範圍
5、時間範圍
等等,在trigger的UDF中寫規則即可。
https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E6%95%B0%E6%8D%AE%E5%90%88%E5%B9%B6%E5%86%99%E5%85%A5%E6%B5%8B%E8%AF%95 資料合并寫入測試
insert into a values
(1, '{"price":[10000, "2018-01-01 10:10:11"]}')
on conflict (id)
do update set
att = merge_json(a.att, excluded.att) -- 合并新屬性,保留老屬性,需要使用一個UDF來合并
where
a.att <> merge_json(a.att, excluded.att); -- 如果相等的機率很低,則可以去掉這個判斷, 降低CPU開銷
postgres=# insert into a values
(1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')
on conflict (id)
do update set
att = merge_json(a.att, excluded.att) -- 合并新屬性,保留老屬性,需要使用一個UDF來合并
where
a.att <> merge_json(a.att, excluded.att); -- 如果相等的機率很低,則可以去掉這個判斷, 降低CPU開銷
INSERT 0 1
postgres=# select * from a;
id | att
----+-----------------------------------------------------------------------------
1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}
(1 row)
https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E7%9B%91%E5%90%AC%E6%B6%88%E6%81%AF 監聽消息
postgres=# listen a;
LISTEN
Asynchronous notification "a" with payload "ID:1, ATT:{"price": [10000, "2018-01-01 10:10:19"]}" received from server process with PID 51380.
https://jdbc.postgresql.org/documentation/head/listennotify.html https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E5%85%B6%E4%BB%96 其他
删除商品,可以使用DELETE觸發器,告訴下遊,比如商品已成交,删除。
CREATE OR REPLACE FUNCTION notify2() returns trigger
AS $function$
declare
begin
perform pg_notify(
'a', -- 異步消息通道名字
format('CLASS:delete, ID:%s, ATT:%s', OLD.id, OLD.att) -- 消息内容
);
return null;
end;
$function$ language plpgsql strict;
create trigger tg2 after delete on a for each row execute procedure notify2();
https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E6%96%B9%E6%A1%88%E4%BA%8C---%E6%B5%81%E5%BC%8F%E6%89%B9%E9%87%8F%E6%B6%88%E8%B4%B9 方案二 - 流式批量消費
使用異步消息的方式,當連接配接中斷時,重新連接配接後需要重新監聽,并且在中斷連接配接期間的消息會被丢棄掉。是以可靠性不佳。
另外,異步消息無法控制一次消費多少條,也不是特别友好。
是以我們實際上還有其他方法,持久化表,并且使用異步批量消費的方式進行消費。
性能名額:
CASE | 資料量 | 并發 | TPS | 平均響應時間 |
---|---|---|---|---|
流式處理 - 閱後即焚 - 消費 | 10億,消費 395.2 萬行/s | 56 | 3952 | 14毫秒 |
結構沿用前面的例子,
1、新增一張結果表(也可以新增多張表,看業務量,通常一張夠用了),
2、同時修改一下觸發器内容,把notify改成寫表,
3、修改用戶端把監聽通道改成異步消費SQL
https://github.com/digoal/blog/blob/master/201807/20180713_03.md#demo DEMO
1、新增結果表
create table t_result(id serial8 primary key, class text, content text);
2、觸發器裡面定義分類規則,例如這裡對價格大于100的商品,吐出資訊到結果表.
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then -- 規則1, 價格大于100,寫入結果表
insert into t_result(class,content) values (
'a', -- 歸類
format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att) -- 消息内容
);
-- elsif ... then 其他規則
-- else 其他規則
end if;
return null;
end;
$function$ language plpgsql strict;
3、建立after insert or update觸發器
create trigger tg1 after insert or update on a for each row execute procedure notify1();
4、資料合并寫入測試
insert into a values
(1, '{"price":[10000, "2018-01-01 10:10:11"]}')
on conflict (id)
do update set
att = merge_json(a.att, excluded.att) -- 合并新屬性,保留老屬性,需要使用一個UDF來合并
where
a.att <> merge_json(a.att, excluded.att); -- 如果相等的機率很低,則可以去掉這個判斷, 降低CPU開銷
postgres=# insert into a values
(1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')
on conflict (id)
do update set
att = merge_json(a.att, excluded.att) -- 合并新屬性,保留老屬性,需要使用一個UDF來合并
where
a.att <> merge_json(a.att, excluded.att); -- 如果相等的機率很低,則可以去掉這個判斷, 降低CPU開銷
INSERT 0 1
postgres=# select * from a;
id | att
----+-----------------------------------------------------------------------------
1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}
(1 row)
5、異步批量消費結果表的内容(閱後即焚)
with a as (delete from t_result where ctid= any(array(
select ctid from t_result order by id limit 10 for update skip locked -- 可以并發消費,不會互相堵塞,消費順與寫入順序一緻
)) returning *)
select * from a;
id | class | content
----+-------+---------------------------------------------------------------------------------------------------------
1 | a | CLASS:high price, ID:1, ATT:{"price": [10000, "2018-01-01 10:10:11"]}
2 | a | CLASS:high price, ID:1, ATT:{"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}
(2 rows)
原子操作,閱後即焚,再次查詢已消費完畢
postgres=# select * from t_result;
id | class | content
----+-------+---------
(0 rows)
https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E6%96%B9%E6%A1%88%E4%BA%8C%E7%BB%AD---%E4%BD%BF%E7%94%A8statement%E7%BA%A7%E8%A7%A6%E5%8F%91%E5%99%A8%E4%BB%A3%E6%9B%BFrow%E7%BA%A7%E8%A7%A6%E5%8F%91%E5%99%A8 方案二續 - 使用statement級觸發器代替row級觸發器
為什麼建議使用statement級觸發器代替row級觸發器,參考:
《PostgreSQL 批量、單步 寫入 - row, statement 觸發器(中間表)、CTE 幾種用法性能對比》觸發器函數修改如下
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
-- 規則1
insert into t_result(class,content) select
'a', -- 歸類
format('CLASS:high price, ID:%s, ATT:%s', id, att) -- 消息内容
from new_table
where jsonb_array_element(att->'price', 0)::text::float8 > 100; -- 規則1, 價格大于100,寫入結果表
-- 其他規則
-- insert into t_result(class,content) select
-- ......
-- from new_table
-- where ... -- 規則n
return null;
end;
$function$ language plpgsql strict;
觸發器修改如下
create trigger tg1 after insert on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();
create trigger tg2 after update on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();
postgres=# \d a
Table "public.a"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+---------
id | integer | | not null |
att | jsonb | | |
Indexes:
"pk" PRIMARY KEY, btree (id)
Triggers:
tg1 AFTER INSERT ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()
tg2 AFTER UPDATE ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()
https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E5%B0%8F%E7%BB%93 小結
使用異步消息,UDF,規則或觸發器,非常輕量化的解決了實時計算的問題。
但是,異步消息是可能丢消息的,例如監聽連接配接中斷後,重連時,需要重新發起監聽,并且中斷連接配接時未消費的消息,不會再被消費,是以相當于丢消息了。
改進方法:
1、如果要保證不丢消息,可以将notify改成INSERT,把結果寫入預先定義好的某個結果表,使用邏輯DECODE的方式,解析這個結果表相關的logical decode資訊,進而擷取變化量,參考如下。
《PostgreSQL pg_recvlogical 與 test_decoding 自定義,支援source table filter, 對接kafka,es等》2、使用閱後即焚的方法,類似本方案2.
《阿裡雲RDS PostgreSQL varbitx實踐 - 流式标簽 (閱後即焚流式批量計算) - 萬億級,任意标簽圈人,毫秒響應》 《HTAP資料庫 PostgreSQL 場景與性能測試之 32 - (OLTP) 高吞吐資料進出(堆存、行掃、無需索引) - 閱後即焚(JSON + 函數流式計算)》 《HTAP資料庫 PostgreSQL 場景與性能測試之 31 - (OLTP) 高吞吐資料進出(堆存、行掃、無需索引) - 閱後即焚(讀寫大吞吐并測)》 《HTAP資料庫 PostgreSQL 場景與性能測試之 27 - (OLTP) 物聯網 - FEED日志, 流式處理 與 閱後即焚 (CTE)》 《在PostgreSQL中實作update | delete limit - CTID掃描實踐 (高效閱後即焚)》