天天看點

阿裡雲RDS PostgreSQL OSS 外部表實踐 - (dblink異步調用封裝并行) 從OSS并行導入資料

标簽

PostgreSQL , oss對象存儲 , 阿裡雲RDS PG , 并行寫 , dblink , 異步調用 , 異步任務監控 , OSS外部表 , 資料傳輸

https://github.com/digoal/blog/blob/master/201804/20180427_01.md#%E8%83%8C%E6%99%AF 背景

《阿裡雲RDS PostgreSQL OSS 外部表實踐 - (dblink異步調用封裝并行) 資料并行導出到OSS》

本文為從OSS并行導入資料到資料庫中。

請先閱讀:

RDS PG OSS 外部表文檔1 RDS PG OSS 外部表文檔2

https://github.com/digoal/blog/blob/master/201804/20180427_01.md#%E5%8E%9F%E6%96%87 原文

https://www.atatech.org/articles/98990

https://github.com/digoal/blog/blob/master/201804/20180427_01.md#%E4%B8%80%E5%87%86%E5%A4%87%E5%B7%A5%E4%BD%9C 一.準備工作

首先,建立我們要用到的插件。

create extension dblink;  
create extension oss_fdw;  
           

https://github.com/digoal/blog/blob/master/201804/20180427_01.md#%E4%BA%8C%E5%88%9B%E5%BB%BA%E5%BC%82%E6%AD%A5%E5%AD%98%E5%82%A8%E8%BF%87%E7%A8%8B 二.建立異步存儲過程

異步資料裝載的準備工作,擷取oss檔案清單

CREATE OR REPLACE FUNCTION rds_oss_fdw_load_data_prepare(t_from text, t_to text)  
  RETURNS bool AS  
$BODY$  
DECLARE  
    t_exist  int;  
    curs1 refcursor;  
    r   record;  
    filepath text;  
    fileindex int8;  
    s1 text;  
    s2 text;  
    s3 text;  
    c int = 0;  
    s4 text;  
    s5 text;  
    ss4 text;  
    ss5 text;  
    sql text;  
BEGIN  
    create table if not exists oss_fdw_load_status(id BIGSERIAL primary key, filename text, size int8, rows int8 default 0, status int default 0);  
  
    select count(*) into t_exist from oss_fdw_load_status;  
  
    if t_exist != 0 then  
        RAISE NOTICE 'oss_fdw_load_status not empty';  
        return false;  
    end if;  
  
    -- 通過 oss_fdw_list_file 函數,把外部表 t_from 比對的 OSS 中的檔案列到表中  
    insert into oss_fdw_load_status (filename, size) select name,size from oss_fdw_list_file(t_from) order by size desc;  
  
    select count(*) into t_exist from oss_fdw_load_status;  
    if t_exist = 0 then  
        RAISE NOTICE 'oss_fdw_load_status empty,not task found';  
        return false;  
    end if;  
  
    return true;  
END;  
$BODY$  
    LANGUAGE plpgsql;  
           

資料裝載的工作函數

CREATE OR REPLACE FUNCTION rds_oss_fdw_load_data_execute(t_from text, t_to text, num_work int, pass text)  
  RETURNS bool AS  
$BODY$  
DECLARE  
    t_exist  int;  
    curs1 refcursor;  
    r   record;  
    filepath text;  
    fileindex int8;  
    s1 text;  
    s2 text;  
    s3 text;  
    c int = 0;  
    s4 text;  
    s5 text;  
    ss4 text;  
    ss5 text;  
    sql text;  
    db text;  
    user text;  
