標籤
PostgreSQL , 背景任務 , DBLINK 異步調用
https://github.com/digoal/blog/blob/master/201809/20180903_01.md#%E8%83%8C%E6%99%AF 背景
使用DBLINK異步接口,可以非常方便地跑背景任務;如果要讓資料庫執行若干條SQL並開N個並行執行,同樣可以使用DBLINK封裝成API進行調用。
例如,結合我前面寫的一些文章,可以實作自動選擇索引接口、指定並行度、指定表空間、給所有字段建立索引等功能。
《自動選擇正確索引訪問接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》 《PostgreSQL 快速給指定表每個字段建立索引》 《阿里雲RDS PostgreSQL OSS 外部表實踐 - (dblink異步調用封裝並行) 從OSS並行導入資料》 《在PostgreSQL中跑背景長任務的方法 - 使用dblink異步接口》
https://github.com/digoal/blog/blob/master/201809/20180903_01.md#%E5%B9%B6%E8%A1%8C%E5%90%8E%E5%8F%B0%E4%BB%BB%E5%8A%A1%E6%8E%A5%E5%8F%A3%E5%AE%9E%E7%8E%B0 並行背景任務接口實作
接口效果:
select * from run_sqls_parallel (
參數1:並行度,
參數2:要執行的SQLs(數組呈現)
)
as t(a text);
https://github.com/digoal/blog/blob/master/201809/20180903_01.md#%E5%AE%9E%E7%8E%B0 實作
1、建立dblink插件
-- dblink provides dblink_connect / dblink_send_query / dblink_get_result,
-- which the asynchronous helpers in this article are built on.
create extension if not exists "dblink";
2、建立一個用於建立連接的函數(連接失敗時不報錯)
-- conn(name, connstr): open a named dblink connection on a best-effort basis.
--
-- Parameters:
--   $1 - name under which the dblink connection is registered.
--   $2 - libpq connection string.
--
-- Any error raised by dblink_connect (unreachable server, authentication
-- failure, a connection with the same name already open, ...) is NOT
-- propagated, so callers can keep relying on "this never fails". The error
-- message is reported as a WARNING instead of being silently discarded, so
-- the root cause is still visible when a later dblink call on this name fails.
-- Declared STRICT: if either argument is NULL nothing is attempted.
create or replace function conn(
name, -- dblink connection name
text  -- connection string
) returns void as $$
begin
  perform dblink_connect($1, $2);
exception when others then
  -- Best-effort by design: report, do not raise.
  raise warning 'conn(%): could not open dblink connection: %', $1, sqlerrm;
end;
$$ language plpgsql strict;
3、建立跑多任務的接口函數
-- run_sqls_parallel: run the statements in SQLS concurrently on at most
-- PARALLELS dblink connections and return the result rows of every statement.
--
-- Parameters:
--   parallels - maximum number of concurrent dblink connections (must be >= 1).
--   sqls      - statements to run, one statement per element; NULL elements are
--               skipped. Each element is sent with dblink_send_query, so it runs
--               in its own remote session and commits independently. An element
--               holding several ';'-separated statements is not supported: only
--               two dblink_get_result calls are made per dispatch.
--   conn      - libpq connection string. The connection name is appended to it
--               verbatim, so it is expected to end with "application_name=" (as
--               the default does).
--
-- Returns setof record; callers must supply a one-column definition list, e.g.
--   select * from run_sqls_parallel(2, array['select 1']) as t(a text);
-- Results are read with dblink_get_result(..., false) into that text column:
-- a utility command yields its command status, a statement returning more than
-- one column raises a rowtype-mismatch error, and a statement that fails on the
-- remote side is reported as a NOTICE and contributes no rows. Rows are not
-- returned in array order.
--
-- Completion is tracked with dblink_is_busy() on the local connection handles
-- instead of polling pg_stat_activity, so a statement that was just sent can
-- never be mistaken for an idle connection. Every connection opened here is
-- closed before returning; on error (or cancel) still-running remote statements
-- are cancelled and their connections closed before the error is re-raised.
create or replace function run_sqls_parallel(
parallels int, -- degree of parallelism
sqls text[], -- statements to run
conn text default format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database()) -- connection string
)
returns setof record as $$
declare
  app_prefix text := md5(random()::text);  -- per-call prefix keeps connection names unique
  conns text[] := '{}';                    -- names of the connections opened by this call
  conn_name text;
  stmt text;
  running int;
begin
  -- Without this check parallels <= 0 would wait forever for a connection
  -- that can never be opened.
  if parallels < 1 then
    raise exception 'run_sqls_parallel: parallels must be >= 1 (got %)', parallels;
  end if;

  begin
    foreach stmt in array sqls loop
      continue when stmt is null;  -- nothing to send

      <<dispatch>>
      loop
        -- Reuse a connection whose previous statement has completed.
        foreach conn_name in array conns loop
          if dblink_is_busy(conn_name) = 0 then
            -- Drain the previous statement: one call for its result plus one
            -- more (empty) call, required before the connection takes a new query.
            return query select a from dblink_get_result(conn_name, false) as t(a text);
            return query select a from dblink_get_result(conn_name, false) as t(a text);
            perform dblink_send_query(conn_name, stmt);
            exit dispatch;
          end if;
        end loop;

        -- Every open connection is busy: open another one if the limit allows ...
        if cardinality(conns) < parallels then
          conn_name := app_prefix || (cardinality(conns) + 1);
          perform dblink_connect(conn_name, conn || conn_name);
          conns := array_append(conns, conn_name);
          perform dblink_send_query(conn_name, stmt);
          exit dispatch;
        end if;

        -- ... otherwise wait for one of them to finish.
        raise notice 'current running tasks: %, waiting idle conns.', cardinality(conns);
        perform pg_sleep(1);
      end loop;
    end loop;

    -- Wait for the remaining statements, reporting progress once per second.
    loop
      running := 0;
      foreach conn_name in array conns loop
        running := running + dblink_is_busy(conn_name);
      end loop;
      exit when running = 0;
      raise notice 'the last % tasks running.', running;
      perform pg_sleep(1);
    end loop;
    raise notice 'whole tasks done.';

    -- Collect the last result of every connection, then close it.
    foreach conn_name in array conns loop
      return query select a from dblink_get_result(conn_name, false) as t(a text);
      return query select a from dblink_get_result(conn_name, false) as t(a text);
      perform dblink_disconnect(conn_name);
    end loop;
  exception when others or query_canceled then
    -- Do not leave remote sessions (and their statements) behind after a failure.
    foreach conn_name in array conns loop
      if conn_name = any (dblink_get_connections()) then
        if dblink_is_busy(conn_name) = 1 then
          perform dblink_cancel_query(conn_name);
        end if;
        perform dblink_disconnect(conn_name);
      end if;
    end loop;
    raise;
  end;
  return;
