天天看點

關于PG邏輯訂閱判斷資料是否同步的方法

PG邏輯訂閱過程中,怎麼判斷訂閱端已經同步到哪兒了?

考慮過2種方案,哪個更合适

  • 訂閱端的

    pg_stat_subscription

    latest_end_lsn

  • 釋出端的

    pg_stat_replication

    中的

    replay_lsn

1. 關于

pg_stat_subscription

latest_end_lsn

pg_stat_subscription

received_lsn

latest_end_lsn

比較像,它們的差別如下

  • received_lsn

    :最後一次接收到的預寫日志位置
  • latest_end_lsn

    :報告給原始WAL發送程式的最後的預寫日志位置

1.1

pg_stat_subscription

latest_end_lsn

的來源

來源是全局數組LogicalRepCtx->workers[]

select * from pg_stat_subscription
  pg_stat_get_subscription()
    memcpy(&worker, &LogicalRepCtx->workers[i],sizeof(LogicalRepWorker));
    values[6] = LSNGetDatum(worker.reply_lsn);           

1.2 LogicalRepWorker的配置設定

Launcher ApplyWorker時配置設定slot,通過bgw_main_arg參數傳給ApplyWorker

ApplyLauncherMain(Datum main_arg)
  logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid);
    /* Find unused worker slot. */
    for (i = 0; i < max_logical_replication_workers; i++)
    {
        LogicalRepWorker *w = &LogicalRepCtx->workers[i];

        if (!w->in_use)
        {
            worker = w;
            slot = i;
            break;
        }
    }
    bgw.bgw_main_arg = Int32GetDatum(slot);
    RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)           

1.3

latest_end_lsn

的更新

訂閱端隻有收到釋出端的keepalive消息,才會更新

pg_stat_subscription.latest_end_lsn

由于不是每次send_feedback()後都會更新

latest_end_lsn

,是以

latest_end_lsn

可能比實際回報給釋出端的lsn要滞後。實測時也經常能看到10秒以上的延遲。

為防止wal send逾時,當超過

wal_sender_timeout / 2

還沒有收到接受端回報時,發送端會主動發送keepalive消息。

LogicalRepApplyLoop(XLogRecPtr last_received)
  
    for (;;)
    {
    ...
        len = walrcv_receive(wrconn, &buf, &fd);
        if (len != 0)
        {
        
            if (c == 'w')
            {
                XLogRecPtr    start_lsn;
                XLogRecPtr    end_lsn;
                TimestampTz send_time;

                start_lsn = pq_getmsgint64(&s);
                end_lsn = pq_getmsgint64(&s);
                send_time = pq_getmsgint64(&s);

                if (last_received < start_lsn)
                    last_received = start_lsn;

                if (last_received < end_lsn)
                    last_received = end_lsn;

                UpdateWorkerStats(last_received, send_time, false);//更新pg_stat_subscription.received_lsn

                apply_dispatch(&s);
            }
            else if (c == 'k')
            {
                XLogRecPtr    end_lsn;
                TimestampTz timestamp;
                bool        reply_requested;

                end_lsn = pq_getmsgint64(&s);
                timestamp = pq_getmsgint64(&s);
                reply_requested = pq_getmsgbyte(&s);

                if (last_received < end_lsn)
                    last_received = end_lsn;

                send_feedback(last_received, reply_requested, false);//回報訂閱端的write/flush/reply lsn
                UpdateWorkerStats(last_received, timestamp, true);//更新pg_stat_subscription.received_lsn和pg_stat_subscription.latest_end_lsn
            }
        }
        send_feedback(last_received, false, false);//回報訂閱端的write/flush/reply lsn           

2. 如何跟蹤訂閱端實際apply到哪裡?

latest_end_lsn

也能在一定程度上反映訂閱端的apply位點,但是這和它本身的功能其實不是特别契合,而且它出現滞後的機率比較高,不是特别理想。

我們可以通過釋出端的

pg_stat_replication

統計視圖跟蹤訂閱端的apply位置。

同樣參考上面LogicalRepApplyLoop()的代碼,訂閱端回報自己複制位置的邏輯如下:

  • 如果沒有pending的事務(所有和訂閱相關的寫事務已經在訂閱端刷盤)

    回報給sender:write=flush=apply=接受到最新wal位置

  • 如果有pending的事務

    回報給sender:

    write=接受到最新wal位置
    flush=屬于訂閱範圍的寫事務已經在訂閱端刷盤的位置
    apply=屬于訂閱範圍的寫事務已經在訂閱端寫盤的位置
               

由上面可以看出,邏輯訂閱和實體複制不一樣,實體複制是先寫wal再apply這個WAL;邏輯訂閱是先apply事務,再回報這個事務産生的wal的flush位置

相關代碼如下:

send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
      get_flush_position(&writepos, &flushpos, &have_pending_txes);
    /*
     * No outstanding transactions to flush, we can report the latest received
     * position. This is important for synchronous replication.
     */
    if (!have_pending_txes)
        flushpos = writepos = recvpos;
    ...
    pq_sendbyte(reply_message, 'r');
    pq_sendint64(reply_message, recvpos);    /* write */
    pq_sendint64(reply_message, flushpos);    /* flush */
    pq_sendint64(reply_message, writepos);    /* apply */
    pq_sendint64(reply_message, now);    /* sendTime */
    pq_sendbyte(reply_message, requestReply);    /* replyRequested */


static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
                   bool *have_pending_txes)
{
    dlist_mutable_iter iter;
    XLogRecPtr    local_flush = GetFlushRecPtr();

    *write = InvalidXLogRecPtr;
    *flush = InvalidXLogRecPtr;

    dlist_foreach_modify(iter, &lsn_mapping)//lsn_mapping 在應用commit日志時更新
    {
        FlushPosition *pos =
        dlist_container(FlushPosition, node, iter.cur);

        *write = pos->remote_end;

        if (pos->local_end <= local_flush)
        {
            *flush = pos->remote_end;
            dlist_delete(iter.cur);//從lsn_mapping中移除已經本地刷盤的記錄
            pfree(pos);
        }
        else
        {
            /*
             * Don't want to uselessly iterate over the rest of the list which
             * could potentially be long. Instead get the last element and
             * grab the write position from there.
             */
            pos = dlist_tail_element(FlushPosition, node,
                                     &lsn_mapping);
            *write = pos->remote_end;
            *have_pending_txes = true;
            return;
        }
    }

    *have_pending_txes = !dlist_is_empty(&lsn_mapping);
}           

