During PostgreSQL logical replication, how do we determine how far the subscriber has synchronized?
Two options were considered; which one is more suitable?
- latest_end_lsn in the subscriber's pg_stat_subscription
- replay_lsn in the publisher's pg_stat_replication
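For orientation, both candidates can be read directly; here is a minimal sketch of the two queries (run the first on the subscriber, the second on the publisher; column names are as in PostgreSQL 10+):

-- On the subscriber
SELECT subname, received_lsn, latest_end_lsn, latest_end_time
FROM pg_stat_subscription;

-- On the publisher
SELECT application_name, sent_lsn, write_lsn, flush_lsn, replay_lsn
FROM pg_stat_replication;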
1. About pg_stat_subscription.latest_end_lsn
In pg_stat_subscription, received_lsn and latest_end_lsn look similar; they differ as follows:
- received_lsn: the last write-ahead log location received
- latest_end_lsn: the last write-ahead log location reported back to the origin WAL sender
1.1 Where pg_stat_subscription.latest_end_lsn comes from
The latest_end_lsn in pg_stat_subscription comes from the global array LogicalRepCtx->workers[]:
select * from pg_stat_subscription
  pg_stat_get_subscription()
    memcpy(&worker, &LogicalRepCtx->workers[i], sizeof(LogicalRepWorker));
    values[6] = LSNGetDatum(worker.reply_lsn);  /* shown as latest_end_lsn */
1.2 Allocation of LogicalRepWorker slots
When the launcher starts an apply worker, it allocates a slot and passes it to the apply worker through the bgw_main_arg parameter.
ApplyLauncherMain(Datum main_arg)
  logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid);
    /* Find unused worker slot. */
    for (i = 0; i < max_logical_replication_workers; i++)
    {
        LogicalRepWorker *w = &LogicalRepCtx->workers[i];

        if (!w->in_use)
        {
            worker = w;
            slot = i;
            break;
        }
    }
    bgw.bgw_main_arg = Int32GetDatum(slot);
    RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)
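As an aside, the number of slots in LogicalRepCtx->workers[] is capped by the max_logical_replication_workers GUC, whose current value can be checked with:

SHOW max_logical_replication_workers;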
1.3 When latest_end_lsn is updated
The subscriber updates pg_stat_subscription.latest_end_lsn only when it receives a keepalive message from the publisher.
Because latest_end_lsn is not updated after every send_feedback(), it can lag behind the LSN actually reported back to the publisher. In testing, delays of more than 10 seconds were common.
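This report lag is easy to observe on the subscriber by comparing the two columns; a simple sketch using pg_wal_lsn_diff:

SELECT subname,
       received_lsn,
       latest_end_lsn,
       pg_wal_lsn_diff(received_lsn, latest_end_lsn) AS report_lag_bytes,
       latest_end_time
FROM pg_stat_subscription;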
To prevent a WAL sender timeout, the sender proactively sends a keepalive message when more than wal_sender_timeout / 2 has passed without feedback from the receiver.
LogicalRepApplyLoop(XLogRecPtr last_received)
  for (;;)
  {
      ...
      len = walrcv_receive(wrconn, &buf, &fd);
      if (len != 0)
      {
          if (c == 'w')
          {
              XLogRecPtr  start_lsn;
              XLogRecPtr  end_lsn;
              TimestampTz send_time;

              start_lsn = pq_getmsgint64(&s);
              end_lsn = pq_getmsgint64(&s);
              send_time = pq_getmsgint64(&s);

              if (last_received < start_lsn)
                  last_received = start_lsn;
              if (last_received < end_lsn)
                  last_received = end_lsn;

              UpdateWorkerStats(last_received, send_time, false); /* updates pg_stat_subscription.received_lsn */
              apply_dispatch(&s);
          }
          else if (c == 'k')
          {
              XLogRecPtr  end_lsn;
              TimestampTz timestamp;
              bool        reply_requested;

              end_lsn = pq_getmsgint64(&s);
              timestamp = pq_getmsgint64(&s);
              reply_requested = pq_getmsgbyte(&s);

              if (last_received < end_lsn)
                  last_received = end_lsn;

              send_feedback(last_received, reply_requested, false); /* reports the subscriber's write/flush/apply LSNs */
              UpdateWorkerStats(last_received, timestamp, true); /* updates pg_stat_subscription.received_lsn and latest_end_lsn */
          }
      }
      send_feedback(last_received, false, false); /* reports the subscriber's write/flush/apply LSNs */
2. How do we track where the subscriber has actually applied?
latest_end_lsn can reflect the subscriber's apply position to some extent, but that is not really what it is designed for, and it lags fairly often, so it is not ideal.
Instead, we can track the subscriber's apply position through the publisher's pg_stat_replication statistics view.
Referring again to the LogicalRepApplyLoop() code above, the subscriber reports its replication position as follows:
- If there are no pending transactions (all write transactions relevant to the subscription have been flushed to disk on the subscriber), it reports to the sender: write = flush = apply = latest WAL position received.
- If there are pending transactions, it reports to the sender: write = latest WAL position received; flush = the position up to which subscribed write transactions have been flushed on the subscriber; apply = the position up to which subscribed write transactions have been written (committed) on the subscriber.
As this shows, a logical subscription differs from physical replication: physical replication first writes the WAL and then applies it, whereas a logical subscription first applies the transaction and then reports the flush position of the WAL that the transaction generated.
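On the publisher, the apply position and its byte lag behind the current WAL position can therefore be tracked like this (a sketch; a subscription's application_name defaults to the subscription name, assumed here to be sub1):

SELECT application_name,
       replay_lsn,
       pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS apply_lag_bytes
FROM pg_stat_replication
WHERE application_name = 'sub1';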
The corresponding subscriber-side code is as follows:
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
  get_flush_position(&writepos, &flushpos, &have_pending_txes);

  /*
   * No outstanding transactions to flush, we can report the latest received
   * position. This is important for synchronous replication.
   */
  if (!have_pending_txes)
      flushpos = writepos = recvpos;
  ...
  pq_sendbyte(reply_message, 'r');
  pq_sendint64(reply_message, recvpos);       /* write */
  pq_sendint64(reply_message, flushpos);      /* flush */
  pq_sendint64(reply_message, writepos);      /* apply */
  pq_sendint64(reply_message, now);           /* sendTime */
  pq_sendbyte(reply_message, requestReply);   /* replyRequested */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
                   bool *have_pending_txes)
{
    dlist_mutable_iter iter;
    XLogRecPtr  local_flush = GetFlushRecPtr();

    *write = InvalidXLogRecPtr;
    *flush = InvalidXLogRecPtr;

    dlist_foreach_modify(iter, &lsn_mapping)    /* lsn_mapping is appended to when a commit record is applied */
    {
        FlushPosition *pos =
            dlist_container(FlushPosition, node, iter.cur);

        *write = pos->remote_end;

        if (pos->local_end <= local_flush)
        {
            *flush = pos->remote_end;
            dlist_delete(iter.cur);     /* drop entries already flushed to local disk from lsn_mapping */
            pfree(pos);
        }
        else
        {
            /*
             * Don't want to uselessly iterate over the rest of the list which
             * could potentially be long. Instead get the last element and
             * grab the write position from there.
             */
            pos = dlist_tail_element(FlushPosition, node,
                                     &lsn_mapping);
            *write = pos->remote_end;
            *have_pending_txes = true;
            return;
        }
    }

    *have_pending_txes = !dlist_is_empty(&lsn_mapping);
}
When a commit record is applied, the remote LSN and the local LSN of the commit are appended to the tail of lsn_mapping:
ApplyWorkerMain
  LogicalRepApplyLoop(origin_startpos);
    apply_dispatch(&s);
      apply_handle_commit(StringInfo s)
        replorigin_session_origin_lsn = commit_data.end_lsn;    /* updates pg_replication_origin_status */
        replorigin_session_origin_timestamp = commit_data.committime;
        CommitTransactionCommand();
        store_flush_position(commit_data.end_lsn);
          /* Track commit lsn */
          flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
          flushpos->local_end = XactLastCommitEnd;
          flushpos->remote_end = remote_lsn;
          dlist_push_tail(&lsn_mapping, &flushpos->node);
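Incidentally, because apply_handle_commit() records commit_data.end_lsn in replorigin_session_origin_lsn, the remote commit LSN that the subscriber has applied can also be read on the subscriber side:

SELECT external_id, remote_lsn, local_lsn
FROM pg_replication_origin_status;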
3. Can the apply position in the publisher's pg_stat_replication be trusted to be correct?
First, to be clear, the apply position we obtain should be considered wrong only when all of the following hold:
- the publisher updated a table that is part of the subscription
- the transaction that updated the table has committed
- the subscriber has not yet applied that transaction
- the apply position seen in pg_stat_replication is already greater than or equal to that transaction's end position
When all tables are in state r (ready) or s (synced), the subscriber's apply worker receives and applies WAL sequentially. Until the local commit completes on the subscriber, the subsequent send_feedback() is not executed, so an apply position beyond the actual commit position can never be reported (and for the same reason pg_stat_subscription.latest_end_lsn can also be considered correct).
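This property makes a simple publisher-side "has my commit been applied?" check safe: capture the WAL position right after the commit, then compare it with replay_lsn (a sketch; '0/1553F40' stands for the captured position and sub1 is a placeholder subscription name):

-- In the updating session, immediately after COMMIT:
SELECT pg_current_wal_lsn();

-- Then poll until this returns true:
SELECT replay_lsn >= '0/1553F40'::pg_lsn AS caught_up
FROM pg_stat_replication
WHERE application_name = 'sub1';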
4. Can the apply position in the publisher's pg_stat_replication be reported late?
Yes, it can. However, pg_stat_replication.replay_lsn is less likely to lag than pg_stat_subscription.latest_end_lsn.
When the subscriber is already fully synchronized, the apply position in pg_stat_replication may still be reported late, i.e. lag behind the publisher's current LSN, in the following situations:
- the subscriber is sleeping (it sleeps at most 1 second at a time)
- the publisher is slow to send messages unrelated to subscribed-table updates (including keepalives)
Since the sender sends keepalives promptly to avoid a sender timeout, after the publisher stops updating subscribed tables we need to wait at most about as long as wal_sender_timeout.
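Putting the two sections together, a bounded wait on the publisher can be sketched in PL/pgSQL as follows (assumptions: the subscription's application_name is sub1, '0/1553F40' is the target LSN captured after the commit, and the loop gives up after 60 one-second iterations, matching the default wal_sender_timeout of 60 seconds):

DO $$
DECLARE
    target  pg_lsn := '0/1553F40';  -- placeholder: LSN captured right after the commit
    applied pg_lsn;
BEGIN
    FOR i IN 1 .. 60 LOOP
        SELECT replay_lsn INTO applied
        FROM pg_stat_replication
        WHERE application_name = 'sub1';

        EXIT WHEN applied IS NOT NULL AND applied >= target;
        PERFORM pg_sleep(1);
    END LOOP;
END $$;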