天天看点

PostgreSQL 如何让心跳永远不死,支持半同步自动同步、异步升降级 - udf 心跳

标签

PostgreSQL , 同步 , 半同步 , 流复制 , 心跳 , 自动降级 , 自动升级 , dblink , 异步调用

https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E8%83%8C%E6%99%AF 背景

在心跳时,通过自定义UDF,实现心跳永远不被堵塞,并且支持更加当前的配置自动的进行同步、异步模式的升降级。实现半同步的功能。

UDF输入

1、优先模式(同步、异步)

2、同步等待超时时间

当优先为同步模式时,假设当前为同步配置,如果备库异常导致事务提交等待超过指定时间,则自动降级为异步。

当优先为异步模式时,假设当前为同步配置,自动降级为异步。

当优先为同步模式时,假设当前为异步配置,如果备库恢复到streaming模式,自动升级为同步。

使用技术点:

1、alter system

2、reload conf

3、cancle backend

4、dblink 异步调用

https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E5%BF%83%E8%B7%B3udf%E9%80%BB%E8%BE%91 心跳UDF逻辑

判断当前实例状态  
  
  只读  
  
    退出  
  
  读写  
  
    判断当前事务模式   
  
      异步  
  
        发心跳  
  
        优先模式是什么  
  
          异步  
  
            退出  
  
          同步  
  
            判断是否需要升级  
  
              升级  
  
              退出  
  
  
      同步  
  
        消耗异步消息  
  
        发远程心跳  
  
        查询是否超时  
  
          降级  
  
        否则  
  
          消耗异步消息  
  
        优先模式是什么  
  
        异步  
  
          降级  
  
          退出  
  
        同步  
  
          退出  
           

https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E8%AE%BE%E8%AE%A1 设计

1、当前postgresql.conf配置

synchronous_commit='remote_write';  
synchronous_standby_names='*';  
           

表示同步模式。

2、心跳表设计

create table t_keepalive(id int primary key, ts timestamp, pos pg_lsn);  
           

3、心跳写入方法

insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos;  
           

4、创建一个建立连接函数,不报错

create or replace function conn(        
  name,   -- dblink名字        
  text    -- 连接串,URL        
) returns void as $$          
declare          
begin          
  perform dblink_connect($1, $2);         
  return;          
exception when others then          
  return;          
end;          
$$ language plpgsql strict;      
           

5、更加以上逻辑创建心跳UDF。

create or replace function keepalive (  
  prio_commit_mode text,    
  tmout interval  
) returns t_keepalive as $$  
declare  
  res1 int;  
  res2 timestamp;  
  res3 pg_lsn;  
  commit_mode text;  
  conn text := format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database());  
  conn_altersys text := format('hostaddr=%s port=%s user=%s dbname=%s', '127.0.0.1', current_setting('port'), current_user, current_database());  
  app_prefix_stat text := 'keepalive_dblink';  
