天天看點

PostgreSQL dblink異步調用實踐,跑并行多任務 - 例如開N個并行背景任務建立索引, 開N個背景任務跑若幹SQL

标簽

PostgreSQL , 背景任務 , DBLINK 異步調用

https://github.com/digoal/blog/blob/master/201809/20180903_01.md#%E8%83%8C%E6%99%AF 背景

使用DBLINK異步接口,可以非常方便地實作跑背景任務;如果要讓資料庫執行若幹條SQL,并開N個并行執行,同樣可以使用DBLINK封裝成API進行調用。

例如,結合我前面的一些文字,可以實作自動選擇索引接口、指定并行度、指定表空間、給所有字段建立索引。

《自動選擇正确索引通路接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》 《PostgreSQL 快速給指定表每個字段建立索引》 《阿裡雲RDS PostgreSQL OSS 外部表實踐 - (dblink異步調用封裝并行) 從OSS并行導入資料》 《在PostgreSQL中跑背景長任務的方法 - 使用dblink異步接口》

https://github.com/digoal/blog/blob/master/201809/20180903_01.md#%E5%B9%B6%E8%A1%8C%E5%90%8E%E5%8F%B0%E4%BB%BB%E5%8A%A1%E6%8E%A5%E5%8F%A3%E5%AE%9E%E7%8E%B0 并行背景任務接口實作

接口效果:

select * from run_sqls_parallel (  
  參數1:并行度,  
  參數2:要執行的SQLs(數組呈現)  
) as t(a text);  
           

https://github.com/digoal/blog/blob/master/201809/20180903_01.md#%E5%AE%9E%E7%8E%B0 實作

1、建立dblink插件

-- Install the dblink extension (no-op if it is already installed); the functions below call dblink_* APIs.
create extension if not exists dblink;    
           

2、建立一個用于建立連接的函數,連接失敗時不抛出錯誤

-- Open a named dblink connection on a best-effort basis: this function never raises.
--   $1  dblink connection name
--   $2  libpq connection string / URL
-- Returns nothing. Because the function is STRICT, a NULL argument skips the call entirely.
-- A connection name that is already in use is treated as success (the point of this
-- wrapper is to make "connect" repeatable). Any other failure is reported as a WARNING
-- rather than being dropped silently, so a bad connection string or an exhausted
-- connection limit is visible to the operator while callers still keep running.
create or replace function conn(      
  name,   -- dblink connection name      
  text    -- connection string / URL      
) returns void as $$        
begin        
  perform dblink_connect($1, $2);       
exception        
  when duplicate_object then        
    -- dblink is expected to report an already-used connection name with this
    -- condition; the connection exists, so there is nothing to do.
    null;        
  when others then        
    -- Still best-effort, but no longer silent. The connection string is deliberately
    -- not echoed because it may contain credentials.
    raise warning 'conn(%): dblink_connect failed: %', $1, sqlerrm;        
end;        
$$ language plpgsql strict;    
           

3、建立跑多任務的接口函數

-- Run a list of SQL statements as asynchronous dblink tasks, with at most
-- `parallels` worker connections active at a time, and return each task's result.
--
-- Usage:  select * from run_sqls_parallel(4, array['...','...']) as t(a text);
--
-- How it works:
--   * One extra "stat" connection is opened and used to read pg_stat_activity, because
--     the calling transaction itself cannot observe state changes made by other sessions.
--   * Each worker connection is named <app_prefix><n>, and the same string is used as its
--     application_name, so workers can be found in pg_stat_activity by prefix.
--   * A worker whose state is 'idle' has finished its previous task; its pending result
--     is drained and the next statement is sent to it.
--
-- Notes:
--   * Each result is read as a single text column, so every statement is expected to
--     return one column (or be a command, for which dblink returns the status string).
--   * dblink_get_result is called with fail_on_error = false, so a failing task does not
--     abort the whole run.
--   * All connections opened here are closed before returning, and also when an error
--     aborts the function. A cancelled query is not trapped by "when others", so a
--     cancel can still leave connections open until the session ends.
create or replace function run_sqls_parallel(   
  parallels int,    -- degree of parallelism: maximum number of worker connections, must be >= 1  
  sqls text[],      -- SQL statements to execute, one task per array element  
  conn text    default format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database())       -- connection string; a generated name is appended to it, so it must end with "application_name="     
)  
returns setof record as $$  
declare  
  app_prefix_stat text := md5(random()::text);   -- name / application_name of the connection used to read live pg_stat_activity  
  app_prefix text := md5(random()::text);        -- common prefix of worker connection names / application_names   
  i int := 1;                -- suffix for the next worker connection to create  
  app_conn_name text;        -- worker connection currently being reused or drained  
  sql text;                  -- current statement from sqls  
  current_conns int := 0;    -- worker count (scheduling phase) or busy-worker count (wait phase)  
  open_conn text;            -- loop variable for connection cleanup  
