天天看點

PostgreSQL 流式處理應用實踐 - 二手商品實時歸類(異步消息notify/listen、閱後即焚)

标簽

PostgreSQL , rule , trigger , 歸類 , json , udf , 異步消息 , listen , notify

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E8%83%8C%E6%99%AF 背景

因為二手商品沒有太多的活動、硬性分類,廣告等活動,是以購買或者銷售速度沒有新商品那麼快。為了提高二手商品的銷售效率,需要提供一套歸類政策。

當商品新增或商品内容發生變化時,需要根據商品屬性,以及定義的規則,實時進行商品歸類(魚塘,圈子等)友善使用者查詢。

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E7%BB%93%E6%9E%84%E8%AE%BE%E8%AE%A1 結構設計

1、商品ID,屬性

create table a (      
  id int8 primary key,   -- 商品ID      
  att jsonb   -- 商品屬性      
);      
           

屬性設計為JSON,JSON裡面是K-V的屬性對,V裡面是數組,包含K的值以及這對屬性的最後更新時間。

更新時間用于merge insert,當屬性發生變化時才更新,沒有發生變化時,不更新。

是以json需要周遊,并做合并處理。

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E5%90%88%E5%B9%B6json%E5%B1%9E%E6%80%A7%E7%9A%84udf 合并JSON屬性的UDF

create or replace function merge_json(jsonb, jsonb) returns jsonb as $$    
  select jsonb_object_agg(key,value) from (    
  select     
    coalesce(a.key, b.key) as key,     
    case     
    when     
    coalesce(jsonb_array_element(a.value,1)::text::timestamp, '1970-01-01'::timestamp)     
    >     
    coalesce(jsonb_array_element(b.value,1)::text::timestamp, '1970-01-01'::timestamp)     
    then a.value    
    else b.value    
    end    
  from jsonb_each($1) a full outer join jsonb_each($2) b using (key)    
  ) t;      
$$ language sql strict ;    
    
    
postgres=# select merge_json('{"price":[10000, "2018-01-01 10:10:11"], "newatt":[120, "2017-01-01 12:22:00"]}',  '{"price":[8880, "2018-01-04 10:10:12"], "count":[100, "2017-01-01 10:10:00"]}');    
                                                       merge_json                                                            
-------------------------------------------------------------------------------------------------------------------------    
 {"count": [100, "2017-01-01 10:10:00"], "price": [8880, "2018-01-04 10:10:12"], "newatt": [120, "2017-01-01 12:22:00"]}    
(1 row)    
           

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E8%A7%A6%E5%8F%91%E5%99%A8%E8%AE%BE%E8%AE%A1 觸發器設計

觸發器裡面定義分類規則,例如這裡對價格大于100的商品,吐出消息.

CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then   -- 規則1, 價格大于100,推送異步消息      
     perform pg_notify(      
       'a',    -- 異步消息通道名字      
       format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att)   -- 消息内容      
     );      
  -- elsif ... then  其他規則      
  -- else  其他規則      
  end if;      
return null;      
end;      
$function$ language plpgsql strict;      
           

建立after insert or update觸發器

create trigger tg1 after insert or update on a for each row execute procedure notify1();      
           

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E5%85%B6%E4%BB%96%E8%A7%A6%E5%8F%91%E5%99%A8%E8%A7%84%E5%88%99%E8%AE%BE%E8%AE%A1%E6%96%B9%E6%B3%95 其他觸發器(規則設計方法)

本文未使用

CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att)  -- 解析一次JSONB    
  loop    
    -- 規則處理    
    -- if key='price' then ...; end if;    
    -- if key='count' then ...; end if;    
  end loop;    
return null;    
end;    
$function$ language plpgsql strict;      
           
-- 動态規則表    
    
create table tbl_rule (    
  key text,  -- key值    
  exp text,  -- value 代入的表達式    
  class text,  -- 滿足exp時,指向這個歸類    
)    
    
CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att)  -- 解析一次JSONB    
  loop    
    -- 使用tbl_rule生成規則處理邏輯,動态    
  end loop;    
return null;    
end;    
$function$ language plpgsql strict;      
           

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E8%A7%84%E5%88%99%E6%8F%8F%E8%BF%B0 規則描述

json屬性對中,value的類型可能很多,對應不同的規則語義。

1、文本 LIKE

2、數組 IN

3、等值

4、數值範圍

5、時間範圍

等等,在trigger的UDF中寫規則即可。

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E6%95%B0%E6%8D%AE%E5%90%88%E5%B9%B6%E5%86%99%E5%85%A5%E6%B5%8B%E8%AF%95 資料合并寫入測試