begin  
  if prio_commit_mode not in ('sync','async') then  
    raise notice 'prio_commit_mode must be [sync|async]';  
    return null;  
  end if;  
  
  show synchronous_commit into commit_mode;  
  
  create extension IF NOT EXISTS dblink;  
  
  -- 判断当前实例状态  
  if pg_is_in_recovery()   
  
  -- 只读  
  then  
    raise notice 'Current instance in recovery mode.';  
    return null;  
      
  -- 读写  
  else  
  
    -- 判断当前事务模式   
    if commit_mode in ('local','off')  
  
    -- 异步  
    then  
  
      -- 发心跳  
      insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos into res1,res2,res3;  
  
      -- 优先模式是什么  
      if prio_commit_mode='async'   
  
      -- 异步  
      then  
  
        -- 退出  
        return row(res1,res2,res3)::t_keepalive;  
  
      -- 同步  
      else  
  
        -- 判断是否需要升级  
        perform 1 from pg_stat_replication where state='streaming' limit 1;  
        if found  
  
        -- 升级  
        then  
          perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=remote_write', true);   
          perform pg_reload_conf();   
  
          -- 退出  
          return row(res1,res2,res3)::t_keepalive;  
        end if;  
  
        return row(res1,res2,res3)::t_keepalive;  
      end if;  
  
  
    -- 同步  
    else  
  
      -- 消耗异步消息  
      perform conn(app_prefix_stat,  conn||app_prefix_stat);     
      perform t from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);  
  
      -- 发远程心跳  
      perform dblink_send_query(app_prefix_stat, $_$ insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos $_$);    
  
      -- 查询是否超时  
      <<ablock>>  
      loop  
        perform pg_sleep(0.2);  
  
        perform 1 from pg_stat_activity where application_name=app_prefix_stat and state='idle' limit 1;  
        -- 未超时  
        if found then  
          select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);  
          raise notice 'no timeout';  
          exit ablock;  
        end if;  
            
        perform 1 from pg_stat_activity where wait_event='SyncRep' and application_name=app_prefix_stat and clock_timestamp()-query_start > tmout limit 1;  
        -- 降级  
        if found then  
          perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);   
          perform pg_reload_conf();  
          perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';  
          select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);  
          raise notice 'timeout';  
          exit ablock;  
        end if;  
            
        perform pg_sleep(0.2);  
      end loop;  
  
      -- 优先模式是什么  
      if prio_commit_mode='async'   
  
      -- 异步  
      then  
        show synchronous_commit into commit_mode;  
        -- 降级  
        if commit_mode in ('on','remote_write','remote_apply')   
        then  
          perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);   
          perform pg_reload_conf();  
          perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';  
        end if;  
              
        -- 退出  
        return row(res1,res2,res3)::t_keepalive;  
  
      -- 同步  
      else  
  
        -- 退出  
        return row(res1,res2,res3)::t_keepalive;  
      end if;  
          
    end if;  
  
  end if;  
end;  
$$ language plpgsql strict;  
           

https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E6%B5%8B%E8%AF%95 测试

1、当前为同步模式

postgres=# show synchronous_commit ;  
 synchronous_commit   
--------------------  
 remote_write  
(1 row)  
           

2、人为关闭从库,心跳自动将数据库改成异步模式,并通知所有等待中会话。

postgres=# select * from keepalive ('sync','5 second');  
NOTICE:  extension "dblink" already exists, skipping  
NOTICE:  timeout  
 id |             ts             |     pos       
----+----------------------------+-------------  
  1 | 2019-01-30 00:48:39.800829 | 23/9501D5F8  
(1 row)  
  
postgres=# show synchronous_commit ;  
 synchronous_commit   
--------------------  
 local  
(1 row)  
           

3、恢复从库,心跳自动将数据库升级为优先sync模式。

postgres=# select * from keepalive ('sync','5 second');  
NOTICE:  extension "dblink" already exists, skipping  
 id |             ts             |     pos       
----+----------------------------+-------------  
  1 | 2019-01-30 00:48:47.329119 | 23/9501D6E8  
(1 row)  
  
postgres=# select * from keepalive ('sync','5 second');  
NOTICE:  extension "dblink" already exists, skipping  
NOTICE:  no timeout  
 id |             ts             |     pos       
----+----------------------------+-------------  
  1 | 2019-01-30 00:49:11.991855 | 23/9501E0C8  
(1 row)  
  
postgres=# show synchronous_commit ;  
 synchronous_commit   
--------------------  
 remote_write  
(1 row)  
           

https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E5%B0%8F%E7%BB%93 小结

使用心跳实现半同步,大大简化了整个同步、异步模式切换的流程。当然如果内核层面可以实现,配置几个参数,会更加完美。

https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E5%8F%82%E8%80%83 参考

dblin 异步

《PostgreSQL 数据库心跳(SLA(RPO)指标的时间、WAL SIZE维度计算)》 《PostgreSQL 双节点流复制如何同时保证可用性、可靠性(rpo,rto) - (半同步,自动降级方法实践)》

https://github.com/digoal/blog/blob/master/201901/20190130_01.md#%E5%85%8D%E8%B4%B9%E9%A2%86%E5%8F%96%E9%98%BF%E9%87%8C%E4%BA%91rds-postgresql%E5%AE%9E%E4%BE%8Becs%E8%99%9A%E6%8B%9F%E6%9C%BA 免费领取阿里云RDS PostgreSQL实例、ECS虚拟机

PostgreSQL 如何让心跳永远不死,支持半同步自动同步、异步升降级 - udf 心跳