begin  
  -- With parallels < 1 no worker would ever be created and the scheduler would wait forever.
  if parallels < 1 then  
    raise exception 'parallels must be >= 1 (got %)', parallels using errcode = 'invalid_parameter_value';  
  end if;  
  
  -- Connection used only for reading live pg_stat_activity.
  perform conn(app_prefix_stat,  conn||app_prefix_stat);              
  
  foreach sql in array sqls  
  loop  
    -- Keep trying until this statement has been handed to a worker.
    loop  
      -- Is there a worker that has finished its previous task?
      select application_name into app_conn_name from   
        dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' and state='idle' limit 1 $_$, app_prefix))  
      as t(application_name text);   
  
      if found then  
        -- Drain the previous task's result. dblink requires one call per query sent plus
        -- one more that returns the empty set before the connection can be used again.
        return query select a from dblink_get_result(app_conn_name, false) as t(a text);      
        return query select a from dblink_get_result(app_conn_name, false) as t(a text);    
  
        -- Hand the statement to the idle worker.
        perform dblink_send_query(app_conn_name, sql);   
        exit;  
      end if;  
  
      -- No idle worker: how many workers exist already?
      select cn into current_conns from   
        dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))  
      as t(cn int);      
  
      if current_conns < parallels then  
        -- Below the limit: open a new worker and start the statement on it.
        perform conn(app_prefix||i,  conn||app_prefix||i);  
        perform dblink_send_query(app_prefix||i, sql);   
        i := i+1;  
        exit;  
      end if;  
  
      -- At the limit and everybody is busy: wait and poll again.
      perform pg_sleep(1);  
      raise notice 'current running tasks: %, waiting idle conns.', current_conns;  
    end loop;  
  end loop;  
  
  -- All statements have been dispatched; wait until no worker is busy any more.
  loop  
    select cn into current_conns from   
      dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' and state <> 'idle' $_$, app_prefix))  
    as t(cn int);      
    exit when current_conns = 0;  
    raise notice 'the last % tasks running.', current_conns;  
    perform pg_sleep(1);  
  end loop;  
  
  raise notice 'whole tasks done.';  
  
  -- Collect the last result of every worker.
  for app_conn_name in 
    select application_name from   
      dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))  
    as t(application_name text)
  loop
    return query select a from dblink_get_result(app_conn_name, false) as t(a text);   
  end loop;
  
  -- Close every connection this call opened so repeated calls do not pile up sessions.
  foreach open_conn in array coalesce(dblink_get_connections(), '{}'::text[])  
  loop  
    if open_conn = app_prefix_stat or left(open_conn, length(app_prefix)) = app_prefix then  
      perform dblink_disconnect(open_conn);  
    end if;  
  end loop;  
  
  return;  
  
exception when others then  
  -- Same cleanup on failure, then re-raise the original error unchanged.
  foreach open_conn in array coalesce(dblink_get_connections(), '{}'::text[])  
  loop  
    if open_conn = app_prefix_stat or left(open_conn, length(app_prefix)) = app_prefix then  
      perform dblink_disconnect(open_conn);  
    end if;  
  end loop;  
  raise;  
end;  
$$ language plpgsql strict;  
           

https://github.com/digoal/blog/blob/master/201809/20180903_01.md#%E8%AF%95%E7%94%A8 試用

1、運作6條SQL,開2個并行任務

-- Example: run six statements with at most two concurrent workers.
select * from run_sqls_parallel(  
 2, -- degree of parallelism  
 -- SQL statements to execute  
 array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)']  
)
as t(a text);   
  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  the last 2 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  whole tasks done.  
 run_sqls_parallel   
