Tags
PostgreSQL, gin, inverted index, rum, gin_fuzzy_search_limit, random sampling, partitioned index, segmented index, score segmentation
Background
Store ad recommendation slots: automatically computed, efficiently retrieved, efficiently updated.
Based on in-store and site-wide user behavior, inventory, and so on, compute a score for every item and recommend the top-ranked ones.
There can be many dimensions. For example, "the probability that male users in Beijing buy socks in autumn is 0.431" involves four dimensions; real scenarios can have tens, hundreds, or even thousands of dimensions.
The system must support arbitrary dimensions, sorting, and top-100 retrieval, with millisecond latency at 1,000,000 QPS.
Design 1
1. Define the dimensions
create table tbl_weidu (
wid int primary key,
info json
);
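For illustration, the four-dimension example from the background could be registered as follows (the JSON keys here are hypothetical):

insert into tbl_weidu (wid, info)
values (1, '{"city": "Beijing", "gender": "male", "season": "autumn", "category": "socks"}');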
2. Define the recommendation table, which stores only the top-100 items and their scores
create table tbl_score (
wid int not null, -- dimension ID
uid int8 not null, -- ToB store ID
top10 text[] not null, -- item_score values of the top 10
primary key(wid,uid)
);
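Each element of top10 is a text value encoding 'item_score', exactly the format the merge function below produces and consumes. An illustrative row (values made up):

insert into tbl_score values (1, 1, array['7521898_39.2669', '989915_22.2217']);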
3. Define a function that merges two text arrays: when a new batch of item scores arrives, it is merged with the old value into a new one (if an item appears in both, the new score overrides the old; the result is sorted by score and only the top N are kept)
create or replace function merge_top10(
text[], -- old value
text[], -- new value
ln int -- sort by score, keep the top N
) returns text[] as $$
select array_agg(v2||'_'||v3 order by v3 desc) from
(
select v2,v3 from
(
select v2,v3,row_number() over(partition by v2 order by v1 desc) as rn from -- for the same item, prefer the new value
(
select 1 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($1) t(info) -- old values
union all
select 2 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($2) t(info) -- new values
) t
) t where rn=1 order by v3 desc limit ln -- for the same item, prefer the new value
) t;
$$ language sql strict immutable;
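A quick sanity check of the merge semantics (item 1 appears in both arrays, so its new score wins; only the top 2 by score survive):

select merge_top10(array['1_10','2_20'], array['1_15','3_5'], 2);
-- expected: {2_20,1_15}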
4. Define a log table recording item score changes per dimension; a consumer later drains this LOG table and merges the updates into tbl_score
create unlogged table tbl_score_log (
wid int not null, -- dimension ID
uid int8 not null, -- ToB store ID
item int8 not null, -- item ID
score float4 not null, -- score
crt_time timestamp not null
);
create index idx_tbl_score_log_1 on tbl_score_log (wid,uid,crt_time);
5. Define the consumer function
create or replace function consume_log(
i_loop int, -- number of loop iterations (how many (wid,uid) groups to process)
i_limit int, -- for a single (wid,uid) group, how many rows to process per batch
i_topn int -- per (wid,uid) dimension, keep the top N items (highest scores)
) returns void as $$
declare
v_wid int;
v_uid int8;
v_top1 text[];
i int := 0;
begin
LOOP
exit when i >= i_loop; -- loops
select wid,uid into v_wid,v_uid from tbl_score_log for update skip locked limit 1;
exit when not found; -- log table drained: stop early
with
a as (
delete from tbl_score_log where ctid= any (array(
select ctid from tbl_score_log where wid=v_wid and uid=v_uid order by crt_time limit i_limit -- limit batch
)) returning item,score
)
select
array_agg((item||'_'||score)::text order by score desc) into v_top1
from
(select item,score from a order by score desc limit i_topn) t; -- limit topn
insert into tbl_score
values (v_wid, v_uid, v_top1)
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn);
i := i+1;
END LOOP;
end;
$$ language plpgsql strict;
6. Benchmark 1: generate score-change log records
(1,000 dimensions, 10,000 stores, 100 million items)
vi test.sql
\set wid random(1,1000)
\set uid random(1,10000)
\set item random(1,100000000)
insert into tbl_score_log values (:wid,:uid,:item,random()*100,now());
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120
tps = 257737.493753 (including connections establishing)
tps = 257752.428348 (excluding connections establishing)
Writes exceed 250,000 rows/s.
7. Consume the LOG table and merge the results into the score table
postgres=# select consume_log(10, 10000, 100);
consume_log
-------------
(1 row)
postgres=# \timing
Timing is on.
postgres=# select * from tbl_score limit 10;
wid | uid | top10
-----+------+-------------------------------------
115 | 69 | {989915_22.2217}
441 | 3914 | {7521898_39.2669}
423 | 7048 | {75494665_92.5439}
789 | 1335 | {57756208_23.4602}
776 | 8065 | {41134454_46.8727}
785 | 6248 | {76364646_93.4671,94065193_69.2552}
567 | 7539 | {97116865_6.93694}
207 | 6926 | {45163995_14.1626}
788 | 9025 | {73053901_80.3204}
334 | 2805 | {80532634_78.1224}
(10 rows)
Time: 0.300 ms
8. Trace the resources consumed by each consumption pass
load 'auto_explain';
set auto_explain.log_analyze =on;
set auto_explain.log_buffers =on;
set auto_explain.log_min_duration =0;
set auto_explain.log_nested_statements =on;
set auto_explain.log_time=on;
set auto_explain.log_verbose =on;
set client_min_messages ='log';
postgres=# select consume_log(1, 10000, 100);
LOG: duration: 0.819 ms plan:
Query Text: select wid,uid from tbl_score_log for update skip locked limit 1
Limit (cost=10000000000.00..10000000000.03 rows=1 width=18) (actual time=0.816..0.816 rows=1 loops=1)
Output: wid, uid, ctid
Buffers: shared hit=177
-> LockRows (cost=10000000000.00..10000876856.44 rows=30947272 width=18) (actual time=0.815..0.815 rows=1 loops=1)
Output: wid, uid, ctid
Buffers: shared hit=177
-> Seq Scan on public.tbl_score_log (cost=10000000000.00..10000567383.72 rows=30947272 width=18) (actual time=0.808..0.808 rows=1 loops=1)
Output: wid, uid, ctid
Buffers: shared hit=176
LOG: duration: 0.104 ms plan:
Query Text: with
a as (
delete from tbl_score_log where ctid= any (array(
select ctid from tbl_score_log where wid=v_wid and uid=v_uid order by crt_time limit i_limit -- limit batch
)) returning item,score
)
select
array_agg((item||'_'||score)::text order by score desc) from
(select item,score from a order by score desc limit i_topn) t
Aggregate (cost=13.56..13.57 rows=1 width=32) (actual time=0.100..0.100 rows=1 loops=1)
Output: array_agg((((a.item)::text || '_'::text) || (a.score)::text) ORDER BY a.score DESC)
Buffers: shared hit=20
CTE a
-> Delete on public.tbl_score_log tbl_score_log_1 (cost=2.06..13.16 rows=10 width=6) (actual time=0.059..0.063 rows=4 loops=1)
Output: tbl_score_log_1.item, tbl_score_log_1.score
Buffers: shared hit=20
InitPlan 1 (returns $0)
-> Limit (cost=0.56..2.05 rows=1 width=14) (actual time=0.017..0.043 rows=4 loops=1)
Output: tbl_score_log.ctid, tbl_score_log.crt_time
Buffers: shared hit=8
-> Index Scan using idx_tbl_score_log_1 on public.tbl_score_log (cost=0.56..5.02 rows=3 width=14) (actual time=0.017..0.041 rows=4 loops=1)
Output: tbl_score_log.ctid, tbl_score_log.crt_time
Index Cond: ((tbl_score_log.wid = $5) AND (tbl_score_log.uid = $6))
Buffers: shared hit=8
-> Tid Scan on public.tbl_score_log tbl_score_log_1 (cost=0.01..11.11 rows=10 width=6) (actual time=0.053..0.055 rows=4 loops=1)
Output: tbl_score_log_1.ctid
TID Cond: (tbl_score_log_1.ctid = ANY ($0))
Buffers: shared hit=12
-> Limit (cost=0.37..0.37 rows=1 width=12) (actual time=0.077..0.079 rows=4 loops=1)
Output: a.item, a.score
Buffers: shared hit=20
-> Sort (cost=0.37..0.39 rows=10 width=12) (actual time=0.076..0.077 rows=4 loops=1)
Output: a.item, a.score
Sort Key: a.score DESC
Sort Method: quicksort Memory: 25kB
Buffers: shared hit=20
-> CTE Scan on a (cost=0.00..0.20 rows=10 width=12) (actual time=0.060..0.066 rows=4 loops=1)
Output: a.item, a.score
Buffers: shared hit=20
LOG: duration: 0.046 ms plan:
Query Text: insert into tbl_score
values (v_wid, v_uid, v_top1)
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn)
Insert on public.tbl_score (cost=0.00..0.01 rows=1 width=44) (actual time=0.045..0.045 rows=0 loops=1)
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: tbl_score_pkey
Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, $3))
Tuples Inserted: 1
Conflicting Tuples: 0
Buffers: shared hit=7
-> Result (cost=0.00..0.01 rows=1 width=44) (actual time=0.000..0.001 rows=1 loops=1)
Output: $5, $6, $7
LOG: duration: 1.951 ms plan:
Query Text: select consume_log(1, 10000, 100);
Result (cost=0.00..0.26 rows=1 width=4) (actual time=1.944..1.944 rows=1 loops=1)
Output: consume_log(1, 10000, 100)
Buffers: shared hit=212
consume_log
-------------
(1 row)
Time: 2.390 ms
Consuming 10,000 score records takes about 1.5 seconds.
9. Benchmark 2: query the ad-slot recommendations for a given dimension and store
vi test1.sql
\set wid random(1,1000)
\set uid random(1,10000)
select * from tbl_score where wid=:wid and uid=:uid;
pgbench -M prepared -n -r -P 1 -f ./test1.sql -c 32 -j 32 -T 120
tps = 470514.018510 (including connections establishing)
tps = 470542.672975 (excluding connections establishing)
Queries reach over 450,000 QPS.
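On the read side, an application can unpack the packed top10 array back into (item, score) rows. A minimal sketch, assuming the item_score encoding above:

select split_part(s, '_', 1)::int8 as item,
       split_part(s, '_', 2)::float4 as score
from tbl_score, unnest(top10) as s
where wid = 1 and uid = 1;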
Design 2
One optimization opportunity in Design 1: when rows for different dimensions are written to tbl_score_log interleaved, consuming them suffers from I/O amplification:
《PostgreSQL 時序最佳實踐 - 證券交易系統資料庫設計 - 阿裡雲RDS PostgreSQL最佳實踐》
We can use the same approach as that article: store each dimension's data in its own partition and consume partition by partition.
1. Create the dimension description table
create table tbl_weidu (
wid int primary key,
info json
);
2. Create the top-K score table
create table tbl_score (
wid int not null, -- dimension ID
uid int8 not null, -- ToB store ID
top10 text[] not null, -- item_score values of the top 10
primary key(wid,uid)
);
3. Create a task table that records, for each (wid,uid) dimension, how many times its log has been consumed (one counter per dimension)
create table tbl_score_task (
wid int not null, -- dimension ID
uid int8 not null, -- ToB store ID
cnt int8 default 0, -- number of times consumed
primary key(wid,uid)
);
create index idx_tbl_score_task_cnt on tbl_score_task (cnt);
4. The function that merges two text arrays (same as in Design 1)
create or replace function merge_top10(
text[], -- old value
text[], -- new value
ln int -- sort by score, keep the top N
) returns text[] as $$
select array_agg(v2||'_'||v3 order by v3 desc) from
(
select v2,v3 from
(
select v2,v3,row_number() over(partition by v2 order by v1 desc) as rn from
(
select 1 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($1) t(info)
union all
select 2 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($2) t(info)
) t
) t where rn=1 order by v3 desc limit ln
) t;
$$ language sql strict immutable;
5. Log table
create unlogged table tbl_score_log ( -- streaming data, not WAL-logged; all rows are lost if the database crashes
item int8 not null, -- item ID
score float4 not null, -- score
crt_time timestamp not null
);
create index idx_tbl_score_log_1 on tbl_score_log (crt_time);
6. Create the log-writing function; it solves the I/O amplification problem of Design 1
create or replace function ins_score_log(
i_wid int,
i_uid int8,
i_item int8,
i_score float4
) returns void as $$
declare
begin
execute format('insert into tbl_score_log_%s_%s values (%s,%s,now())', i_wid, i_uid, i_item, i_score);
insert into tbl_score_task (wid, uid) values (i_wid, i_uid) on conflict (wid,uid) do nothing;
exception when others then
execute format('create unlogged table tbl_score_log_%s_%s (like tbl_score_log including all) inherits (tbl_score_log)', i_wid, i_uid);
execute format('insert into tbl_score_log_%s_%s values (%s,%s,now())', i_wid, i_uid, i_item, i_score);
insert into tbl_score_task (wid, uid) values (i_wid, i_uid) on conflict (wid,uid) do nothing;
end;
$$ language plpgsql strict;
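Usage matches the benchmark script below. The first call for a brand-new (wid,uid) pair fails the INSERT, lands in the exception handler, creates the child table, and retries:

select ins_score_log(1, 1::int8, 12345::int8, 88.5::float4);

Note that two sessions writing the very first row for the same (wid,uid) can still race on the CREATE TABLE; the losing call errors out and simply needs to be retried.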
But note:
《PostgreSQL 單庫對象過多,觸發Linux系統限制 (ext4_dx_add_entry: Directory index full!) (could not create file "xx/xx/xxxxxx": No space left on device)》
If you hit the problem above, shard the data by UID or WID across multiple databases so that no single directory accumulates too many files.
7. Consume the LOG
create or replace function consume_log(
i_loop int, -- number of loop iterations (how many (wid,uid) groups to process)
i_limit int, -- for a single (wid,uid) group, how many rows to process per batch
i_topn int -- per (wid,uid) dimension, keep the top N items (highest scores)
) returns void as $$
declare
v_wid int;
v_uid int8;
v_top1 text[];
i int := 0;
begin
LOOP
exit when i >= i_loop; -- loops
with a as
(select wid,uid from tbl_score_task order by cnt for update skip locked limit 1)
update tbl_score_task t set cnt=cnt+1 from a where t.wid = a.wid and t.uid = a.uid returning t.wid,t.uid into v_wid, v_uid;
execute format ($_$
with
a as (
delete from tbl_score_log_%s_%s where ctid= any (array(
select ctid from tbl_score_log_%s_%s order by crt_time limit %s -- limit batch
)) returning item,score
)
select
array_agg((item||'_'||score)::text order by score desc)
from
(select item,score from a order by score desc limit %s) t -- limit topn
$_$, v_wid, v_uid, v_wid, v_uid, i_limit, i_topn
) into v_top1;
-- raise notice '%', v_top1;
if v_top1 is null then
continue; -- empty partition; move on to the next task (does not count toward i_loop)
end if;
insert into tbl_score
values (v_wid, v_uid, v_top1)
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn);
i := i+1;
END LOOP;
end;
$$ language plpgsql strict;
8. Write benchmark
vi test.sql
\set wid random(1,1000)
\set uid random(1,10000)
\set item random(1,100000000)
select ins_score_log (:wid,:uid::int8,:item::int8,(random()*100)::float4);
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120
tps = 146606.220095 (including connections establishing)
tps = 146614.705007 (excluding connections establishing)
Once all the partitions exist, writes reach only about 150,000 rows/s because of the dynamic SQL.
9. Consume the LOG and merge into the score table
postgres=# select consume_log(10, 10000, 100);
consume_log
-------------
(1 row)
postgres=# \timing
Timing is on.
postgres=# select * from tbl_score limit 10;
wid | uid | top10
-----+------+-------------------------------------
115 | 69 | {989915_22.2217}
441 | 3914 | {7521898_39.2669}
423 | 7048 | {75494665_92.5439}
789 | 1335 | {57756208_23.4602}
776 | 8065 | {41134454_46.8727}
785 | 6248 | {76364646_93.4671,94065193_69.2552}
567 | 7539 | {97116865_6.93694}
207 | 6926 | {45163995_14.1626}
788 | 9025 | {73053901_80.3204}
334 | 2805 | {80532634_78.1224}
(10 rows)
Time: 0.300 ms
postgres=# select consume_log(10, 10000, 100);
consume_log
-------------
(1 row)
Time: 3677.130 ms (00:03.677)
postgres=# select consume_log(1, 10000, 100);
LOG: duration: 0.105 ms plan:
Query Text: with a as
(select wid,uid from tbl_score_task order by cnt for update skip locked limit 1)
update tbl_score_task t set cnt=cnt+1 from a where t.wid = a.wid and t.uid = a.uid returning t.wid,t.uid
Update on public.tbl_score_task t (cost=0.60..2.85 rows=1 width=62) (actual time=0.099..0.100 rows=1 loops=1)
Output: t.wid, t.uid
Buffers: shared hit=13
CTE a
-> Limit (cost=0.28..0.32 rows=1 width=26) (actual time=0.036..0.036 rows=1 loops=1)
Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid
Buffers: shared hit=4
-> LockRows (cost=0.28..271.41 rows=7057 width=26) (actual time=0.035..0.035 rows=1 loops=1)
Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid
Buffers: shared hit=4
-> Index Scan using idx_tbl_score_task_cnt on public.tbl_score_task (cost=0.28..200.84 rows=7057 width=26) (actual time=0.018..0.018 rows=1 loops=1)
Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid
Buffers: shared hit=3
-> Nested Loop (cost=0.28..2.53 rows=1 width=62) (actual time=0.059..0.060 rows=1 loops=1)
Output: t.wid, t.uid, (t.cnt + 1), t.ctid, a.*
Inner Unique: true
Buffers: shared hit=7
-> CTE Scan on a (cost=0.00..0.02 rows=1 width=48) (actual time=0.046..0.047 rows=1 loops=1)
Output: a.*, a.wid, a.uid
Buffers: shared hit=4
-> Index Scan using tbl_score_task_pkey on public.tbl_score_task t (cost=0.28..2.50 rows=1 width=26) (actual time=0.009..0.009 rows=1 loops=1)
Output: t.wid, t.uid, t.cnt, t.ctid
Index Cond: ((t.wid = a.wid) AND (t.uid = a.uid))
Buffers: shared hit=3
LOG: duration: 24.624 ms plan:
Query Text:
with
a as (
delete from tbl_score_log_3_5 where ctid= any (array(
select ctid from tbl_score_log_3_5 order by crt_time limit 10000 -- limit batch
)) returning item,score
)
select
array_agg((item||'_'||score)::text order by score desc)
from
(select item,score from a order by score desc limit 100) t -- limit topn
Aggregate (cost=279.53..279.54 rows=1 width=32) (actual time=24.619..24.619 rows=1 loops=1)
Output: array_agg((((a.item)::text || '_'::text) || (a.score)::text) ORDER BY a.score DESC)
Buffers: shared hit=39297
CTE a
-> Delete on public.tbl_score_log_3_5 tbl_score_log_3_5_1 (cost=267.76..278.86 rows=10 width=6) (actual time=10.193..19.993 rows=10000 loops=1)
Output: tbl_score_log_3_5_1.item, tbl_score_log_3_5_1.score
Buffers: shared hit=39297
InitPlan 1 (returns $0)
-> Limit (cost=0.42..267.75 rows=10000 width=14) (actual time=0.017..7.185 rows=10000 loops=1)
Output: tbl_score_log_3_5.ctid, tbl_score_log_3_5.crt_time
Buffers: shared hit=9297
-> Index Scan using tbl_score_log_3_5_crt_time_idx on public.tbl_score_log_3_5 (cost=0.42..3907.05 rows=146135 width=14) (actual time=0.016..5.319 rows=10000 loops=1)
Output: tbl_score_log_3_5.ctid, tbl_score_log_3_5.crt_time
Buffers: shared hit=9297
-> Tid Scan on public.tbl_score_log_3_5 tbl_score_log_3_5_1 (cost=0.01..11.11 rows=10 width=6) (actual time=10.188..13.238 rows=10000 loops=1)
Output: tbl_score_log_3_5_1.ctid
TID Cond: (tbl_score_log_3_5_1.ctid = ANY ($0))
Buffers: shared hit=19297
-> Limit (cost=0.37..0.39 rows=10 width=12) (actual time=24.433..24.461 rows=100 loops=1)
Output: a.item, a.score
Buffers: shared hit=39297
-> Sort (cost=0.37..0.39 rows=10 width=12) (actual time=24.432..24.443 rows=100 loops=1)
Output: a.item, a.score
Sort Key: a.score DESC
Sort Method: top-N heapsort Memory: 32kB
Buffers: shared hit=39297
-> CTE Scan on a (cost=0.00..0.20 rows=10 width=12) (actual time=10.195..22.790 rows=10000 loops=1)
Output: a.item, a.score
Buffers: shared hit=39297
LOG: duration: 0.084 ms plan:
Query Text: insert into tbl_score
values (v_wid, v_uid, v_top1)
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn)
Insert on public.tbl_score (cost=0.00..0.01 rows=1 width=44) (actual time=0.083..0.083 rows=0 loops=1)
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: tbl_score_pkey
Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, $3))
Tuples Inserted: 1
Conflicting Tuples: 0
Buffers: shared hit=4
-> Result (cost=0.00..0.01 rows=1 width=44) (actual time=0.001..0.001 rows=1 loops=1)
Output: $5, $6, $7
LOG: duration: 26.335 ms plan:
Query Text: select consume_log(1, 10000, 100);
Result (cost=0.00..0.26 rows=1 width=4) (actual time=26.329..26.329 rows=1 loops=1)
Output: consume_log(1, 10000, 100)
Buffers: shared hit=39388
consume_log
-------------
(1 row)
Time: 26.937 ms
Design 3
Similar to Design 1, but with an extra table in front to absorb the scattered writes; periodically sort that table's contents into tbl_score_log, then consume from tbl_score_log exactly as in Design 1. This solves the I/O amplification problem.
Use A/B table switching:
create unlogged table tbl_score_log_a (
wid int not null, -- dimension ID
uid int8 not null, -- ToB store ID
item int8 not null, -- item ID
score float4 not null, -- score
crt_time timestamp not null
);
create unlogged table tbl_score_log_b (
wid int not null, -- dimension ID
uid int8 not null, -- ToB store ID
item int8 not null, -- item ID
score float4 not null, -- score
crt_time timestamp not null
);
For example, once 20 million rows have accumulated, sort them into tbl_score_log:
begin;
lock table tbl_score_log_a in ACCESS EXCLUSIVE mode; -- writers switch to tbl_score_log_b while A is flushed
insert into tbl_score_log select * from tbl_score_log_a order by wid,uid,crt_time;
truncate tbl_score_log_a;
end;
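The B table is flushed the same way on the next rotation; a symmetric sketch (it is assumed the application has already redirected writers back to tbl_score_log_a before this runs):

begin;
lock table tbl_score_log_b in ACCESS EXCLUSIVE mode;
insert into tbl_score_log select * from tbl_score_log_b order by wid,uid,crt_time;
truncate tbl_score_log_b;
end;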
Design 4
Similar to Design 1, except that each pass computes many dimensions at once instead of a single one.
Computing the top K of many dimensions in a single pass can follow this approach:
《PostgreSQL 遞歸妙用案例 - 分組資料去重與打散》
Design 1 computes one dimension per pass, which amplifies I/O; a single multi-dimension pass avoids that. (It is advisable to process a much larger batch per pass, say 10 million rows at a time, otherwise tbl_score is updated too frequently: a single dimension gets consumed and updated many times.)
The parts that differ from Design 1 are as follows:
create unlogged table tbl_score_log (
wid int not null, -- dimension ID
uid int8 not null, -- ToB store ID
item int8 not null, -- item ID
score float4 not null, -- score
crt_time timestamp not null
);
create index idx_tbl_score_log_1 on tbl_score_log (crt_time);
create or replace function consume_log(
i_limit int, -- rows to process in one batch
i_topn int -- per (wid,uid) dimension, keep the top N items (highest scores)
) returns void as $$
declare
begin
with
a as (
delete from tbl_score_log where ctid= any (array(
select ctid from tbl_score_log order by crt_time limit i_limit -- limit batch
)) returning wid,uid,item,score
)
insert into tbl_score
select wid,uid,topn
from
(
select
wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn
from
(
select wid,uid,item,score from
(select wid,uid,item,score,row_number() over (partition by wid,uid order by score desc) as rn from a) t
where rn <= i_topn -- limit topn
) t
group by wid,uid
) t
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn) -- limit topn
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn) -- limit topn
;
end;
$$ language plpgsql strict;
select consume_log(10000000,100);
Alternatively, the log can be consumed directly with SQL like the following:
with
a as (
delete from tbl_score_log where ctid= any (array(
select ctid from tbl_score_log order by crt_time limit 10000000 -- limit batch
)) returning wid,uid,item,score
)
insert into tbl_score
select wid,uid,topn
from
(
select
wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn
from
(
select wid,uid,item,score from
(select wid,uid,item,score,row_number() over (partition by wid,uid order by score desc) as rn from a) t
where rn <= 100 -- limit topn
) t
group by wid,uid
) t
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, 100) -- limit topn
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, 100) -- limit topn
;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Insert on public.tbl_score (cost=36744.69..36745.17 rows=3 width=44) (actual time=69966.565..69966.565 rows=0 loops=1)
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: tbl_score_pkey
Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, 100))
Tuples Inserted: 317084
Conflicting Tuples: 634683
Buffers: shared hit=13811948 read=7001 dirtied=7001
CTE a
-> Delete on public.tbl_score_log tbl_score_log_1 (cost=36733.22..36744.32 rows=10 width=6) (actual time=968.724..1891.686 rows=1000000 loops=1)
Output: tbl_score_log_1.wid, tbl_score_log_1.uid, tbl_score_log_1.item, tbl_score_log_1.score
Buffers: shared hit=4007463
InitPlan 1 (returns $0)
-> Limit (cost=0.43..36733.21 rows=1000000 width=14) (actual time=0.011..660.528 rows=1000000 loops=1)
Output: tbl_score_log.ctid, tbl_score_log.crt_time
Buffers: shared hit=999099
-> Index Scan using idx_tbl_score_log_1 on public.tbl_score_log (cost=0.43..427926.61 rows=11649711 width=14) (actual time=0.010..494.951 rows=1000000 loops=1)
Output: tbl_score_log.ctid, tbl_score_log.crt_time
Buffers: shared hit=999099
-> Tid Scan on public.tbl_score_log tbl_score_log_1 (cost=0.01..11.11 rows=10 width=6) (actual time=968.673..1265.722 rows=1000000 loops=1)
Output: tbl_score_log_1.ctid
TID Cond: (tbl_score_log_1.ctid = ANY ($0))
Buffers: shared hit=1999099
-> GroupAggregate (cost=0.37..0.82 rows=3 width=44) (actual time=2907.640..8707.867 rows=951767 loops=1)
Output: t.wid, t.uid, array_agg((((t.item)::text || '_'::text) || (t.score)::text) ORDER BY t.score DESC)
Group Key: t.wid, t.uid
Buffers: shared hit=4007463
-> Subquery Scan on t (cost=0.37..0.72 rows=3 width=24) (actual time=2907.590..4711.497 rows=1000000 loops=1)
Output: t.wid, t.uid, t.item, t.score, t.rn
Filter: (t.rn <= 100)
Buffers: shared hit=4007463
-> WindowAgg (cost=0.37..0.59 rows=10 width=32) (actual time=2907.588..4395.127 rows=1000000 loops=1)
Output: a.wid, a.uid, a.item, a.score, row_number() OVER (?)
Buffers: shared hit=4007463
-> Sort (cost=0.37..0.39 rows=10 width=24) (actual time=2907.575..3283.649 rows=1000000 loops=1)
Output: a.wid, a.uid, a.score, a.item
Sort Key: a.wid, a.uid, a.score DESC
Sort Method: quicksort Memory: 102702kB
Buffers: shared hit=4007463
-> CTE Scan on a (cost=0.00..0.20 rows=10 width=24) (actual time=968.728..2201.439 rows=1000000 loops=1)
Output: a.wid, a.uid, a.score, a.item
Buffers: shared hit=4007463
Planning time: 0.623 ms
Execution time: 69990.738 ms
(43 rows)
Design 5
Similar to Design 4, except the log is not consumed with DELETE: tbl_score_log is a partitioned table (or an A/B pair as below), and an entire table is consumed at a time, so no DELETE is needed; simply TRUNCATE after the computation.
begin;
insert into tbl_score
select wid,uid,topn
from
(
select
wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn
from
(
select wid,uid,item,score from
(select wid,uid,item,score,row_number() over (partition by wid,uid order by wid,uid,score desc) as rn from tbl_score_log_a) t -- the A/B switching approach
where rn <= 100 -- limit topn
) t
group by wid,uid
) t
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, 100) -- limit topn
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, 100) -- limit topn
;
truncate tbl_score_log_a;
end;
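A minimal sketch of driving this rotation, assuming a single scheduler alternates between the two tables and that writers have already been pointed at the other table; consume_ab is a hypothetical helper, not part of the designs above:

create or replace function consume_ab(t regclass, i_topn int) returns void as $$
begin
  -- merge the whole staging table's per-(wid,uid) top N into tbl_score
  execute format($_$
    insert into tbl_score
    select wid, uid,
           array_agg((item||'_'||score)::text order by score desc) as topn
    from (
      select wid, uid, item, score,
             row_number() over (partition by wid, uid order by score desc) as rn
      from %s
    ) x
    where rn <= %s
    group by wid, uid
    on conflict (wid, uid)
    do update set top10 = merge_top10(tbl_score.top10, excluded.top10, %s)
    where tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, %s)
  $_$, t, i_topn, i_topn, i_topn);
  -- throw away the consumed table
  execute format('truncate %s', t);
end;
$$ language plpgsql strict;

-- the scheduler then alternates:
select consume_ab('tbl_score_log_a', 100);
select consume_ab('tbl_score_log_b', 100);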
Summary
1. With precomputed ranking, query response time is guaranteed; a single RDS PG instance reaches 450,000+ TPS.
2. The initial data can be imported from OSS (computed in HDB PG or ODPS, written to OSS). Parallel import speeds this up; see:
《阿裡雲RDS PostgreSQL OSS 外部表 - (dblink異步調用封裝)并行寫提速案例》
3. Incremental data is written to RDS PG as log records; a scheduled job inside RDS PG consumes the log and merges it into the final tbl_score table.
Incremental changes (insert, delete, update):
Delete: write the record with SCORE=0.
Update: already handled by the UDF (new value overwrites old).
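For example, under Design 1's schema a deletion is logged as a score of 0 (item and dimension here are hypothetical):

insert into tbl_score_log values (1, 1, 12345, 0, now()); -- deletion encoded as score = 0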
Further thoughts
1. Consider introducing probabilistic computation?
《PostgreSQL count-min sketch top-n 機率計算插件 cms_topn (結合視窗實作同比、環比、滑窗分析等) - 流計算核心功能之一》
2. Compute the top K of many dimensions in a single pass, following the approach of 《PostgreSQL 遞歸妙用案例 - 分組資料去重與打散》 (see Design 4).
The current method computes one dimension per pass; as in Design 1, that amplifies I/O, while a single multi-dimension pass avoids it. (Again, process a much larger batch per pass, e.g. 10 million rows at a time, otherwise tbl_score is updated too frequently: a single dimension consumed and updated many times.)