BEGIN  
    select count(*) into t_exist from oss_fdw_load_status;  
    if t_exist = 0 then  
        RAISE NOTICE 'oss_fdw_load_status empty';  
        return false;  
    end if;  
  
    s4 = 'oss_loader';  
    s5 = 'idle';  
    ss4 = '''' || s4 ||'''';  
    ss5 = '''' || s5 ||'''';  
    sql = 'select count(*) from pg_stat_activity where application_name = ' || ss4 || ' and state != ' || ss5;  
  
    select current_database() into db;  
    select current_user into user;  
  
    -- 通過遊标,不斷擷取單個任務  
    OPEN curs1 FOR SELECT id, filename FROM oss_fdw_load_status order by id;  
    loop  
        fetch curs1 into r;  
        if not found then  
            exit;  
        end if;  
        fileindex = r.id;  
        filepath = r.filename;  
  
        s1 = '''' || t_from ||'''';  
        s2 = '''' || t_to ||'''';  
        s3 = '''' || filepath ||'''';  
  
        LOOP  
            -- 檢視目前正在工作的任務數,過達到并發數就在這裡等待  
            select a into c from dblink('dbname='||db ||' user='||user || ' password='||pass ,sql)as t(a int);  
            IF c < num_work THEN  
                EXIT;  
            END IF;  
            RAISE NOTICE 'current runing % loader', c;  
            perform pg_sleep(1);  
        END LOOP;  
  
        -- 通過 DBLINK 建立異步任務  
        perform dis_conn('oss_loader_'||fileindex);  
        perform dblink_connect('oss_loader_'||fileindex, 'dbname='||db ||' user='||user || ' application_name=oss_loader' || ' password='||pass);  
        perform dblink_send_query('oss_loader_'||fileindex, format('  
            begin;  
            select rds_oss_fdw_load_single_file(%s,%s,%s,%s);  
            end;'  
            , fileindex, s1, s2, s3)  
        );  
        RAISE NOTICE 'runing loader task % filename %',fileindex, filepath;  
    end loop;  
    close curs1;  
  
    -- 任務配置設定完成,等待所有任務完成  
    LOOP  
        select a into c from dblink('dbname='||db ||' user='||user || ' password='||pass ,sql)as t(a int);  
        IF c = 0 THEN  
            EXIT;  
        END IF;  
        RAISE NOTICE 'current runing % loader', c;  
        perform pg_sleep(1);  
    END LOOP;  
  
    return true;  
END;  
$BODY$  
    LANGUAGE plpgsql;  
           

單個檔案的資料裝載函數(設定不同的會話參數oss_fdw.rds_read_one_file,可以讀取不同的OSS檔案,用完重置)

CREATE OR REPLACE FUNCTION rds_oss_fdw_load_single_file(taskid int8, t_from text, t_to text, filepath text)  
  RETURNS void AS  
$BODY$  
DECLARE  
    rowscount int8 = 0;  
    current text;  
    sql text;  
BEGIN  
    -- 配置 GUC 參數,指定要導入的 OSS 上的檔案  
    perform set_config('oss_fdw.rds_read_one_file',filepath,true);  
    select current_setting('oss_fdw.rds_read_one_file') into current;  
    RAISE NOTICE 'begin load %', current;  
  
    -- 通過動态 SQL 導入資料  
    EXECUTE 'insert into '|| t_to || ' select * from ' || t_from;  
    GET DIAGNOSTICS rowscount = ROW_COUNT;  
  
    -- 導入完成後,把結果儲存到狀态表中  
    RAISE NOTICE 'end load id % % to % % rows', taskid, filepath, t_to, rowscount;  
    update oss_fdw_load_status set rows = rowscount,status = 1 where id = taskid;  
    return;  
  
EXCEPTION  
    when others then  
    RAISE 'run rds_oss_fdw_load_single_file with error';  
END;  
$BODY$  
    LANGUAGE plpgsql;  
           

關閉連接配接不報錯函數

create or replace function dis_conn(name) returns void as $$    
declare    
begin    
  perform dblink_disconnect($1);    
  return;    
exception when others then    
  return;    
end;    
$$ language plpgsql strict;    
           

https://github.com/digoal/blog/blob/master/201804/20180427_01.md#%E4%B8%89%E4%BD%BF%E7%94%A8%E5%87%BD%E6%95%B0%E8%A3%85%E8%BD%BD%E6%95%B0%E6%8D%AE 三.使用函數裝載資料

1、 建立本地表(目标表)結構

2、 将包含資料的檔案寫入OSS

3、 在RDS PG中建立OSS外部表

4、 準備需要并行導入的清單

