PostgreSQL in Practice - Real-Time Ad Slot Recommendation 1 (Search on Any Combination of Fields and Dimensions, with TOP-K Output)

Tags

PostgreSQL , gin , inverted index , rum , gin_fuzzy_search_limit , random sampling , partitioned index , segmented index , score segmentation

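https://github.com/digoal/blog/blob/master/201804/20180420_03.md#%E8%83%8C%E6%99%AF Background

The scenario: ad recommendation slots for shops, where scores are computed automatically and must be both retrieved and updated efficiently.

Based on user behavior in the shop and across the whole network, inventory, and similar inputs, a score is computed for every item, and the top-ranked items are recommended.

There can be many dimensions. For example: the probability that a male user in Beijing buys socks in autumn is 0.431. That statement alone involves 4 dimensions. Real workloads may have dozens, hundreds, or even thousands of dimensions.

The system must support arbitrary dimensions, sort by score, and return the TOP 100, with millisecond latency at 1 million QPS.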

https://github.com/digoal/blog/blob/master/201804/20180420_03.md#%E8%AE%BE%E8%AE%A11 Design 1

1. Define the dimensions

create table tbl_weidu (  
  wid int primary key,  
  info json  
);   
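
As an illustration, the four-dimension example from the background section could be registered as a single row in tbl_weidu. The JSON keys and the wid value below are made up for this sketch; the article does not specify a layout for info.

```sql
-- Hypothetical dimension row; the keys (city, gender, season, category) are
-- illustrative assumptions, not a schema defined by the article.
insert into tbl_weidu (wid, info)
values (1, '{"city": "Beijing", "gender": "male", "season": "autumn", "category": "socks"}');

-- Look up a dimension ID by its attributes (the json ->> operator returns text).
select wid
from tbl_weidu
where info->>'city' = 'Beijing'
  and info->>'season' = 'autumn';
```

With this layout, the application would resolve a set of attributes to a wid once and then use that wid as the lookup key in the score table.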
           

2. Define the recommendation table, which stores only the top 100 items and their scores

create table tbl_score (  
  wid int not null,   -- dimension ID  
  uid int8 not null,  -- ToB shop ID  
  top10 text[] not null, -- top-N entries, each encoded as item_score  
  primary key(wid,uid)  
);   
           

3. Define a function that merges two text arrays. When new item scores arrive, the old and new arrays are merged into a new value: if an item appears in both, the new score overrides the old one; the result is then sorted and only the TOP N entries are kept.

create or replace function merge_top10(  
text[],   -- old value  
text[],   -- new value  
ln int    -- sort by score and keep the top N  
) returns text[] as $$  
  select array_agg(v2||'_'||v3 order by v3 desc) from   
  (  
    select v2,v3 from   
    (  
      select v2,v3,row_number() over(partition by v2 order by v1 desc) as rn from   -- for the same item, prefer the new value  
      (  
        select 1 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($1) t(info) -- old values  
        union all  
        select 2 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($2) t(info) -- new values  
      ) t  
    ) t where rn=1 order by v3 desc limit ln   -- for the same item, prefer the new value  
  ) t;  
$$ language sql strict immutable;  
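
A quick way to check the merge semantics is to call the function on two small literal arrays. The sample values are made up, and the snippet assumes merge_top10 has been created exactly as defined above.

```sql
-- Item 2 appears in both arrays, so its new score (7) replaces the old one (5).
select merge_top10(
  array['1_10', '2_5'],   -- old value
  array['2_7', '3_1'],    -- new value
  2                       -- keep the top 2
);
-- returns {1_10,2_7}

-- The function is declared strict, so a NULL argument yields NULL.
select merge_top10(null, array['2_7'], 2) is null;
-- returns true
```

Item 3 is dropped because only the two highest scores are kept after the merge.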
           

4. Define a log table that records score changes for an item in a given dimension. The log is consumed later and merged into the final tbl_score table.

create unlogged table tbl_score_log (  
  wid int not null,   -- dimension ID  
  uid int8 not null,  -- ToB shop ID  
  item int8 not null, -- item ID  
  score float4 not null,  -- score  
  crt_time timestamp not null   
);   
  
create index idx_tbl_score_log_1 on tbl_score_log (wid,uid,crt_time);  
           

5. Define the consumer function

create or replace function consume_log(  
  i_loop int,    -- number of loop iterations (how many (wid,uid) groups to process)  
  i_limit int,   -- maximum rows to process per iteration for one (wid,uid) group  
  i_topn int     -- for each (wid,uid), keep the TOP N items (the N highest scores)  
) returns void as $$  
declare  
  v_wid int;  
  v_uid int8;  
  v_top1 text[];  
  i int := 0;  
begin  
  LOOP  
  exit when i >= i_loop;   --  loops  
  
  select wid,uid into v_wid,v_uid from tbl_score_log for update skip locked limit 1;  
  exit when not found;     -- log is empty (or every row is locked): nothing to consume  
  
  with  
  a as (  
    delete from tbl_score_log where ctid= any (array(  
      select ctid from tbl_score_log where wid=v_wid and uid=v_uid order by crt_time limit i_limit  -- limit batch  
    )) returning item,score  
  )  
  select   
    array_agg((item||'_'||score)::text order by score desc) into v_top1   
    from  
    (select item,score from a order by score desc limit i_topn) t;        -- limit topn  
  
  insert into tbl_score   
  values (v_wid, v_uid, v_top1)  
  on conflict (wid,uid)  
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)  
  where  
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn);  
  
  i := i+1;  
  END LOOP;  
end;  
$$ language plpgsql strict;  
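
The consume-and-merge cycle can be walked through by hand on a tiny data set. This is a sketch that assumes the Design 1 objects above (tbl_score_log, tbl_score, merge_top10, consume_log) exist, that tbl_score_log is otherwise empty, and that tbl_score has no row for (1, 1) yet; the item IDs and scores are made up.

```sql
-- First batch: three items for dimension 1, shop 1.
insert into tbl_score_log values
  (1, 1, 101, 30.5,  now()),
  (1, 1, 102, 80,    now()),
  (1, 1, 103, 12.25, now());

select consume_log(1, 10000, 100);
select top10 from tbl_score where wid = 1 and uid = 1;
-- returns {102_80,101_30.5,103_12.25}

-- Second batch: item 101 gets a new score, which overrides the stored one.
insert into tbl_score_log values (1, 1, 101, 95, now());

select consume_log(1, 10000, 100);
select top10 from tbl_score where wid = 1 and uid = 1;
-- returns {101_95,102_80,103_12.25}
```

Note that the batch query itself does not de-duplicate items inside a single batch. If the same item can be logged more than once between two consumption runs, an extra step (for example `distinct on (item)` ordered by `crt_time desc`) may be needed before aggregating.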
           

6. Benchmark 1: generate score-change log records

(1,000 dimensions, 10,000 shops, 100 million items)

vi test.sql  
\set wid random(1,1000)  
\set uid random(1,10000)  
\set item random(1,100000000)  
insert into tbl_score_log values (:wid,:uid,:item,random()*100,now());  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120  
  
tps = 257737.493753 (including connections establishing)  
tps = 257752.428348 (excluding connections establishing)  
           

Write throughput exceeds 250,000 rows/s.

8. Consume the log table and merge the results into the score table

postgres=# select consume_log(10, 10000, 100);  
 consume_log   
-------------  
   
(1 row)  
  
postgres=# \timing  
Timing is on.  
  
postgres=# select * from tbl_score limit 10;  
 wid | uid  |                top10                  
-----+------+-------------------------------------  
 115 |   69 | {989915_22.2217}  
 441 | 3914 | {7521898_39.2669}  
 423 | 7048 | {75494665_92.5439}  
 789 | 1335 | {57756208_23.4602}  
 776 | 8065 | {41134454_46.8727}  
 785 | 6248 | {76364646_93.4671,94065193_69.2552}  
 567 | 7539 | {97116865_6.93694}  
 207 | 6926 | {45163995_14.1626}  
 788 | 9025 | {73053901_80.3204}  
 334 | 2805 | {80532634_78.1224}  
(10 rows)  
  
Time: 0.300 ms  
           

9. Trace the resources used by each consumption call

load 'auto_explain';  
set auto_explain.log_analyze =on;  
set auto_explain.log_buffers =on;  
set auto_explain.log_min_duration =0;  
set auto_explain.log_nested_statements =on;  
set auto_explain.log_time=on;  
set auto_explain.log_verbose =on;  
set client_min_messages ='log';  
           
postgres=# select consume_log(1, 10000, 100);  
LOG:  duration: 0.819 ms  plan:  
Query Text: select wid,uid                  from tbl_score_log for update skip locked limit 1  
Limit  (cost=10000000000.00..10000000000.03 rows=1 width=18) (actual time=0.816..0.816 rows=1 loops=1)  
  Output: wid, uid, ctid  
  Buffers: shared hit=177  
  ->  LockRows  (cost=10000000000.00..10000876856.44 rows=30947272 width=18) (actual time=0.815..0.815 rows=1 loops=1)  
        Output: wid, uid, ctid  
        Buffers: shared hit=177  
        ->  Seq Scan on public.tbl_score_log  (cost=10000000000.00..10000567383.72 rows=30947272 width=18) (actual time=0.808..0.808 rows=1 loops=1)  
              Output: wid, uid, ctid  
              Buffers: shared hit=176  
LOG:  duration: 0.104 ms  plan:  
Query Text: with  
  a as (  
    delete from tbl_score_log where ctid= any (array(  
      select ctid from tbl_score_log where wid=v_wid and uid=v_uid order by crt_time limit i_limit  -- limit batch  
    )) returning item,score  
  )  
  select   
    array_agg((item||'_'||score)::text order by score desc)                  from  
    (select item,score from a order by score desc limit i_topn) t  
Aggregate  (cost=13.56..13.57 rows=1 width=32) (actual time=0.100..0.100 rows=1 loops=1)  
  Output: array_agg((((a.item)::text || '_'::text) || (a.score)::text) ORDER BY a.score DESC)  
  Buffers: shared hit=20  
  CTE a  
    ->  Delete on public.tbl_score_log tbl_score_log_1  (cost=2.06..13.16 rows=10 width=6) (actual time=0.059..0.063 rows=4 loops=1)  
          Output: tbl_score_log_1.item, tbl_score_log_1.score  
          Buffers: shared hit=20  
          InitPlan 1 (returns $0)  
            ->  Limit  (cost=0.56..2.05 rows=1 width=14) (actual time=0.017..0.043 rows=4 loops=1)  
                  Output: tbl_score_log.ctid, tbl_score_log.crt_time  
                  Buffers: shared hit=8  
                  ->  Index Scan using idx_tbl_score_log_1 on public.tbl_score_log  (cost=0.56..5.02 rows=3 width=14) (actual time=0.017..0.041 rows=4 loops=1)  
                        Output: tbl_score_log.ctid, tbl_score_log.crt_time  
                        Index Cond: ((tbl_score_log.wid = $5) AND (tbl_score_log.uid = $6))  
                        Buffers: shared hit=8  
          ->  Tid Scan on public.tbl_score_log tbl_score_log_1  (cost=0.01..11.11 rows=10 width=6) (actual time=0.053..0.055 rows=4 loops=1)  
                Output: tbl_score_log_1.ctid  
                TID Cond: (tbl_score_log_1.ctid = ANY ($0))  
                Buffers: shared hit=12  
  ->  Limit  (cost=0.37..0.37 rows=1 width=12) (actual time=0.077..0.079 rows=4 loops=1)  
        Output: a.item, a.score  
        Buffers: shared hit=20  
        ->  Sort  (cost=0.37..0.39 rows=10 width=12) (actual time=0.076..0.077 rows=4 loops=1)  
              Output: a.item, a.score  
              Sort Key: a.score DESC  
              Sort Method: quicksort  Memory: 25kB  
              Buffers: shared hit=20  
              ->  CTE Scan on a  (cost=0.00..0.20 rows=10 width=12) (actual time=0.060..0.066 rows=4 loops=1)  
                    Output: a.item, a.score  
                    Buffers: shared hit=20  
LOG:  duration: 0.046 ms  plan:  
Query Text: insert into tbl_score   
  values (v_wid, v_uid, v_top1)  
  on conflict (wid,uid)  
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)  
  where  
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn)  
Insert on public.tbl_score  (cost=0.00..0.01 rows=1 width=44) (actual time=0.045..0.045 rows=0 loops=1)  
  Conflict Resolution: UPDATE  
  Conflict Arbiter Indexes: tbl_score_pkey  
  Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, $3))  
  Tuples Inserted: 1  
  Conflicting Tuples: 0  
  Buffers: shared hit=7  
  ->  Result  (cost=0.00..0.01 rows=1 width=44) (actual time=0.000..0.001 rows=1 loops=1)  
        Output: $5, $6, $7  
LOG:  duration: 1.951 ms  plan:  
Query Text: select consume_log(1, 10000, 100);  
Result  (cost=0.00..0.26 rows=1 width=4) (actual time=1.944..1.944 rows=1 loops=1)  
  Output: consume_log(1, 10000, 100)  
  Buffers: shared hit=212  
 consume_log   
-------------  
   
(1 row)  
  
Time: 2.390 ms  
           

Consuming 10,000 entries takes roughly 1.5 seconds.

10. Benchmark 2: query the ad slot recommendations for a given dimension and shop

vi test1.sql  
  
\set wid random(1,1000)  
\set uid random(1,10000)  
select * from tbl_score where wid=:wid and uid=:uid;  
  
pgbench -M prepared -n -r -P 1 -f ./test1.sql -c 32 -j 32 -T 120  
  
tps = 470514.018510 (including connections establishing)  
tps = 470542.672975 (excluding connections establishing)  
           

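Query throughput exceeds 450,000 QPS (about 470,000 in this run).

https://github.com/digoal/blog/blob/master/201804/20180420_03.md#%E8%AE%BE%E8%AE%A12 Design 2

One thing that can be optimized in Design 1: if data for different dimensions arrives interleaved when writing to tbl_score_log, consumption suffers from I/O amplification, as discussed in:

"PostgreSQL Time-Series Best Practices - Securities Trading System Database Design - Alibaba Cloud RDS PostgreSQL Best Practices"

The same method can be used here: store the data of each dimension in its own partition, and consume it partition by partition.

1. Create the dimension description table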

create table tbl_weidu (  
  wid int primary key,  
  info json   
);   
           

2. Create the TOP-K score table

create table tbl_score (  
  wid int not null,   -- dimension ID  
  uid int8 not null,  -- ToB shop ID  
  top10 text[] not null, -- top-N entries, each encoded as item_score  
  primary key(wid,uid)  
);   
           

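3. Create a task table that counts how many times each log has been consumed, with one counter per dimension and shop, that is, per (wid, uid) pair

create table tbl_score_task (  
  wid int not null,   -- dimension ID  
  uid int8 not null,  -- ToB shop ID  
  cnt int8 default 0, -- number of times this pair has been processed  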
  primary key(wid,uid)  
);   
  
create index idx_tbl_score_task_cnt on tbl_score_task (cnt);  
           

4. Function that merges two TEXT arrays

create or replace function merge_top10(  
text[],   -- old value  
text[],   -- new value  
ln int    -- sort by score and keep the top N  
) returns text[] as $$  
  select array_agg(v2||'_'||v3 order by v3 desc) from   
  (  
    select v2,v3 from   
    (  
      select v2,v3,row_number() over(partition by v2 order by v1 desc) as rn from   
      (  
        select 1 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($1) t(info)   
        union all  
        select 2 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($2) t(info)   
      ) t  
    ) t where rn=1 order by v3 desc limit ln   
  ) t;  
$$ language sql strict immutable;  
           

5. Log table

create unlogged table tbl_score_log (  -- transient stream data, not WAL-logged; all rows are lost if the database crashes  
  item int8 not null,     -- item ID  
  score float4 not null,  -- score  
  crt_time timestamp not null   
);   
  
create index idx_tbl_score_log_1 on tbl_score_log (crt_time);   
           

6. Create a function that writes to the log, addressing the I/O amplification problem of Design 1

create or replace function ins_score_log(  
  i_wid int,   
  i_uid int8,   
  i_item int8,   
  i_score float4   
) returns void as $$  
declare  
begin  
  -- fast path: the partition for this (wid,uid) already exists  
  execute format('insert into tbl_score_log_%s_%s values (%s,%s,now())', i_wid, i_uid, i_item, i_score);  
  insert into tbl_score_task (wid, uid) values (i_wid, i_uid) on conflict (wid,uid) do nothing;  
  exception when undefined_table then  
    -- first write for this (wid,uid): create the partition, then retry the insert  
    execute format('create unlogged table tbl_score_log_%s_%s (like tbl_score_log including all) inherits (tbl_score_log)', i_wid, i_uid);  
    execute format('insert into tbl_score_log_%s_%s values (%s,%s,now())', i_wid, i_uid, i_item, i_score);  
    insert into tbl_score_task (wid, uid) values (i_wid, i_uid) on conflict (wid,uid) do nothing;  
end;  
$$ language plpgsql strict;   
           

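Note, however, the issue described in:

"Too Many Objects in a Single PostgreSQL Database Hit a Linux Limit (ext4_dx_add_entry: Directory index full!) (could not create file "xx/xx/xxxxxx": No space left on device)"

If you run into this problem, split the data across multiple databases by UID or WID so that no single directory holds too many files.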
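
To keep an eye on how many per-(wid, uid) tables have accumulated, the inheritance catalog can be queried. This is a sketch that assumes the Design 2 objects above (tbl_score_log, tbl_score_task, ins_score_log) exist; the argument values are made up.

```sql
-- The first call for a given (wid, uid) creates the child table tbl_score_log_3_5.
select ins_score_log(3, 5::int8, 1001::int8, 42.5::float4);

-- Count the child tables created so far under the parent log table.
select count(*) as partitions
from pg_inherits
where inhparent = 'tbl_score_log'::regclass;
```

Each child table also carries its own copy of the crt_time index, so the number of files on disk grows faster than the number of partitions.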

7. Consume the log

create or replace function consume_log(  
  i_loop int,    -- number of loop iterations (how many (wid,uid) groups to process)  
  i_limit int,   -- maximum rows to process per iteration for one (wid,uid) group  
  i_topn int     -- for each (wid,uid), keep the TOP N items (the N highest scores)  
) returns void as $$  
declare  
  v_wid int;  
  v_uid int8;  
  v_top1 text[];  
  i int := 0;  
begin  
  LOOP  
  exit when i >= i_loop;   --  loops  
  
  -- pick the least-processed task that no other consumer has locked, and bump its counter  
  with a as   
  (select wid,uid from tbl_score_task order by cnt for update skip locked limit 1)   
  update tbl_score_task t set cnt=cnt+1 from a where t.wid = a.wid and t.uid = a.uid returning t.wid,t.uid into v_wid, v_uid;  
  exit when not found;     -- no task row available: nothing to consume  
  
  execute format ($_$  
  with  
  a as (  
    delete from tbl_score_log_%s_%s where ctid= any (array(  
      select ctid from tbl_score_log_%s_%s order by crt_time limit %s      -- limit batch  
    )) returning item,score  
  )  
  select   
    array_agg((item||'_'||score)::text order by score desc)   
    from  
    (select item,score from a order by score desc limit %s) t    -- limit topn  
  $_$, v_wid, v_uid, v_wid, v_uid, i_limit, i_topn   
  ) into v_top1;    
  
  -- raise notice '%', v_top1;  
    
  if v_top1 is null then  
    i := i+1;              -- count empty partitions too, otherwise the loop never ends once every log is empty  
    continue;  
  end if;  
  
  insert into tbl_score   
  values (v_wid, v_uid, v_top1)   
  on conflict (wid,uid)   
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)  
  where   
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn);   
   
  i := i+1;  
  END LOOP;  
end;  
$$ language plpgsql strict;   
           

8. Write benchmark

vi test.sql  
\set wid random(1,1000)  
\set uid random(1,10000)  
\set item random(1,100000000)  
select ins_score_log (:wid,:uid::int8,:item::int8,(random()*100)::float4);  
  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120  
  
tps = 146606.220095 (including connections establishing)  
tps = 146614.705007 (excluding connections establishing)  
           

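Once all partitions have been created, writes reach only about 150,000 rows/s because of the dynamic SQL.

9. Consume the log and merge it into the SCORE table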

postgres=# select consume_log(10, 10000, 100);  
 consume_log   
-------------  
   
(1 row)  
  
postgres=# \timing  
Timing is on.  
postgres=# select * from tbl_score limit 10;  
 wid | uid  |                top10                  
-----+------+-------------------------------------  
 115 |   69 | {989915_22.2217}  
 441 | 3914 | {7521898_39.2669}  
 423 | 7048 | {75494665_92.5439}  
 789 | 1335 | {57756208_23.4602}  
 776 | 8065 | {41134454_46.8727}  
 785 | 6248 | {76364646_93.4671,94065193_69.2552}  
 567 | 7539 | {97116865_6.93694}  
 207 | 6926 | {45163995_14.1626}  
 788 | 9025 | {73053901_80.3204}  
 334 | 2805 | {80532634_78.1224}  
(10 rows)  
  
Time: 0.300 ms  
postgres=# select consume_log(10, 10000, 100);  
 consume_log   
-------------  
   
(1 row)  
  
Time: 3677.130 ms (00:03.677)  
           
postgres=# select consume_log(1, 10000, 100);  
LOG:  duration: 0.105 ms  plan:  
Query Text: with a as   
  (select wid,uid from tbl_score_task order by cnt for update skip locked limit 1)   
  update tbl_score_task t set cnt=cnt+1 from a where t.wid = a.wid and t.uid = a.uid returning t.wid,t.uid  
Update on public.tbl_score_task t  (cost=0.60..2.85 rows=1 width=62) (actual time=0.099..0.100 rows=1 loops=1)  
  Output: t.wid, t.uid  
  Buffers: shared hit=13  
  CTE a  
    ->  Limit  (cost=0.28..0.32 rows=1 width=26) (actual time=0.036..0.036 rows=1 loops=1)  
          Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid  
          Buffers: shared hit=4  
          ->  LockRows  (cost=0.28..271.41 rows=7057 width=26) (actual time=0.035..0.035 rows=1 loops=1)  
                Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid  
                Buffers: shared hit=4  
                ->  Index Scan using idx_tbl_score_task_cnt on public.tbl_score_task  (cost=0.28..200.84 rows=7057 width=26) (actual time=0.018..0.018 rows=1 loops=1)  
                      Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid  
                      Buffers: shared hit=3  
  ->  Nested Loop  (cost=0.28..2.53 rows=1 width=62) (actual time=0.059..0.060 rows=1 loops=1)  
        Output: t.wid, t.uid, (t.cnt + 1), t.ctid, a.*  
        Inner Unique: true  
        Buffers: shared hit=7  
        ->  CTE Scan on a  (cost=0.00..0.02 rows=1 width=48) (actual time=0.046..0.047 rows=1 loops=1)  
              Output: a.*, a.wid, a.uid  
              Buffers: shared hit=4  
        ->  Index Scan using tbl_score_task_pkey on public.tbl_score_task t  (cost=0.28..2.50 rows=1 width=26) (actual time=0.009..0.009 rows=1 loops=1)  
              Output: t.wid, t.uid, t.cnt, t.ctid  
              Index Cond: ((t.wid = a.wid) AND (t.uid = a.uid))  
              Buffers: shared hit=3  
LOG:  duration: 24.624 ms  plan:  
Query Text:   
  with  
  a as (  
    delete from tbl_score_log_3_5 where ctid= any (array(  
      select ctid from tbl_score_log_3_5 order by crt_time limit 10000      -- limit batch  
    )) returning item,score  
  )  
  select   
    array_agg((item||'_'||score)::text order by score desc)   
    from  
    (select item,score from a order by score desc limit 100) t    -- limit topn  
    
Aggregate  (cost=279.53..279.54 rows=1 width=32) (actual time=24.619..24.619 rows=1 loops=1)  
  Output: array_agg((((a.item)::text || '_'::text) || (a.score)::text) ORDER BY a.score DESC)  
  Buffers: shared hit=39297  
  CTE a  
    ->  Delete on public.tbl_score_log_3_5 tbl_score_log_3_5_1  (cost=267.76..278.86 rows=10 width=6) (actual time=10.193..19.993 rows=10000 loops=1)  
          Output: tbl_score_log_3_5_1.item, tbl_score_log_3_5_1.score  
          Buffers: shared hit=39297  
          InitPlan 1 (returns $0)  
            ->  Limit  (cost=0.42..267.75 rows=10000 width=14) (actual time=0.017..7.185 rows=10000 loops=1)  
                  Output: tbl_score_log_3_5.ctid, tbl_score_log_3_5.crt_time  
                  Buffers: shared hit=9297  
                  ->  Index Scan using tbl_score_log_3_5_crt_time_idx on public.tbl_score_log_3_5  (cost=0.42..3907.05 rows=146135 width=14) (actual time=0.016..5.319 rows=10000 loops=1)  
                        Output: tbl_score_log_3_5.ctid, tbl_score_log_3_5.crt_time  
                        Buffers: shared hit=9297  
          ->  Tid Scan on public.tbl_score_log_3_5 tbl_score_log_3_5_1  (cost=0.01..11.11 rows=10 width=6) (actual time=10.188..13.238 rows=10000 loops=1)  
                Output: tbl_score_log_3_5_1.ctid  
                TID Cond: (tbl_score_log_3_5_1.ctid = ANY ($0))  
                Buffers: shared hit=19297  
  ->  Limit  (cost=0.37..0.39 rows=10 width=12) (actual time=24.433..24.461 rows=100 loops=1)  
        Output: a.item, a.score  
        Buffers: shared hit=39297  
        ->  Sort  (cost=0.37..0.39 rows=10 width=12) (actual time=24.432..24.443 rows=100 loops=1)  
              Output: a.item, a.score  
              Sort Key: a.score DESC  
              Sort Method: top-N heapsort  Memory: 32kB  
              Buffers: shared hit=39297  
              ->  CTE Scan on a  (cost=0.00..0.20 rows=10 width=12) (actual time=10.195..22.790 rows=10000 loops=1)  
                    Output: a.item, a.score  
                    Buffers: shared hit=39297  
LOG:  duration: 0.084 ms  plan:  
Query Text: insert into tbl_score   
  values (v_wid, v_uid, v_top1)   
  on conflict (wid,uid)   
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)  
  where   
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn)  
Insert on public.tbl_score  (cost=0.00..0.01 rows=1 width=44) (actual time=0.083..0.083 rows=0 loops=1)  
  Conflict Resolution: UPDATE  
  Conflict Arbiter Indexes: tbl_score_pkey  
  Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, $3))  
  Tuples Inserted: 1  
  Conflicting Tuples: 0  
  Buffers: shared hit=4  
  ->  Result  (cost=0.00..0.01 rows=1 width=44) (actual time=0.001..0.001 rows=1 loops=1)  
        Output: $5, $6, $7  
LOG:  duration: 26.335 ms  plan:  
Query Text: select consume_log(1, 10000, 100);  
Result  (cost=0.00..0.26 rows=1 width=4) (actual time=26.329..26.329 rows=1 loops=1)  
  Output: consume_log(1, 10000, 100)  
  Buffers: shared hit=39388  
 consume_log   
-------------  
   
(1 row)  
  
Time: 26.937 ms  
           

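https://github.com/digoal/blog/blob/master/201804/20180420_03.md#%E8%AE%BE%E8%AE%A13 Design 3

Similar to Design 1, except that a scattered-write table is placed in front. Its contents are periodically sorted and written into tbl_score_log, which is then consumed exactly as in Design 1. This addresses the I/O amplification problem.

Use A/B table switching: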

create unlogged table tbl_score_log_a (  
  wid int not null,   -- dimension ID  
  uid int8 not null,  -- ToB shop ID  
  item int8 not null, -- item ID  
  score float4 not null,  -- score  
  crt_time timestamp not null   
);   
  
create unlogged table tbl_score_log_b (  
  wid int not null,   -- dimension ID  
  uid int8 not null,  -- ToB shop ID  
  item int8 not null, -- item ID  
  score float4 not null,  -- score  
  crt_time timestamp not null   
);   
           

For example, once 20 million records have accumulated, sort them and write them into tbl_score_log:

begin;  
lock table tbl_score_log_a in ACCESS EXCLUSIVE mode;   
insert into tbl_score_log select * from tbl_score_log_a order by wid,uid,crt_time;  
truncate tbl_score_log_a;  
end;  
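
The block above flushes table A. One possible way to keep accepting writes while a flush runs is to swap the two tables by renaming them, so that writers always insert into tbl_score_log_a and the flush always reads tbl_score_log_b. This is only a sketch of one switching scheme, not something the article specifies; the temporary name tbl_score_log_swap is made up, and tbl_score_log is assumed to have the Design 1 column layout.

```sql
begin;
-- Block writers briefly while the names are exchanged.
lock table tbl_score_log_a, tbl_score_log_b in access exclusive mode;
alter table tbl_score_log_a rename to tbl_score_log_swap;  -- hypothetical temporary name
alter table tbl_score_log_b rename to tbl_score_log_a;     -- the empty table now receives writes
alter table tbl_score_log_swap rename to tbl_score_log_b;  -- the full table is the one to flush
commit;

-- Flush the full table outside the critical section, then empty it.
begin;
insert into tbl_score_log
select * from tbl_score_log_b order by wid, uid, crt_time;
truncate tbl_score_log_b;
commit;
```

The swap transaction only touches catalog entries, so the exclusive lock is held for a short time compared with sorting and copying 20 million rows under the lock.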
           

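https://github.com/digoal/blog/blob/master/201804/20180420_03.md#%E8%AE%BE%E8%AE%A14 Design 4

Similar to Design 1, except that each run computes multiple dimensions rather than one.

To compute the TOP-K of multiple dimensions in a single pass, see:

"PostgreSQL Recursive Query Use Cases - De-duplicating and Spreading Out Grouped Data"

Design 1 computes one dimension at a time, which causes I/O amplification. Computing multiple dimensions in a single pass removes that problem. However, it is advisable to process a larger amount of data per run with this method (for example, 10 million rows at a time); otherwise tbl_score may be updated too often, because a single dimension is consumed, and therefore updated, many times.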
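
The core of the single-pass approach is a window function that ranks items within each (wid, uid) group, followed by a per-group aggregation. A minimal self-contained sketch on literal rows (the sample values are made up, and 2 stands in for the top-N limit):

```sql
with a(wid, uid, item, score) as (
  values (1, 1, 101, 30.5),
         (1, 1, 102, 80),
         (1, 1, 103, 12.25),
         (2, 1, 201, 5),
         (2, 1, 202, 7)
)
select wid, uid,
       array_agg((item || '_' || score)::text order by score desc) as topn
from (
  -- rank items inside each (wid, uid) group by score
  select wid, uid, item, score,
         row_number() over (partition by wid, uid order by score desc) as rn
  from a
) t
where rn <= 2            -- keep the top 2 per group
group by wid, uid
order by wid, uid;
-- returns (1, 1, {102_80,101_30.5}) and (2, 1, {202_7,201_5})
```

One scan of the batch produces the top entries for every group at once, which is why the per-dimension re-reads of Design 1 disappear.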

The parts that differ from Design 1 are as follows:

create unlogged table tbl_score_log (    
  wid int not null,   -- dimension ID    
  uid int8 not null,  -- ToB shop ID    
  item int8 not null, -- item ID    
  score float4 not null,  -- score    
  crt_time timestamp not null     
);     
    
create index idx_tbl_score_log_1 on tbl_score_log (crt_time);    
           
create or replace function consume_log(    
  i_limit int,   -- number of rows to process per call    
  i_topn int     -- for each (wid,uid), keep the TOP N items (the N highest scores)    
) returns void as $$    
declare    
begin    
      
  with    
  a as (    
    delete from tbl_score_log where ctid= any (array(    
      select ctid from tbl_score_log order by crt_time limit i_limit     -- limit batch    
    )) returning wid,uid,item,score    
  )    
  insert into tbl_score     
  select wid,uid,topn  
  from  
  (  
  select     
    wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn     
    from    
    (  
    select wid,uid,item,score from  
      (select wid,uid,item,score,row_number() over (partition by wid,uid order by score desc) as rn from a) t   
      where rn <= i_topn  -- limit topn    
    ) t  
    group by wid,uid   
  ) t  
  on conflict (wid,uid)    
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)     -- limit topn 
  where    
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn)  -- limit topn 
  ;  
end;    
$$ language plpgsql strict;    
           
select consume_log(10000000,100);  
           

Alternatively, the log can be consumed directly with SQL such as the following:

with    
  a as (    
    delete from tbl_score_log where ctid= any (array(    
      select ctid from tbl_score_log order by crt_time limit 10000000     -- limit batch    
    )) returning wid,uid,item,score    
  )    
  insert into tbl_score     
  select wid,uid,topn  
  from  
  (  
  select     
    wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn     
    from    
    (  
    select wid,uid,item,score from  
      (select wid,uid,item,score,row_number() over (partition by wid,uid order by score desc) as rn from a) t   
      where rn <= 100  -- limit topn    
    ) t  
    group by wid,uid   
  ) t  
  on conflict (wid,uid)    
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, 100)     -- limit topn 
  where    
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, 100)  -- limit topn 
  ;  
           
QUERY PLAN                                                                                      
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Insert on public.tbl_score  (cost=36744.69..36745.17 rows=3 width=44) (actual time=69966.565..69966.565 rows=0 loops=1)
   Conflict Resolution: UPDATE
   Conflict Arbiter Indexes: tbl_score_pkey
   Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, 100))
   Tuples Inserted: 317084
   Conflicting Tuples: 634683
   Buffers: shared hit=13811948 read=7001 dirtied=7001
   CTE a
     ->  Delete on public.tbl_score_log tbl_score_log_1  (cost=36733.22..36744.32 rows=10 width=6) (actual time=968.724..1891.686 rows=1000000 loops=1)
           Output: tbl_score_log_1.wid, tbl_score_log_1.uid, tbl_score_log_1.item, tbl_score_log_1.score
           Buffers: shared hit=4007463
           InitPlan 1 (returns $0)
             ->  Limit  (cost=0.43..36733.21 rows=1000000 width=14) (actual time=0.011..660.528 rows=1000000 loops=1)
                   Output: tbl_score_log.ctid, tbl_score_log.crt_time
                   Buffers: shared hit=999099
                   ->  Index Scan using idx_tbl_score_log_1 on public.tbl_score_log  (cost=0.43..427926.61 rows=11649711 width=14) (actual time=0.010..494.951 rows=1000000 loops=1)
                         Output: tbl_score_log.ctid, tbl_score_log.crt_time
                         Buffers: shared hit=999099
           ->  Tid Scan on public.tbl_score_log tbl_score_log_1  (cost=0.01..11.11 rows=10 width=6) (actual time=968.673..1265.722 rows=1000000 loops=1)
                 Output: tbl_score_log_1.ctid
                 TID Cond: (tbl_score_log_1.ctid = ANY ($0))
                 Buffers: shared hit=1999099
   ->  GroupAggregate  (cost=0.37..0.82 rows=3 width=44) (actual time=2907.640..8707.867 rows=951767 loops=1)
         Output: t.wid, t.uid, array_agg((((t.item)::text || '_'::text) || (t.score)::text) ORDER BY t.score DESC)
         Group Key: t.wid, t.uid
         Buffers: shared hit=4007463
         ->  Subquery Scan on t  (cost=0.37..0.72 rows=3 width=24) (actual time=2907.590..4711.497 rows=1000000 loops=1)
               Output: t.wid, t.uid, t.item, t.score, t.rn
               Filter: (t.rn <= 100)
               Buffers: shared hit=4007463
               ->  WindowAgg  (cost=0.37..0.59 rows=10 width=32) (actual time=2907.588..4395.127 rows=1000000 loops=1)
                     Output: a.wid, a.uid, a.item, a.score, row_number() OVER (?)
                     Buffers: shared hit=4007463
                     ->  Sort  (cost=0.37..0.39 rows=10 width=24) (actual time=2907.575..3283.649 rows=1000000 loops=1)
                           Output: a.wid, a.uid, a.score, a.item
                           Sort Key: a.wid, a.uid, a.score DESC
                           Sort Method: quicksort  Memory: 102702kB
                           Buffers: shared hit=4007463
                           ->  CTE Scan on a  (cost=0.00..0.20 rows=10 width=24) (actual time=968.728..2201.439 rows=1000000 loops=1)
                                 Output: a.wid, a.uid, a.score, a.item
                                 Buffers: shared hit=4007463
 Planning time: 0.623 ms
 Execution time: 69990.738 ms
(43 rows)
           

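https://github.com/digoal/blog/blob/master/201804/20180420_03.md#%E8%AE%BE%E8%AE%A15 Design 5

Similar to Design 4, except that the log is not consumed by deleting from tbl_score_log. Instead, tbl_score_log is organized as a partitioned table or as A/B tables, and a whole table is consumed at a time. No DELETE is needed: once the computation finishes, the table is simply truncated.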

begin;
  insert into tbl_score     
  select wid,uid,topn  
  from  
  (  
  select     
    wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn     
    from    
    (  
    select wid,uid,item,score from  
      (select wid,uid,item,score,row_number() over (partition by wid,uid order by wid,uid,score desc) as rn from tbl_score_log_a) t   -- A/B table switching
      where rn <= 100                      -- limit topn    
    ) t  
    group by wid,uid   
  ) t  
  on conflict (wid,uid)    
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, 100)       -- limit topn 
  where    
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, 100)     -- limit topn 
  ;  

  truncate tbl_score_log_a;
end;  
           

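https://github.com/digoal/blog/blob/master/201804/20180420_03.md#%E5%B0%8F%E7%BB%93 Summary

1. Pre-ranking the results keeps query response times predictable; a single RDS PG instance can reach about 450,000 TPS on these lookups.

2. The initial data can be imported from OSS (compute it in HDB PG or ODPS, generate the initial data set, and write it to OSS). Parallel import speeds up loading; see:

"Alibaba Cloud RDS PostgreSQL OSS Foreign Tables - A Parallel Write Speed-Up Case (Wrapping dblink Asynchronous Calls)"

3. Incremental data is written to RDS PG as log records. Consumption is scheduled inside RDS PG, and the log is merged into the final tbl_score table.

Incremental changes (insert, delete, update):

Delete: set SCORE=0.

Update: already handled by the UDF (the new value overrides the old one).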
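
Because a deleted item is only marked with SCORE=0, it can stay in the stored array until higher-scored items push it out. A reader may therefore want to filter such entries when decoding the array. A minimal sketch, assuming deleted items always carry a score of exactly 0 and live items always score above 0; the literal array stands in for one tbl_score.top10 value.

```sql
select split_part(e, '_', 1)::int8   as item,
       split_part(e, '_', 2)::float4 as score
from unnest(array['102_80', '103_12.25', '101_0']) with ordinality as u(e, ord)
where split_part(e, '_', 2)::float4 > 0   -- skip items marked as deleted (score = 0)
order by ord;                             -- keep the stored ranking
-- returns (102, 80) and (103, 12.25)
```

Against the real table, the literal array would be replaced by the top10 column of the row selected by wid and uid.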

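https://github.com/digoal/blog/blob/master/201804/20180420_03.md#%E5%85%B6%E4%BB%96%E6%80%9D%E8%80%83 Other Thoughts

1. Consider introducing probabilistic computation?

"PostgreSQL count-min sketch top-n Probabilistic Computation Extension cms_topn (Combined with Windows for Year-over-Year, Period-over-Period, and Sliding-Window Analysis) - A Core Stream-Computing Feature"

2. To compute the TOP-K of multiple dimensions in a single pass, see this method:

The current approach computes one dimension at a time; with Design 1 this causes I/O amplification, whereas computing multiple dimensions in a single pass removes the problem. However, it is advisable to process a larger amount of data per run with this method (for example, 10 million rows at a time); otherwise tbl_score may be updated too often, because a single dimension is consumed, and therefore updated, many times.

https://github.com/digoal/blog/blob/master/201804/20180420_03.md#%E5%8F%82%E8%80%83 References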