-------------------  
   
(1 row)  
  
Time: 30070.275 ms (00:30.070)  
           

2、運作10個并行任務,跑6條SQL

postgres=# select * from run_sqls_parallel(  
 10, -- degree of parallelism (more workers than statements, so nothing has to wait)  
 -- SQL statements to execute  
 array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)']  
)
as t(a text);   

NOTICE:  the last 6 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  whole tasks done.  
 run_sqls_parallel   
-------------------  
   
(1 row)  
  
Time: 10050.064 ms (00:10.050)  
           

完全符合預期。

3、結合前面寫的文檔,我們如果要建立很多索引,可以使用同樣的方法實作并行任務

-- Three identically shaped demo tables (an id plus eight integer columns each);
-- they are the targets of the parallel index build that follows.
create table t1 (
  id int,
  c1 int, c2 int, c3 int, c4 int,
  c5 int, c6 int, c7 int, c8 int
);
create table t2 (
  id int,
  c1 int, c2 int, c3 int, c4 int,
  c5 int, c6 int, c7 int, c8 int
);
create table t3 (
  id int,
  c1 int, c2 int, c3 int, c4 int,
  c5 int, c6 int, c7 int, c8 int
);
           
-- Build one index per column on each listed table, running the CREATE INDEX statements
-- as parallel background tasks through run_sqls_parallel, then ANALYZE the tables.
do language plpgsql $$    
declare    
  tables name[] := array['t1','t2','t3'];     -- tables to index  
  n name;   -- current table name  
  x name;   -- current column name  
  i int;    -- per-table column counter, used as the index name suffix  
  sql text;        
  sqls text[];      
  tbs name := 'tbs1';    -- tablespace for the new indexes (must already exist)  
  -- The indexes are built in the dblink worker sessions, not in this session, so a plain
  -- SET here would not affect them. The setting is passed to every worker through the
  -- libpq "options" parameter instead. The string must end with "application_name="
  -- because run_sqls_parallel appends the worker name to it.
  worker_conn text := format(
    'hostaddr=%s port=%s user=%s dbname=%s options=''-c maintenance_work_mem=2GB'' application_name=',
    '127.0.0.1', current_setting('port'), current_user, current_database()
  );
begin    
  foreach n in array tables loop    
    i := 1;      
    -- order by attnum keeps the index numbering stable across runs, so that
    -- "if not exists" always refers to the same column for a given index name.
    for x in
      select attname
      from pg_attribute
      where attrelid = n::regclass
        and attnum >= 1
        and not attisdropped
      order by attnum
    loop    
      -- Combined with automatic index access method selection (btree, hash, gin, gist, ...)
      -- this could pick a better index type per column; here the default method is used.
      -- %I / regclass quote identifiers, so names that need quoting still yield valid SQL.
      sql := format(
        'create index IF NOT EXISTS %I on %s (%I) tablespace %I',
        format('idx_%s__%s', n, i), n::regclass, x, tbs
      );
      sqls := array_append(sqls, sql);   
      i := i+1;    
    end loop;    
  end loop;    
  
  perform a from run_sqls_parallel(  
    10,          -- degree of parallelism  
    sqls,        -- CREATE INDEX statements  
    worker_conn  -- worker connection string carrying maintenance_work_mem  
  ) as t(a text);   
  
  foreach n in array tables loop    
    execute format('analyze %s', n::regclass);     
  end loop;    
end;    
$$;    
           

https://github.com/digoal/blog/blob/master/201809/20180903_01.md#%E5%B0%8F%E7%BB%93 小結

本文使用dblink異步調用的功能,增加了一個API函數,可以用于開啟N個并行,跑若幹條長SQL,例如用來建立索引非常給力。

select * from run_sqls_parallel (  
  參數1:并行度,   
  參數2:要背景并行執行的SQLs(數組呈現)     
)
as t(a text);
           

https://github.com/digoal/blog/blob/master/201809/20180903_01.md#%E5%8F%82%E8%80%83 參考

《PostgreSQL AB表切換最佳實踐 - 提高切換成功率,杜絕雪崩 - 珍藏級》