select rds_oss_fdw_load_data_prepare('oss_table','lineitem');  
           

執行後,會看到表 oss_fdw_load_status 中,儲存了準備導入的所有檔案清單,使用者可以做适當的删減定制。

4、 資料裝載

select rds_oss_fdw_load_data_execute('oss_table','lineitem',10,'mypassword');  
           

函數 rds_oss_fdw_load_data_execute 會等待資料導入的完成才傳回。

5、 查詢狀态

期間,我們可以通過下列 SQL 檢視正在工作的異步會話狀态

select application_name, state, pid,query, now() - xact_start as xact  from pg_stat_activity where state != 'idle' and application_name='oss_loader' order by xact desc;  
           

6、 管理狀态

同時,我們也可以随時中斷資料導入工作

select pg_terminate_backend(pid),application_name, state ,query from pg_stat_activity where state != 'idle' and pid != pg_backend_pid() and application_name='oss_loader';  
           

7、 檢視進度

我們也很容易看到整個資料裝載的進度(機關 MB)

select  
(  
select sum(size)/1024/1024 as complete from oss_fdw_load_status where status = 1  
)a,  
(  
select sum(size)/1024/1024 as full from oss_fdw_load_status  
)b;  
           

8、 性能

使用 TPCC 100GB的資料進行裝載測試,耗時 10 分鐘,平均 170MB/S

select rds_oss_fdw_load_data_prepare('t_oss2','lineitem');  
  
select rds_oss_fdw_load_data_execute('t_oss2','lineitem',10,'123456Zwj');  
  
select sum(size)/1024/1024 from oss_fdw_load_status;  
      ?column?        
--------------------  
 22561.919849395752  
(1 row)  
  
select pg_size_pretty(pg_relation_size(oid)) from pg_class where relname = 'lineitem';  
 pg_size_pretty   
----------------  
 101 GB  
(1 row)  
           

https://github.com/digoal/blog/blob/master/201804/20180427_01.md#%E6%80%A7%E8%83%BD%E6%9E%81%E9%99%90%E6%89%A9%E5%B1%95 性能極限擴充

為了提高本地加載速度,使用者可以這麼做:

1、目标表選擇UNLOGGED TABLE,注意如果選擇unlogged table,那麼資料庫崩潰後unlogged table的資料會被清除,并且請記住備庫看不到unlogged table的資料。

除非你的資料是定期全量覆寫的,否則不建議用unlogged table來加速。

create unlogged table xxx (xx xx);  
           

2、選擇多個目标表

由于單個目标表,在INDEX LOCK,在EXTEND BLOCK方面都有一定的局限性,為了達到極限,可以使用多個目标表。例如每一批OSS檔案對應一個本地表分區。

-- 并行  
insert into tbl1 select * from oss_tbl1;  
insert into tbl2 select * from oss_tbl2;  
.....  
insert into tblx select * from oss_tblx;  
           

3、導入前關閉目标表的表級autovacuum

autovacuum會影響導入性能,因為它要消耗一定的IO。

alter table tbl_dict set (autovacuum_enabled =off);  
  
alter table tbl_dict set (toast.autovacuum_enabled =off);  
           

4、導入後再開啟目标表的autovacuum,收集統計資訊

alter table tbl_dict set (autovacuum_enabled =on);  
  
alter table tbl_dict set (toast.autovacuum_enabled =on);  
           

5、後建立索引(可以并行)

索引可以同時建立(單個表的多個索引可以同時建立,不會互相鎖等待。多個表的多個索引也可以同時建立),如果建立索引過程中不需要執行DML,建議不要開啟concurrently選項,否則建議開啟。

同時建立,可以提高資源使用率,達到快速完成資料導入加索引建立的目标。

https://github.com/digoal/blog/blob/master/201804/20180427_01.md#%E4%BA%91%E7%AB%AF%E7%9B%B8%E5%85%B3%E4%BA%A7%E5%93%81 雲端相關産品

阿裡雲 RDS PostgreSQL 阿裡雲 HybridDB for PostgreSQL HDB PG OSS 外部表文檔

繼續閱讀