Tags
PostgreSQL, partitioned table, online conversion
Background
How can a non-partitioned table be converted to a partitioned table online, without impacting the business?
Method 1: the pg_pathman partitioning extension
As described in 《PostgreSQL 9.5+ 高效分區表實作 - pg_pathman》, use pg_pathman's non-blocking data migration interface:
partition_table_concurrently(
  relation REGCLASS,                 -- OID of the parent table
  batch_size INTEGER DEFAULT 1000,   -- how many rows to migrate per transaction
  sleep_time FLOAT8 DEFAULT 1.0)     -- how long to sleep before retrying when a row lock cannot be acquired; the task gives up after 60 retries
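Before calling the migration worker, the partition set itself has to exist. A minimal sketch, assuming a part_test table that is to be range-partitioned on a crt_time column (table name, column and bounds are illustrative; the last argument asks pg_pathman not to move any data yet):
select create_range_partitions(
    'part_test'::regclass,        -- parent table
    'crt_time',                   -- partitioning expression
    '2019-01-01'::timestamp,      -- start value of the first range
    interval '1 month',           -- width of each range partition
    12,                           -- number of partitions to pre-create
    false                         -- partition_data => false: existing rows stay in the parent for now
);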
postgres=# select partition_table_concurrently('part_test'::regclass,
10000,
1.0);
NOTICE: worker started, you can stop it with the following command: select stop_concurrent_part_task('part_test');
partition_table_concurrently
------------------------------
(1 row)
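The background worker's progress can be monitored, and the task stopped if necessary, using pg_pathman's task view and the function mentioned in the NOTICE above (a sketch):
-- progress of running concurrent partitioning tasks
select * from pathman_concurrent_part_tasks;
-- stop the background worker for this table if needed
select stop_concurrent_part_task('part_test');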
After the migration finishes, the parent table holds no data; all rows are in the partitions:
postgres=# select count(*) from only part_test;
count
-------
0
(1 row)
Once data migration is complete, it is recommended to disable the parent table so that it no longer appears in execution plans:
postgres=# select set_enable_parent('part_test'::regclass, false);
set_enable_parent
-------------------
(1 row)
Method 2: native partitioning
Use inheritance tables, triggers, asynchronous data migration and a final table-name swap to convert a non-partitioned table into a partitioned table online (the table-name swap requires a brief blocking window).
Key techniques:
1. Inheritance tables (the child partitions)
Transparent to select, update, delete, truncate and drop.
2. Triggers
Inserts: a BEFORE trigger routes each row into the matching inherited partition.
Updates: a BEFORE trigger deletes the old row from the parent table and inserts the updated row into the new child table.
3. Background data migration: a CTE over ONLY the parent with FOR UPDATE SKIP LOCKED, DELETE FROM ONLY, and INSERT INTO the new tables (the exact statement is shown in step 9 of the example).
4. When the migration is finished (the parent table is empty), take a brief lock, break the INHERIT relationships, attach the children to a native partitioned table, and swap the table names.
Example
Convert a table online into a LIST-partitioned table (a pseudo-HASH scheme using abs(mod(id,4))).
Range partitioning works in the same way.
To convert to a native HASH-partitioned table instead, you would need to reproduce PostgreSQL's built-in hash partitioning algorithm; see the sketch below.
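For reference, PostgreSQL exposes its built-in hash-partition routing through the satisfies_hash_partition() function (it appears in the hash function list at the end of this article). A minimal sketch, assuming a hypothetical native HASH-partitioned table t_hash with 4 partitions:
-- does the key value 123 route to the partition of t_hash defined with MODULUS 4, REMAINDER 0?
select satisfies_hash_partition('t_hash'::regclass, 4, 0, 123);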
1. Create the test table (the table to be partitioned)
create table old (id int primary key, info text, crt_time timestamp);
2. Insert 10 million test rows
insert into old select generate_series(1,10000000) , md5(random()::text) , now();
3. Create the child partitions (this example uses LIST partitioning)
do language plpgsql $$
declare
  parts int := 4;   -- number of child partitions
begin
  for i in 0..parts-1 loop
    -- each child copies the parent's structure, defaults and indexes, and inherits from it
    execute format('create table old_mid%s (like old including all) inherits (old)', i);
    -- the check constraint encodes the routing rule: abs(mod(id, 4)) = i
    execute format('alter table old_mid%s add constraint ck check(abs(mod(id,%s))=%s)', i, parts, i);
  end loop;
end;
$$;
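To confirm that the four children and their routing constraints are in place, a quick catalog query (a sketch):
select c.relname as child, pg_get_constraintdef(t.oid) as check_clause
from pg_inherits i
join pg_class c on c.oid = i.inhrelid
join pg_constraint t on t.conrelid = i.inhrelid and t.contype = 'c'
where i.inhparent = 'old'::regclass
order by 1;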
4. Inserts: a BEFORE trigger routes each row into the appropriate child table
create or replace function ins_tbl() returns trigger as $$
declare
begin
  case abs(mod(NEW.id,4))
    when 0 then
      insert into old_mid0 values (NEW.*);
    when 1 then
      insert into old_mid1 values (NEW.*);
    when 2 then
      insert into old_mid2 values (NEW.*);
    when 3 then
      insert into old_mid3 values (NEW.*);
    else
      return NEW;  -- a NULL routing key would fall through to the parent table; the primary key can never be NULL here
  end case;
  return null;     -- suppress the insert into the parent: the row has already been routed to a child
end;
$$ language plpgsql strict;
create trigger tg1 before insert on old for each row execute procedure ins_tbl();
5. Updates: a BEFORE trigger deletes the row from the parent table and inserts the updated row into the child table
create or replace function upd_tbl () returns trigger as $$
declare
begin
  case abs(mod(NEW.id,4))
    when 0 then
      insert into old_mid0 values (NEW.*);
    when 1 then
      insert into old_mid1 values (NEW.*);
    when 2 then
      insert into old_mid2 values (NEW.*);
    when 3 then
      insert into old_mid3 values (NEW.*);
    else
      return NEW;  -- a NULL routing key would fall through to the parent table; the primary key can never be NULL here
  end case;
  delete from only old where id=OLD.id;  -- remove the pre-update row from the parent (OLD.id, in case the key itself was changed)
  return null;     -- suppress the original update: the new row version now lives in a child
end;
$$ language plpgsql strict;
create trigger tg2 before update on old for each row execute procedure upd_tbl();
6. The old table now looks like this:
postgres=# \dt+ old
List of relations
Schema | Name | Type | Owner | Size | Description
--------+------+-------+----------+--------+-------------
public | old | table | postgres | 730 MB |
(1 row)
The inheritance relationships are as follows:
postgres=# \d+ old
Table "public.old"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
id | integer | | not null | | plain | |
info | text | | | | extended | |
crt_time | timestamp without time zone | | | | plain | |
Indexes:
"old_pkey" PRIMARY KEY, btree (id)
Triggers:
tg1 BEFORE INSERT ON old FOR EACH ROW EXECUTE PROCEDURE ins_tbl()
tg2 BEFORE UPDATE ON old FOR EACH ROW EXECUTE PROCEDURE upd_tbl()
Child tables: old_mid0,
old_mid1,
old_mid2,
old_mid3
7. Verify that insert, update, delete and select all behave as required; the change is transparent to the application's SQL.
postgres=# insert into old values (0,'test',now());
INSERT 0 0
postgres=# select tableoid::regclass,* from old where id=1;
tableoid | id | info | crt_time
----------+----+----------------------------------+---------------------------
old | 1 | 22be06200f2a967104872f6f173fd038 | 31-JAN-19 12:52:25.887242
(1 row)
postgres=# select tableoid::regclass,* from old where id=0;
tableoid | id | info | crt_time
----------+----+------+---------------------------
old_mid0 | 0 | test | 31-JAN-19 13:02:35.859899
(1 row)
postgres=# update old set info='abc' where id in (0,2) returning tableoid::regclass,*;
tableoid | id | info | crt_time
----------+----+------+---------------------------
old_mid0 | 0 | abc | 31-JAN-19 13:02:35.859899
(1 row)
UPDATE 1
postgres=# select tableoid::regclass,* from old where id in (0,2);
tableoid | id | info | crt_time
----------+----+------+---------------------------
old_mid0 | 0 | abc | 31-JAN-19 13:12:03.343559
old_mid2 | 2 | abc | 31-JAN-19 13:11:04.763652
(2 rows)
postgres=# delete from old where id=3;
DELETE 1
postgres=# select tableoid::regclass,* from old where id=3;
tableoid | id | info | crt_time
----------+----+------+----------
(0 rows)
8. Start a stress test while migrating data from the original table in the background
create or replace function test_ins(int) returns void as $$
declare
begin
  insert into old values ($1,'test',now());
exception when others then
  -- swallow errors (e.g. duplicate key) so the pgbench run is not interrupted
  return;
end;
$$ language plpgsql strict;
vi test.sql
\set id1 random(10000001,200000000)
\set id2 random(1,5000000)
\set id3 random(5000001,10000000)
delete from old where id=:id2;
update old set info=md5(random()::text),crt_time=now() where id=:id3;
select test_ins(:id1);
Start the stress test:
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 4 -j 4 -T 1200
...
progress: 323.0 s, 12333.1 tps, lat 0.324 ms stddev 0.036
progress: 324.0 s, 11612.9 tps, lat 0.344 ms stddev 0.203
progress: 325.0 s, 12546.0 tps, lat 0.319 ms stddev 0.061
progress: 326.0 s, 12728.7 tps, lat 0.314 ms stddev 0.038
progress: 327.0 s, 12536.9 tps, lat 0.319 ms stddev 0.040
progress: 328.0 s, 12534.1 tps, lat 0.319 ms stddev 0.042
progress: 329.0 s, 12228.1 tps, lat 0.327 ms stddev 0.047
...
9. Migrate the data online
Migrate in batches, N rows per call, using the following SQL:
with a as (
delete from only old where ctid = any (array (select ctid from only old limit 1000 for update skip locked) ) returning *
)
insert into old select * from a;
INSERT 0 0
postgres=# select count(*) from only old;
count
---------
9998998
(1 row)
postgres=# select count(*) from old;
count
----------
10000000
(1 row)
postgres=# with a as (
delete from only old where ctid = any (array (select ctid from only old limit 1000 for update skip locked) ) returning *
)
insert into old select * from a;
INSERT 0 0
postgres=# select count(*) from old;
count
----------
10000000
(1 row)
postgres=# select count(*) from only old;
count
---------
9997998
(1 row)
postgres=# with a as (
delete from only old where ctid = any (array (select ctid from only old limit 100000 for update skip locked) ) returning *
)
insert into old select * from a;
INSERT 0 0
postgres=# select count(*) from only old;
count
---------
9897998
(1 row)
postgres=# select count(*) from old;
count
----------
10000000
(1 row)
Migrate 10,000 rows per batch, in repeated batches:
with a as (
delete from only old where ctid = any (array (select ctid from only old limit 10000 for update skip locked) ) returning *
)
insert into old select * from a;
Keep running the statement above until the old parent table has no rows left and everything has been migrated into the partitions (a scripted version is sketched below).
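A minimal sketch of such a loop as an anonymous plpgsql block (note that a single DO block runs in one transaction; under heavy load, issuing each batch from the client as above keeps transactions shorter):
do language plpgsql $$
begin
  loop
    -- move one batch of up to 10000 rows out of the parent; the insert fires tg1,
    -- which routes every row into the matching old_midN child table
    with a as (
      delete from only old
      where ctid = any (array (
        select ctid from only old limit 10000 for update skip locked))
      returning *
    )
    insert into old select * from a;
    -- stop once the parent table has no rows left
    perform 1 from only old limit 1;
    exit when not found;
  end loop;
end;
$$;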
select count(*) from only old;
count
-------
0
(1 row)
10. Switch to the native partitioned table
Create the partitioned table as follows; the partitioning method matches the check constraints on the inheritance children. (The new parent has no primary key: PostgreSQL only allows a primary key on a partitioned table if it covers the partition key, which here is an expression; each child keeps its own old_midN_pkey index.)
create table new (id int, info text, crt_time timestamp) partition by list (abs(mod(id,4)));
Swap the table names. To avoid a pile-up of blocked sessions, set a lock timeout; since only renames and ATTACH PARTITION operations are involved, the switch is very fast.
begin;
set lock_timeout ='3s';
alter table old_mid0 no inherit old;
alter table old_mid1 no inherit old;
alter table old_mid2 no inherit old;
alter table old_mid3 no inherit old;
alter table old rename to old_tmp;
alter table new rename to old;
alter table old ATTACH PARTITION old_mid0 for values in (0);
alter table old ATTACH PARTITION old_mid1 for values in (1);
alter table old ATTACH PARTITION old_mid2 for values in (2);
alter table old ATTACH PARTITION old_mid3 for values in (3);
end;
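After verifying the cutover, the leftover empty parent (now named old_tmp) and its routing triggers/functions can be removed (a sketch):
drop table old_tmp;                 -- dropping the table also drops triggers tg1 and tg2 defined on it
drop function if exists ins_tbl();
drop function if exists upd_tbl();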
The native partitioned table after the switch:
postgres=# \d+ old
Table "public.old"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
id | integer | | | | plain | |
info | text | | | | extended | |
crt_time | timestamp without time zone | | | | plain | |
Partition key: LIST (abs(mod(id, 4)))
Partitions: old_mid0 FOR VALUES IN (0),
old_mid1 FOR VALUES IN (1),
old_mid2 FOR VALUES IN (2),
old_mid3 FOR VALUES IN (3)
Query tests. A filter on id alone has to scan every partition; adding a predicate on the partition key expression abs(mod(id,4)) lets the planner prune down to a single partition:
postgres=# explain select * from old where id=1;
QUERY PLAN
-------------------------------------------------------------------------------------
Append (cost=0.29..10.04 rows=4 width=44)
-> Index Scan using old_mid0_pkey on old_mid0 (cost=0.29..2.51 rows=1 width=44)
Index Cond: (id = 1)
-> Index Scan using old_mid1_pkey on old_mid1 (cost=0.29..2.51 rows=1 width=45)
Index Cond: (id = 1)
-> Index Scan using old_mid2_pkey on old_mid2 (cost=0.29..2.51 rows=1 width=44)
Index Cond: (id = 1)
-> Index Scan using old_mid3_pkey on old_mid3 (cost=0.29..2.51 rows=1 width=45)
Index Cond: (id = 1)
(9 rows)
postgres=# explain select * from old where id=1 and abs(mod(id, 4)) = abs(mod(1, 4));
QUERY PLAN
-------------------------------------------------------------------------------------
Append (cost=0.29..2.52 rows=1 width=45)
-> Index Scan using old_mid1_pkey on old_mid1 (cost=0.29..2.51 rows=1 width=45)
Index Cond: (id = 1)
Filter: (mod(id, 4) = 1)
(4 rows)
Data:
postgres=# select count(*) from old;
count
----------
10455894
(1 row)
Method 3: logical replication
Use logical replication to synchronize the data into a partitioned table.
The basic steps are:
Take a snapshot (record an LSN position).
Copy the existing data in full.
Apply the incremental changes (logical replication: decode the WAL starting from that LSN).
Swap the table names.
The details are omitted here; a minimal sketch follows.
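A minimal sketch of the publication/subscription setup (connection string and object names are illustrative; replicating directly into a partitioned table on the subscriber requires PostgreSQL 13 or later):
-- on the source database: publish changes to the original table
create publication pub_old for table old;

-- on the target database, which already contains the partitioned table (also named old):
-- the initial table synchronization copies a snapshot of the existing rows,
-- then incremental changes decoded from the WAL are streamed and applied
create subscription sub_old
  connection 'host=src_host dbname=postgres user=repl'
  publication pub_old;

-- once the subscriber has caught up, briefly stop writes and switch the application
-- (or swap names) to complete the cutover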
Other
Hash functions
postgres=# \df *.*hash*
List of functions
Schema | Name | Result data type | Argument data types | Type
------------+--------------------------+------------------+---------------------------------------+------
pg_catalog | hash_aclitem | integer | aclitem | func
pg_catalog | hash_aclitem_extended | bigint | aclitem, bigint | func
pg_catalog | hash_array | integer | anyarray | func
pg_catalog | hash_array_extended | bigint | anyarray, bigint | func
pg_catalog | hash_numeric | integer | numeric | func
pg_catalog | hash_numeric_extended | bigint | numeric, bigint | func
pg_catalog | hash_range | integer | anyrange | func
pg_catalog | hash_range_extended | bigint | anyrange, bigint | func
pg_catalog | hashbpchar | integer | character | func
pg_catalog | hashbpcharextended | bigint | character, bigint | func
pg_catalog | hashchar | integer | "char" | func
pg_catalog | hashcharextended | bigint | "char", bigint | func
pg_catalog | hashenum | integer | anyenum | func
pg_catalog | hashenumextended | bigint | anyenum, bigint | func
pg_catalog | hashfloat4 | integer | real | func
pg_catalog | hashfloat4extended | bigint | real, bigint | func
pg_catalog | hashfloat8 | integer | double precision | func
pg_catalog | hashfloat8extended | bigint | double precision, bigint | func
pg_catalog | hashhandler | index_am_handler | internal | func
pg_catalog | hashinet | integer | inet | func
pg_catalog | hashinetextended | bigint | inet, bigint | func
pg_catalog | hashint2 | integer | smallint | func
pg_catalog | hashint2extended | bigint | smallint, bigint | func
pg_catalog | hashint4 | integer | integer | func
pg_catalog | hashint4extended | bigint | integer, bigint | func
pg_catalog | hashint8 | integer | bigint | func
pg_catalog | hashint8extended | bigint | bigint, bigint | func
pg_catalog | hashmacaddr | integer | macaddr | func
pg_catalog | hashmacaddr8 | integer | macaddr8 | func
pg_catalog | hashmacaddr8extended | bigint | macaddr8, bigint | func
pg_catalog | hashmacaddrextended | bigint | macaddr, bigint | func
pg_catalog | hashname | integer | name | func
pg_catalog | hashnameextended | bigint | name, bigint | func
pg_catalog | hashoid | integer | oid | func
pg_catalog | hashoidextended | bigint | oid, bigint | func
pg_catalog | hashoidvector | integer | oidvector | func
pg_catalog | hashoidvectorextended | bigint | oidvector, bigint | func
pg_catalog | hashtext | integer | text | func
pg_catalog | hashtextextended | bigint | text, bigint | func
pg_catalog | hashvarlena | integer | internal | func
pg_catalog | hashvarlenaextended | bigint | internal, bigint | func
pg_catalog | interval_hash | integer | interval | func
pg_catalog | interval_hash_extended | bigint | interval, bigint | func
pg_catalog | jsonb_hash | integer | jsonb | func
pg_catalog | jsonb_hash_extended | bigint | jsonb, bigint | func
pg_catalog | pg_lsn_hash | integer | pg_lsn | func
pg_catalog | pg_lsn_hash_extended | bigint | pg_lsn, bigint | func
pg_catalog | satisfies_hash_partition | boolean | oid, integer, integer, VARIADIC "any" | func
pg_catalog | time_hash | integer | time without time zone | func
pg_catalog | time_hash_extended | bigint | time without time zone, bigint | func
pg_catalog | timestamp_hash | integer | timestamp without time zone | func
pg_catalog | timestamp_hash_extended | bigint | timestamp without time zone, bigint | func
pg_catalog | timetz_hash | integer | time with time zone | func
pg_catalog | timetz_hash_extended | bigint | time with time zone, bigint | func
pg_catalog | uuid_hash | integer | uuid | func
pg_catalog | uuid_hash_extended | bigint | uuid, bigint | func
(56 rows)
Summary
Methods for converting a table to a partitioned table online:
1. Convert to pg_pathman partitions: simply call pg_pathman's UDFs.
2. Convert to native partitions using inheritance and asynchronous migration; the cutover requires a brief table lock.
Note that the trigger-based scheme does not support the INSERT ... ON CONFLICT syntax (the BEFORE INSERT trigger routes the row to a child table and returns NULL, so the conflict check against the parent's primary key never takes effect):
insert into old values (1,'test',now()) on conflict(id) do update set info=excluded.info, crt_time=excluded.crt_time;
3. Logical replication: migrate the data incrementally into a new partitioned table (the target can use either native partitioning or pg_pathman).
References
《PostgreSQL 9.x, 10, 11 hash分區表 用法舉例》
《PostgreSQL 觸發器 用法詳解 1》
《PostgreSQL 觸發器 用法詳解 2》