end;
$$ language plpgsql strict;
https://github.com/digoal/blog/blob/master/201809/20180903_01.md#%E8%AF%95%E7%94%A8 試用
1、執行6條SQL,開2個並行任務
-- Run six statements with at most two concurrent dblink connections.
select *
from run_sqls_parallel(
  parallels => 2,  -- degree of parallelism
  -- statements to run
  sqls => array[
    'select pg_sleep(10)',
    'select pg_sleep(10)',
    'select pg_sleep(10)',
    'select count(*) from pg_class where relname ~ ''t''',
    'select pg_sleep(10)',
    'select pg_sleep(10)'
  ]
) as t(a text);
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: the last 2 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: whole tasks done.
run_sqls_parallel
-------------------
(1 row)
Time: 30070.275 ms (00:30.070)
2、開10個並行任務,執行6條SQL
postgres=# select *
from run_sqls_parallel(
  parallels => 10,  -- degree of parallelism (more than there are statements)
  -- statements to run
  sqls => array[
    'select pg_sleep(10)',
    'select pg_sleep(10)',
    'select pg_sleep(10)',
    'select count(*) from pg_class where relname ~ ''t''',
    'select pg_sleep(10)',
    'select pg_sleep(10)'
  ]
) as t(a text);
NOTICE: the last 6 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: whole tasks done.
run_sqls_parallel
-------------------
(1 row)
Time: 10050.064 ms (00:10.050)
完全符合預期。
3、結合前面寫的文檔,我們如果要建立很多索引,可以使用同樣的方法實作并行任務
-- Three identical demo tables, nine int columns each, to be indexed column by column.
create table t1 (
  id int,
  c1 int, c2 int, c3 int, c4 int,
  c5 int, c6 int, c7 int, c8 int
);
create table t2 (
  id int,
  c1 int, c2 int, c3 int, c4 int,
  c5 int, c6 int, c7 int, c8 int
);
create table t3 (
  id int,
  c1 int, c2 int, c3 int, c4 int,
  c5 int, c6 int, c7 int, c8 int
);
-- Demo: build one index per column of t1, t2 and t3, running up to 10
-- CREATE INDEX statements in parallel through run_sqls_parallel, then ANALYZE
-- the three tables. Assumes the tablespace tbs1 already exists.
do language plpgsql $$
declare
  tables name[] := array['t1','t2','t3'];  -- tables to index
  n name;                                  -- current table name
  x name;                                  -- current column name
  i int;                                   -- per-table column counter, used in the index name
  sql text;
  sqls text[];
  tbs name := 'tbs1';                      -- tablespace for the new indexes
  -- The indexes are built by the dblink sessions, not by this session, so a
  -- plain SET maintenance_work_mem here would never reach them. Pass it as a
  -- startup option in the connection string instead.
  conn_str text := format('hostaddr=%s port=%s user=%s dbname=%s options=''-c maintenance_work_mem=2GB'' application_name=',
                          '127.0.0.1', current_setting('port'), current_user, current_database());
begin
  foreach n in array tables loop
    i := 1;
    -- Visit columns in table order so index names are stable across runs.
    for x in
      select attname from pg_attribute
      where attrelid = n::regclass and attnum >= 1 and not attisdropped
      order by attnum
    loop
      -- NOTE: combined with the automatic index-method chooser (btree, hash,
      -- gin, gist, ...) a better-suited index type could be picked per column.
      -- %I and regclass output quote identifiers, so mixed-case or reserved
      -- names still produce valid DDL.
      sql := format('create index IF NOT EXISTS %I on %s (%I) tablespace %I',
                    format('idx_%s__%s', n, i), n::regclass, x, tbs);
      sqls := array_append(sqls, sql);
      i := i + 1;
    end loop;
  end loop;

  perform * from run_sqls_parallel(
    10,       -- degree of parallelism
    sqls,     -- the CREATE INDEX statements
    conn_str  -- connection string carrying maintenance_work_mem
  ) as t(a text);

  foreach n in array tables loop
    execute format('analyze %s', n::regclass);
  end loop;
end;
$$;
https://github.com/digoal/blog/blob/master/201809/20180903_01.md#%E5%B0%8F%E7%BB%93 小結
本文使用dblink異步調用的功能,增加了一個API函數,可以用於開啟N個並行,執行若干條長SQL,例如用來並行建立索引非常給力。
select * from run_sqls_parallel (
參數1:并行度,
參數2:要背景并行執行的SQLs(數組呈現)
)
as t(a text);