insert into a values       
  (1, '{"price":[10000, "2018-01-01 10:10:11"]}')       
  on conflict (id)       
  do update set       
  att = merge_json(a.att, excluded.att)  -- 合并新屬性,保留老屬性,需要使用一個UDF來合并      
  where       
  a.att <> merge_json(a.att, excluded.att);  -- 如果相等的機率很低,則可以去掉這個判斷, 降低CPU開銷   
    
    
postgres=# insert into a values    
  (1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')    
  on conflict (id)    
  do update set    
  att = merge_json(a.att, excluded.att)  -- 合并新屬性,保留老屬性,需要使用一個UDF來合并    
  where    
  a.att <> merge_json(a.att, excluded.att);   -- 如果相等的機率很低,則可以去掉這個判斷, 降低CPU開銷  
INSERT 0 1    
    
    
postgres=# select * from a;    
 id |                                     att                                         
----+-----------------------------------------------------------------------------    
  1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}    
(1 row)    
           

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E7%9B%91%E5%90%AC%E6%B6%88%E6%81%AF 監聽消息

postgres=# listen a;      
LISTEN      
Asynchronous notification "a" with payload "ID:1, ATT:{"price": [10000, "2018-01-01 10:10:19"]}" received from server process with PID 51380.      
           
https://jdbc.postgresql.org/documentation/head/listennotify.html

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E5%85%B6%E4%BB%96 其他

删除商品,可以使用DELETE觸發器,告訴下遊,比如商品已成交,删除。

CREATE OR REPLACE FUNCTION notify2() returns trigger      
AS $function$      
declare      
begin      
     perform pg_notify(      
       'a',                                                     -- 異步消息通道名字      
       format('CLASS:delete, ID:%s, ATT:%s', OLD.id, OLD.att)   -- 消息内容      
     );      
return null;      
end;      
$function$ language plpgsql strict;      
    
create trigger tg2 after delete on a for each row execute procedure notify2();      
           

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E6%96%B9%E6%A1%88%E4%BA%8C---%E6%B5%81%E5%BC%8F%E6%89%B9%E9%87%8F%E6%B6%88%E8%B4%B9 方案二 - 流式批量消費

使用異步消息的方式,當連接配接中斷時,重新連接配接後需要重新監聽,并且在中斷連接配接期間的消息會被丢棄掉。是以可靠性不佳。

另外,異步消息無法控制一次消費多少條,也不是特别友好。

是以我們實際上還有其他方法,持久化表,并且使用異步批量消費的方式進行消費。

性能名額:

CASE 資料量 并發 TPS 平均響應時間
流式處理 - 閱後即焚 - 消費 10億,消費 395.2 萬行/s 56 3952 14毫秒

結構沿用前面的例子,

1、新增一張結果表(也可以新增多張表,看業務量,通常一張夠用了),

2、同時修改一下觸發器内容,把notify改成寫表,

3、修改用戶端把監聽通道改成異步消費SQL

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#demo DEMO

1、新增結果表

create table t_result(id serial8 primary key, class text, content text);    
           

2、觸發器裡面定義分類規則,例如這裡對價格大于100的商品,吐出資訊到結果表.

CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then   -- 規則1, 價格大于100,寫入結果表      
     insert into t_result(class,content) values (    
       'a',    -- 歸類    
       format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att)   -- 消息内容      
     );    
  -- elsif ... then  其他規則      
  -- else  其他規則      
  end if;      
return null;      
end;      
$function$ language plpgsql strict;      
           

3、建立after insert or update觸發器

create trigger tg1 after insert or update on a for each row execute procedure notify1();      
           

4、資料合并寫入測試

insert into a values       
  (1, '{"price":[10000, "2018-01-01 10:10:11"]}')       
  on conflict (id)       
  do update set       
  att = merge_json(a.att, excluded.att)  -- 合并新屬性,保留老屬性,需要使用一個UDF來合并      
  where       
  a.att <> merge_json(a.att, excluded.att);   -- 如果相等的機率很低,則可以去掉這個判斷, 降低CPU開銷  
    
    
postgres=# insert into a values    
  (1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')    
  on conflict (id)    
  do update set    
  att = merge_json(a.att, excluded.att)  -- 合并新屬性,保留老屬性,需要使用一個UDF來合并    
  where    
  a.att <> merge_json(a.att, excluded.att); -- 如果相等的機率很低,則可以去掉這個判斷, 降低CPU開銷   
    
INSERT 0 1    
    
postgres=# select * from a;    
 id |                                     att                                         
----+-----------------------------------------------------------------------------    
  1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}    
(1 row)    
           

5、異步批量消費結果表的内容(閱後即焚)

with a as (delete from t_result where ctid= any(array(     
  select ctid from t_result order by id limit 10 for update skip locked  -- 可以并發消費,不會互相堵塞,消費順與寫入順序一緻    
)) returning *)    
select * from a;    
           
