标签
PostgreSQL , 同步 , 半同步 , 流复制 , 心跳 , 自动降级 , 自动升级 , dblink , 异步调用
https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E8%83%8C%E6%99%AF 背景
在心跳时,通过自定义UDF,实现心跳永远不被堵塞,并且支持更加当前的配置自动的进行同步、异步模式的升降级。实现半同步的功能。
UDF输入
1、优先模式(同步、异步)
2、同步等待超时时间
当优先为同步模式时,假设当前为同步配置,如果备库异常导致事务提交等待超过指定时间,则自动降级为异步。
当优先为异步模式时,假设当前为同步配置,自动降级为异步。
当优先为同步模式时,假设当前为异步配置,如果备库恢复到streaming模式,自动升级为同步。
使用技术点:
1、alter system
2、reload conf
3、cancle backend
4、dblink 异步调用
https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E5%BF%83%E8%B7%B3udf%E9%80%BB%E8%BE%91 心跳UDF逻辑
判断当前实例状态
只读
退出
读写
判断当前事务模式
异步
发心跳
优先模式是什么
异步
退出
同步
判断是否需要升级
升级
退出
同步
消耗异步消息
发远程心跳
查询是否超时
降级
否则
消耗异步消息
优先模式是什么
异步
降级
退出
同步
退出
https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E8%AE%BE%E8%AE%A1 设计
1、当前postgresql.conf配置
synchronous_commit='remote_write';
synchronous_standby_names='*';
表示同步模式。
2、心跳表设计
create table t_keepalive(id int primary key, ts timestamp, pos pg_lsn);
3、心跳写入方法
insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos;
4、创建一个建立连接函数,不报错
create or replace function conn(
name, -- dblink名字
text -- 连接串,URL
) returns void as $$
declare
begin
perform dblink_connect($1, $2);
return;
exception when others then
return;
end;
$$ language plpgsql strict;
5、更加以上逻辑创建心跳UDF。
create or replace function keepalive (
prio_commit_mode text,
tmout interval
) returns t_keepalive as $$
declare
res1 int;
res2 timestamp;
res3 pg_lsn;
commit_mode text;
conn text := format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database());
conn_altersys text := format('hostaddr=%s port=%s user=%s dbname=%s', '127.0.0.1', current_setting('port'), current_user, current_database());
app_prefix_stat text := 'keepalive_dblink';
begin
if prio_commit_mode not in ('sync','async') then
raise notice 'prio_commit_mode must be [sync|async]';
return null;
end if;
show synchronous_commit into commit_mode;
create extension IF NOT EXISTS dblink;
-- 判断当前实例状态
if pg_is_in_recovery()
-- 只读
then
raise notice 'Current instance in recovery mode.';
return null;
-- 读写
else
-- 判断当前事务模式
if commit_mode in ('local','off')
-- 异步
then
-- 发心跳
insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos into res1,res2,res3;
-- 优先模式是什么
if prio_commit_mode='async'
-- 异步
then
-- 退出
return row(res1,res2,res3)::t_keepalive;
-- 同步
else
-- 判断是否需要升级
perform 1 from pg_stat_replication where state='streaming' limit 1;
if found
-- 升级
then
perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=remote_write', true);
perform pg_reload_conf();
-- 退出
return row(res1,res2,res3)::t_keepalive;
end if;
return row(res1,res2,res3)::t_keepalive;
end if;
-- 同步
else
-- 消耗异步消息
perform conn(app_prefix_stat, conn||app_prefix_stat);
perform t from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);
-- 发远程心跳
perform dblink_send_query(app_prefix_stat, $_$ insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos $_$);
-- 查询是否超时
<<ablock>>
loop
perform pg_sleep(0.2);
perform 1 from pg_stat_activity where application_name=app_prefix_stat and state='idle' limit 1;
-- 未超时
if found then
select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);
raise notice 'no timeout';
exit ablock;
end if;
perform 1 from pg_stat_activity where wait_event='SyncRep' and application_name=app_prefix_stat and clock_timestamp()-query_start > tmout limit 1;
-- 降级
if found then
perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);
perform pg_reload_conf();
perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';
select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);
raise notice 'timeout';
exit ablock;
end if;
perform pg_sleep(0.2);
end loop;
-- 优先模式是什么
if prio_commit_mode='async'
-- 异步
then
show synchronous_commit into commit_mode;
-- 降级
if commit_mode in ('on','remote_write','remote_apply')
then
perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);
perform pg_reload_conf();
perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';
end if;
-- 退出
return row(res1,res2,res3)::t_keepalive;
-- 同步
else
-- 退出
return row(res1,res2,res3)::t_keepalive;
end if;
end if;
end if;
end;
$$ language plpgsql strict;
https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E6%B5%8B%E8%AF%95 测试
1、当前为同步模式
postgres=# show synchronous_commit ;
synchronous_commit
--------------------
remote_write
(1 row)
2、人为关闭从库,心跳自动将数据库改成异步模式,并通知所有等待中会话。
postgres=# select * from keepalive ('sync','5 second');
NOTICE: extension "dblink" already exists, skipping
NOTICE: timeout
id | ts | pos
----+----------------------------+-------------
1 | 2019-01-30 00:48:39.800829 | 23/9501D5F8
(1 row)
postgres=# show synchronous_commit ;
synchronous_commit
--------------------
local
(1 row)
3、恢复从库,心跳自动将数据库升级为优先sync模式。
postgres=# select * from keepalive ('sync','5 second');
NOTICE: extension "dblink" already exists, skipping
id | ts | pos
----+----------------------------+-------------
1 | 2019-01-30 00:48:47.329119 | 23/9501D6E8
(1 row)
postgres=# select * from keepalive ('sync','5 second');
NOTICE: extension "dblink" already exists, skipping
NOTICE: no timeout
id | ts | pos
----+----------------------------+-------------
1 | 2019-01-30 00:49:11.991855 | 23/9501E0C8
(1 row)
postgres=# show synchronous_commit ;
synchronous_commit
--------------------
remote_write
(1 row)
https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E5%B0%8F%E7%BB%93 小结
使用心跳实现半同步,大大简化了整个同步、异步模式切换的流程。当然如果内核层面可以实现,配置几个参数,会更加完美。
https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E5%8F%82%E8%80%83 参考
dblin 异步
《PostgreSQL 数据库心跳(SLA(RPO)指标的时间、WAL SIZE维度计算)》 《PostgreSQL 双节点流复制如何同时保证可用性、可靠性(rpo,rto) - (半同步,自动降级方法实践)》https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E5%85%8D%E8%B4%B9%E9%A2%86%E5%8F%96%E9%98%BF%E9%87%8C%E4%BA%91rds-postgresql%E5%AE%9E%E4%BE%8Becs%E8%99%9A%E6%8B%9F%E6%9C%BA 免费领取阿里云RDS PostgreSQL实例、ECS虚拟机