應用commit日志時,會将commit對應的遠端lsn和本地lsn添加到lsn_mapping末尾

ApplyWorkerMain
  LogicalRepApplyLoop(origin_startpos);
    apply_dispatch(&s);
      apply_handle_commit(StringInfo s)
        replorigin_session_origin_lsn = commit_data.end_lsn; //更新pg_replication_origin_status
        replorigin_session_origin_timestamp = commit_data.committime;
        CommitTransactionCommand();
        store_flush_position(commit_data.end_lsn);
            /* Track commit lsn  */
            flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
            flushpos->local_end = XactLastCommitEnd;
            flushpos->remote_end = remote_lsn;
            dlist_push_tail(&lsn_mapping, &flushpos->node);           

3. 釋出端

pg_stat_replication

中的apply位點能否保證正确性?

首先,需要明确,隻有出現以下情況時,拿到的apply位置才認為有誤的

  1. 釋出端更新了訂閱表的表
  2. 更新這個表的事務已送出
  3. 訂閱端還沒有應用這個事務
  4. pg_stat_replication

    中看到的apply位點已經大于等于3的事務結束位置

當所有表都是r或s狀态時,訂閱端的apply worker順序接受和應用WAL日志。

在訂閱端本地送出完成前,不會實施後續的send_feedback(),是以不會産生超過實際送出位置的apply位點(甚至碰巧

pg_stat_subscription

latest_end_lsn

也可以認為是對的)。

4. 釋出端

pg_stat_replication

中的apply位點是否可能回報不及時?

有可能。但是

pg_stat_replication.replay_lsn

滞後的機率低于

pg_stat_subscription.latest_end_lsn

當訂閱端已處于同步狀态時,下面的情況下

pg_stat_replication

中的apply位點可能回報不及時,比釋出端的目前lsn滞後。

  1. 訂閱端處于sleep狀态,最多sleep 1秒
  2. 釋出端發送非訂閱表更新的消息(含keepalive)不及時

發送端為了防止sender逾時,會及時發送keepalive保活,是以我們可以在釋出端停止更新訂閱表後,可以最多等待

wal_sender_timeout

一樣大的時間。

繼續閱讀