id | class |                                                 content                                                     
----+-------+---------------------------------------------------------------------------------------------------------    
  1 | a     | CLASS:high price, ID:1, ATT:{"price": [10000, "2018-01-01 10:10:11"]}    
  2 | a     | CLASS:high price, ID:1, ATT:{"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}    
(2 rows)    
    
    
原子操作,閱後即焚,再次查詢已消費完畢    
    
postgres=# select * from t_result;    
 id | class | content     
----+-------+---------    
(0 rows)    
           

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E6%96%B9%E6%A1%88%E4%BA%8C%E7%BB%AD---%E4%BD%BF%E7%94%A8statement%E7%BA%A7%E8%A7%A6%E5%8F%91%E5%99%A8%E4%BB%A3%E6%9B%BFrow%E7%BA%A7%E8%A7%A6%E5%8F%91%E5%99%A8 方案二續 - 使用statement級觸發器代替row級觸發器

為什麼建議使用statement級觸發器代替row級觸發器,參考:

《PostgreSQL 批量、單步 寫入 - row, statement 觸發器(中間表)、CTE 幾種用法性能對比》

觸發器函數修改如下

CREATE OR REPLACE FUNCTION notify1() returns trigger        
AS $function$        
declare        
begin        
  -- 規則1  
  insert into t_result(class,content) select   
    'a',    -- 歸類    
    format('CLASS:high price, ID:%s, ATT:%s', id, att)   -- 消息内容     
  from new_table   
  where jsonb_array_element(att->'price', 0)::text::float8 > 100;    -- 規則1, 價格大于100,寫入結果表    
    
  -- 其他規則  
  -- insert into t_result(class,content) select   
  -- ......  
  --   from new_table   
  -- where ...  -- 規則n  
    
  return null;        
end;        
$function$ language plpgsql strict;      
           

觸發器修改如下

create trigger tg1 after insert on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();      
create trigger tg2 after update on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();      
           
postgres=# \d a  
                 Table "public.a"  
 Column |  Type   | Collation | Nullable | Default   
--------+---------+-----------+----------+---------  
 id     | integer |           | not null |   
 att    | jsonb   |           |          |   
Indexes:  
    "pk" PRIMARY KEY, btree (id)  
Triggers:  
    tg1 AFTER INSERT ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()  
    tg2 AFTER UPDATE ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()  
           

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E5%B0%8F%E7%BB%93 小結

使用異步消息,UDF,規則或觸發器,非常輕量化的解決了實時計算的問題。

但是,異步消息是可能丢消息的,例如監聽連接配接中斷後,重連時,需要重新發起監聽,并且中斷連接配接時未消費的消息,不會再被消費,是以相當于丢消息了。

改進方法:

1、如果要保證不丢消息,可以将notify改成INSERT,把結果寫入預先定義好的某個結果表,使用邏輯DECODE的方式,解析這個結果表相關的logical decode資訊,進而擷取變化量,參考如下。

《PostgreSQL pg_recvlogical 與 test_decoding 自定義,支援source table filter, 對接kafka,es等》

2、使用閱後即焚的方法,類似本方案2.

《阿裡雲RDS PostgreSQL varbitx實踐 - 流式标簽 (閱後即焚流式批量計算) - 萬億級,任意标簽圈人,毫秒響應》 《HTAP資料庫 PostgreSQL 場景與性能測試之 32 - (OLTP) 高吞吐資料進出(堆存、行掃、無需索引) - 閱後即焚(JSON + 函數流式計算)》 《HTAP資料庫 PostgreSQL 場景與性能測試之 31 - (OLTP) 高吞吐資料進出(堆存、行掃、無需索引) - 閱後即焚(讀寫大吞吐并測)》 《HTAP資料庫 PostgreSQL 場景與性能測試之 27 - (OLTP) 物聯網 - FEED日志, 流式處理 與 閱後即焚 (CTE)》 《在PostgreSQL中實作update | delete limit - CTID掃描實踐 (高效閱後即焚)》

https://github.com/digoal/blog/blob/master/201807/20180713_03.md#%E5%8F%82%E8%80%83 參考

https://www.postgresql.org/docs/11/static/functions-json.html https://www.postgresql.org/docs/11/static/datatype-json.html https://www.postgresql.org/docs/11/static/sql-notify.html https://www.postgresql.org/docs/11/static/sql-listen.html https://www.postgresql.org/docs/11/static/sql-unlisten.html https://www.postgresql.org/docs/11/static/libpq-notify.html https://www.postgresql.org/docs/11/static/sql-notify.html#id-1.9.3.157.7.5 https://www.postgresql.org/docs/11/static/functions-info.html https://www.postgresql.org/docs/11/static/plpgsql-trigger.html https://github.com/impossibl/pgjdbc-ng https://www.openmakesoftware.com/postgresql-listen-notify-events-example/