
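An Analysis of the PostgreSQL Logical Subscription Processing Flow

1. Overview of the table synchronization phase

After CREATE SUBSCRIPTION is executed on the subscriber, table data is synchronized in the background.

The synchronization state of each table is recorded in pg_subscription_rel.srsubstate, which takes one of four state codes.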

  • 'i': initialize
  • 'd': data is being copied
  • 's': synchronized
  • 'r': ready (normal replication)
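
For reference, the state codes can be captured in a small lookup table. This is an illustrative Python sketch, not PostgreSQL code: the names mirror the SUBREL_STATE_* macros used later in this article, and the codes 'w' and 'c' for the two internal states are an assumption based on the PostgreSQL source for the version discussed here. The internal states are exchanged between workers only and are not stored in the catalog.

```python
# Illustrative lookup table for table sync states; not PostgreSQL code.
CATALOG_STATES = {
    "i": "SUBREL_STATE_INIT",      # initialize
    "d": "SUBREL_STATE_DATASYNC",  # data is being copied
    "s": "SUBREL_STATE_SYNCDONE",  # synchronized
    "r": "SUBREL_STATE_READY",     # ready (normal replication)
}

# Assumed codes for the two internal states used only between workers.
INTERNAL_STATES = {
    "w": "SUBREL_STATE_SYNCWAIT",
    "c": "SUBREL_STATE_CATCHUP",
}


def describe_state(code: str) -> str:
    """Return the macro name for a state code, or 'unknown' if unrecognized."""
    return CATALOG_STATES.get(code) or INTERNAL_STATES.get(code, "unknown")


def is_synchronized(code: str) -> bool:
    """A table takes part in normal replication once it is in 's' or 'r'."""
    return code in ("s", "r")


for code in "idsr":
    print(code, describe_state(code), is_synchronized(code))
```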

Starting from the execution of CREATE SUBSCRIPTION, the processing flow on the subscriber is roughly as follows:

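  1. Set srsubstate of every table to 'i' (SUBREL_STATE_INIT)
  2. The logical replication launcher starts a logical replication apply worker process
  3. The logical replication apply worker connects to the publisher and starts receiving subscription messages. While a table has not finished its initial synchronization (state i or d), all insert, update and delete messages for it are skipped.
  4. The logical replication apply worker starts a logical replication sync worker for each unsynchronized table (at most max_sync_workers_per_subscription sync workers run concurrently per subscription)
  5. The logical replication sync worker connects to the publisher and synchronizes the initial data
    • 5.1 Set the table's sync state to 'd' (SUBREL_STATE_DATASYNC)
    • 5.2 Create a temporary replication slot and record the snapshot position.
    • 5.3 Copy the table data
    • 5.4 Set the table's sync state to SUBREL_STATE_SYNCWAIT (internal state) and wait for the apply worker to change it to SUBREL_STATE_CATCHUP (internal state)
  6. The logical replication apply worker changes the table's sync state to SUBREL_STATE_CATCHUP (internal state), records the latest LSN, and waits for the sync worker to change the state to SUBREL_STATE_SYNCDONE
  7. The logical replication sync worker completes the initial data synchronization
    • 7.1 Check whether the position of the subscription messages currently being processed by the apply worker is already ahead of the snapshot position. If so, receive and apply messages from the publisher until the sync worker has caught up with the apply worker.
    • 7.2 Set the table's sync state to 's' (SUBREL_STATE_SYNCDONE)
    • 7.3 Exit the process
  8. The logical replication apply worker continues to receive and process subscription messages
    • 8.1 On insert, update and delete messages: apply them if they come after the sync point (the LSN at which the table entered state 's' or 'r').
    • 8.2 On a commit message
      • 8.2.1 Update the replication origin state so that the correct start position can be found if the apply worker crashes
      • 8.2.2 Commit the transaction
      • 8.2.3 Update statistics
      • 8.2.4 Change every table in sync state 's' (SUBREL_STATE_SYNCDONE) to 'r' (SUBREL_STATE_READY)
    • 8.3 When there are temporarily no new messages to process
      • 8.3.1 Send subscription position feedback to the publisher
      • 8.3.2 If not inside a transaction block, synchronize the table states: change every table in sync state 's' (SUBREL_STATE_SYNCDONE) to 'r' (SUBREL_STATE_READY)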
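
The handshake between the two workers in steps 5 to 8 can be walked through with a toy model. The sketch below is a single-threaded Python simulation under simplifying assumptions: LSNs are plain integers, the state codes 'w' (SYNCWAIT) and 'c' (CATCHUP) are assumed values for the internal states, and every class and function name is hypothetical rather than taken from PostgreSQL.

```python
from dataclasses import dataclass, field


@dataclass
class TableSync:
    """Toy stand-in for one table's sync state; not a PostgreSQL structure."""
    state: str = "i"
    lsn: int = 0
    history: list = field(default_factory=lambda: ["i"])

    def set_state(self, state: str, lsn: int) -> None:
        self.state, self.lsn = state, lsn
        self.history.append(state)


def sync_worker_copy(t: TableSync, snapshot_lsn: int) -> None:
    """Step 5: mark the table 'd', copy it, then wait in SYNCWAIT ('w')."""
    t.set_state("d", 0)
    # ... the table data would be copied here ...
    t.set_state("w", snapshot_lsn)


def apply_worker_signal_catchup(t: TableSync, apply_lsn: int) -> None:
    """Step 6: the apply worker moves SYNCWAIT to CATCHUP ('c')."""
    assert t.state == "w"
    t.set_state("c", max(t.lsn, apply_lsn))


def sync_worker_catch_up(t: TableSync, sync_pos: int) -> None:
    """Step 7: replay changes up to the target LSN, then mark the table 's'."""
    assert t.state == "c"
    reached = max(sync_pos, t.lsn)  # models applying changes until the target
    t.set_state("s", reached)


def apply_worker_after_commit(t: TableSync, commit_end_lsn: int) -> None:
    """Step 8.2.4: promote 's' to 'r' once a commit at or past the sync point is seen."""
    if t.state == "s" and commit_end_lsn >= t.lsn:
        t.set_state("r", commit_end_lsn)


t = TableSync()
sync_worker_copy(t, snapshot_lsn=100)
apply_worker_signal_catchup(t, apply_lsn=130)
sync_worker_catch_up(t, sync_pos=100)
apply_worker_after_commit(t, commit_end_lsn=150)
print(" -> ".join(t.history))
```

In the real system the two workers run concurrently and coordinate through shared memory and the catalog; the model only shows the order in which the states are visited.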

2. Ongoing logical replication after table synchronization

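Once a subscribed table has reached a synchronized state (state code 's' or 'r'), every change made on the publisher is delivered to the subscriber as messages.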

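The subscriber's apply worker applies the changes to each table in the order in which the messages are received (that is, in publisher commit order) and reports its apply position back to the publisher, which can be used to monitor replication lag.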
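
The reported positions are LSNs, which PostgreSQL prints as two hexadecimal numbers separated by a slash. A minimal Python sketch of turning two such values into a lag in bytes follows. The helper names are hypothetical, and where the two LSNs come from (for example the publisher's current WAL position and the position confirmed by the subscriber) is left to the caller.

```python
def parse_lsn(text: str) -> int:
    """Convert an LSN such as '0/3000060' into a 64-bit integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_bytes(publisher_lsn: str, confirmed_lsn: str) -> int:
    """Bytes of WAL the subscriber has not yet confirmed."""
    return parse_lsn(publisher_lsn) - parse_lsn(confirmed_lsn)


print(lag_bytes("0/3000060", "0/3000000"))  # 96
```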

The packets the publisher sends to the subscriber when an update occurs were confirmed by debugging.

2.1 Inserting into a subscribed table

insert into tbx3 values(100);
           

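When a subscribed table is modified on the publisher, the publisher sends the following messages to the subscriber, in order, when the transaction commits:

  • B (BEGIN)
  • R (RELATION)
  • I (INSERT)
  • C (COMMIT)

    Updates remote_lsn and local_lsn in the replication origin view pg_replication_origin_status; this position corresponds to the most recently committed transaction on the subscribed tables.
  • k (KEEPALIVE)
  • k (KEEPALIVE)

    The two keepalive messages update the positions in the statistics views:

    • Publisher: write_lsn, flush_lsn and replay_lsn in pg_stat_replication
    • Publisher: confirmed_flush_lsn in pg_get_replication_slots()
    • Subscriber: latest_end_lsn in pg_stat_subscription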

2.2 Inserting into a non-subscribed table

insert into tbx10 values(100);
           

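When the publisher makes a change that is unrelated to any subscribed table, it sends the following messages to the subscriber, in order, when the transaction commits:

  • No actual transaction is produced on the subscriber, and pg_replication_origin_status is not updated

  • k (KEEPALIVE)

    The two 'k' keepalive messages update the positions in the statistics views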
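
The difference between the two cases can be illustrated with a simplified model of the apply worker's bookkeeping. In this Python sketch the function name, the tuple layout and the integer LSNs are all hypothetical. It also assumes, following the code comments in section 5, that B and C are still sent for a transaction that touches no subscribed table and that only a row change opens a local transaction.

```python
def apply_stream(messages):
    """Process (type, lsn) pairs; return (origin_remote_lsn, latest_end_lsn)."""
    origin_remote_lsn = None  # models pg_replication_origin_status.remote_lsn
    latest_end_lsn = None     # models pg_stat_subscription.latest_end_lsn
    in_local_txn = False
    for kind, lsn in messages:
        if kind in ("I", "U", "D"):
            in_local_txn = True          # a row change opens a local transaction
        elif kind == "C":
            if in_local_txn:
                origin_remote_lsn = lsn  # the origin advances only on a real local commit
            in_local_txn = False
        elif kind == "k":
            latest_end_lsn = lsn         # keepalives only refresh the statistics
    return origin_remote_lsn, latest_end_lsn


subscribed = [("B", 200), ("R", 200), ("I", 200), ("C", 200), ("k", 200), ("k", 200)]
unsubscribed = [("B", 300), ("C", 300), ("k", 300), ("k", 300)]
print(apply_stream(subscribed))
print(apply_stream(unsubscribed))
```

In the second stream the statistics position still moves forward while the replication origin stays untouched, which matches the behavior described in 2.2.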

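3. Error handling

3.1 sync worker

  1. SQL error (for example a primary key conflict): the worker process exits abnormally, and the apply worker then starts a new sync worker to retry. Until the error is resolved, the retry happens every 5 seconds.
  2. The table is locked: wait
  3. The row to update or delete does not exist: execution continues normally, no error is detected, and nothing appears in the log at the default level (only a DEBUG1-level message is emitted).

3.2 apply worker

  1. SQL error (for example a primary key conflict): the worker process exits abnormally, and the logical replication launcher then starts a new apply worker to retry. Until the error is resolved, the retry happens every 5 seconds.

Example error log: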

2018-07-28 20:11:56.018 UTC [470] ERROR:  duplicate key value violates unique constraint "tbx3_pkey"
2018-07-28 20:11:56.018 UTC [470] DETAIL:  Key (id)=(2) already exists.
2018-07-28 20:11:56.022 UTC [47] LOG:  worker process: logical replication worker for subscription 74283 (PID 470) exited with exit code 1
2018-07-28 20:12:01.029 UTC [471] LOG:  logical replication apply worker for subscription "sub_shard" has started
2018-07-28 20:12:01.049 UTC [471] ERROR:  duplicate key value violates unique constraint "tbx3_pkey"
2018-07-28 20:12:01.049 UTC [471] DETAIL:  Key (id)=(2) already exists.
2018-07-28 20:12:01.058 UTC [47] LOG:  worker process: logical replication worker for subscription 74283 (PID 471) exited with exit code 1
2018-07-28 20:12:06.070 UTC [472] LOG:  logical replication apply worker for subscription "sub_shard" has started
2018-07-28 20:12:06.089 UTC [472] ERROR:  duplicate key value violates unique constraint "tbx3_pkey"
2018-07-28 20:12:06.089 UTC [472] DETAIL:  Key (id)=(2) already exists.
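
The restart pattern visible in the log (fail, wait about 5 seconds, start a new worker, fail again) can be sketched as a fixed-interval retry loop. This Python model is only an illustration: the function names are hypothetical, the attempt limit exists solely to keep the example finite, and the 5-second spacing is assumed here to correspond to the default of wal_retrieve_retry_interval.

```python
import time


def run_with_restart(start_worker, retry_interval=5.0, max_attempts=3, sleep=time.sleep):
    """Restart a failing worker at a fixed interval; return the attempts made."""
    attempts = 0
    while attempts < max_attempts:
        attempts += 1
        try:
            start_worker()
            return attempts
        except Exception as exc:
            print(f"attempt {attempts} failed: {exc}")
            if attempts < max_attempts:
                sleep(retry_interval)
    return attempts


conflict = {"present": True}
waits = []


def apply_once():
    """Stand-in for an apply worker that hits a duplicate key."""
    if conflict["present"]:
        raise RuntimeError('duplicate key value violates unique constraint "tbx3_pkey"')


def fake_sleep(seconds):
    """Record the wait instead of sleeping; clear the conflict after two waits."""
    waits.append(seconds)
    if len(waits) == 2:
        conflict["present"] = False  # models an operator removing the conflicting row


attempts = run_with_restart(apply_once, max_attempts=5, sleep=fake_sleep)
print(attempts, waits)
```

As in the log, the worker keeps failing on the same row until the conflict is removed on the subscriber; nothing in the retry itself resolves it.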
           

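4. Limitations

  1. Database schema and DDL commands are not replicated.
  2. Sequence data is not replicated. The values of sequence-backed columns (serial / GENERATED ... AS IDENTITY) are replicated, but the sequence's own value is not advanced.
  3. TRUNCATE commands are not replicated.
  4. Large objects are not replicated.
  5. Replication is only possible from base tables to base tables. That is, the tables on both the publisher and the subscriber must be ordinary tables, not views, materialized views, partition root tables or foreign tables. Subscribing to the parent of an inheritance hierarchy replicates only the changes made to the parent table itself.
  6. Only a subset of trigger functionality is supported.
  7. Bidirectional replication is not supported; it would cause a WAL loop.
  8. Creating a subscription between two databases on the same instance is not supported.
  9. Creating a subscription on a standby is not supported.
  10. If a replicated table has no suitable REPLICA IDENTITY, UPDATE/DELETE on the publisher raises an error.

Notes

  1. CREATE SUBSCRIPTION has to wait for the transactions currently running on the publisher to finish.
  2. During the initial data synchronization the sync worker opens a "REPEATABLE READ" transaction; dead tuples produced in the meantime cannot be reclaimed.
  3. While a subscription is active, the WAL generated by any transaction on the publisher can be recycled only after that transaction has ended.
  4. When an UPDATE/DELETE on the subscriber finds no matching row, no error is reported.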

5. Code walkthrough of the table synchronization phase

Publisher backend process

CREATE PUBLICATION
  CreatePublication()
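       CatalogTupleInsert(rel, tup);  // insert this publication into the pg_publication catalog
       PublicationAddTables(puboid, rels, true, NULL);
         publication_add_relation()
           check_publication_add_relation(); // check the relation type and raise an error for unsupported ones; only ordinary tables ('r') that are neither unlogged nor temporary are supported
           CatalogTupleInsert(rel, tup);     // insert the publication-to-table mapping into the pg_publication_rel catalog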
           

Subscriber backend process

CREATE SUBSCRIPTION
   CreateSubscription()
       CatalogTupleInsert(rel, tup);  // insert this subscription into the pg_subscription catalog
       replorigin_create(originname); // insert the replication origin for this subscription into the pg_replication_origin catalog
       foreach(lc, tables)            // set pg_subscription_rel.srsubstate for every table
         table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; // ★★★1 if data is to be copied, set pg_subscription_rel.srsubstate='i' for every table
         SetSubscriptionRelState(subid, relid, table_state,InvalidXLogRecPtr, false); 
       walrcv_create_slot(wrconn, slotname, false,CRS_NOEXPORT_SNAPSHOT, &lsn); // create the subscription's non-temporary replication slot on the publisher
       ApplyLauncherWakeupAtCommit(); // wake up the logical replication launcher process
           

Subscriber logical replication launcher process

ApplyLauncherMain()
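  sublist = get_subscription_list(); // fetch the list of subscriptions from pg_subscription
  foreach(lc, sublist)
    logicalrep_worker_launch(..., InvalidOid); // start an apply worker for each enabled subscription that has none yet; if the number of workers already exceeds max_logical_replication_workers (default 4), the launch fails and a warning is logged
      RegisterDynamicBackgroundWorker(&bgw, &bgw_handle); // register the background worker; its entry point is "ApplyWorkerMain"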
           

Subscriber logical apply worker process

ApplyWorkerMain
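  replorigin_session_setup(originid); // look up the replication origin in shared memory and attach to it, using a new entry if none exists; the origin is named pg_${subscription OID}
  origin_startpos = replorigin_session_get_progress(false); // fetch remote_lsn of the replication origin
  walrcv_connect(MySubscription->conninfo, true, MySubscription->name,&err); // connect to the publisher
  walrcv_startstreaming(wrconn, &options); // start streaming replication
  LogicalRepApplyLoop(origin_startpos);   // main loop of the apply process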
    for(;;)
      len = walrcv_receive(wrconn, &buf, &fd);
      if (c == 'w')  // handling of a 'w' message
          UpdateWorkerStats(last_received, send_time, false); // update worker statistics (last_lsn, last_send_time, last_recv_time)
          apply_dispatch(&s); // dispatch the logical replication command
            switch (action)
              case 'B': /* BEGIN */
                 apply_handle_begin(s);
              case 'C': /* COMMIT */
                 apply_handle_commit(s);
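                   if (IsTransactionState() && !am_tablesync_worker()) // when a publisher transaction touches no subscribed table, B and C are still sent; the worker is then not in a transaction and the steps below are skipped
                     replorigin_session_origin_lsn = commit_data.end_lsn;  // update the replication origin state so that the correct start position can be found if the apply worker crashes
                     replorigin_session_origin_timestamp = commit_data.committime;
                     CommitTransactionCommand(); // commit the transaction
                     pgstat_report_stat(false); // update statistics
                   process_syncing_tables(commit_data.end_lsn); // for tables still being synchronized, coordinate the sync state between the sync worker and the apply worker
                     process_syncing_tables_for_apply(current_lsn);
                       GetSubscriptionNotReadyRelations(MySubscription->oid); // fetch all tables of the subscription that are not in ready state from pg_subscription_rel
                       foreach(lc, table_states) // process every table that is not ready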
                        if (rstate->state == SUBREL_STATE_SYNCDONE)
                        {
                            if (current_lsn >= rstate->lsn)
                            {
                                rstate->state = SUBREL_STATE_READY;                  // after the first transaction is processed, go from syncdone to ready; that transaction need not involve this table
                                rstate->lsn = current_lsn;
                                SetSubscriptionRelState(MyLogicalRepWorker->subid,    // update pg_subscription_rel
                                                        rstate->relid, rstate->state,
                                                        rstate->lsn, true);
                            }
                          }
                        else
                        {
                            syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                                                rstate->relid, false);
                            if (syncworker)
                            {
                                /* Found one, update our copy of its state */
                                rstate->state = syncworker->relstate;
                                rstate->lsn = syncworker->relstate_lsn;
                                if (rstate->state == SUBREL_STATE_SYNCWAIT)
                                {
                                    /*
                                     * Sync worker is waiting for apply.  Tell sync worker it
                                     * can catchup now.
                                     */
                                    syncworker->relstate = SUBREL_STATE_CATCHUP;  // ★★★3 SUBREL_STATE_SYNCWAIT -> SUBREL_STATE_CATCHUP
                                    syncworker->relstate_lsn =
                                        Max(syncworker->relstate_lsn, current_lsn);
                                }
                
                                /* If we told worker to catch up, wait for it. */
                                if (rstate->state == SUBREL_STATE_SYNCWAIT)
                                {
                                    /* Signal the sync worker, as it may be waiting for us. */
                                    if (syncworker->proc)
                                        logicalrep_worker_wakeup_ptr(syncworker);
                
                                    wait_for_relation_state_change(rstate->relid,
                                                                   SUBREL_STATE_SYNCDONE); // wait for the sync worker to set the table's sync state to SUBREL_STATE_SYNCDONE
                                }
                            }
                            else
                            {
                                /*
                                 * If there is no sync worker for this table yet, count
                                 * running sync workers for this subscription, while we have
                                 * the lock.
                                 */
                                        logicalrep_worker_launch(MyLogicalRepWorker->dbid,   // if this table has no sync worker and the number of sync workers is below max_sync_workers_per_subscription, start one
                                                                 MySubscription->oid,
                                                                 MySubscription->name,
                                                                 MyLogicalRepWorker->userid,
                                                                 rstate->relid);
                            }
            else if (c == 'k') // handling of a 'k' message
              send_feedback(last_received, reply_requested, false); // send feedback to the publisher
              UpdateWorkerStats(last_received, timestamp, true);    // update worker statistics (last_lsn, last_send_time, last_recv_time, reply_lsn, send_time)
          case 'I': /* INSERT */ // another case of the switch in apply_dispatch()
             apply_handle_insert(s);
               relid = logicalrep_read_insert(s, &newtup);
               if (!should_apply_changes_for_rel(rel))return; 
                    if (am_tablesync_worker())
                        return MyLogicalRepWorker->relid == rel->localreloid; // a sync worker applies changes only to the table it is synchronizing
                    else
                        return (rel->state == SUBREL_STATE_READY ||           // the apply worker: in state SUBREL_STATE_SYNCDONE, apply only WAL after the syncdone position
                                (rel->state == SUBREL_STATE_SYNCDONE &&
                                 rel->statelsn <= remote_final_lsn));
               ExecSimpleRelationInsert(estate, remoteslot);  // insert the row
                 ExecBRInsertTriggers(estate, resultRelInfo, slot); // fire BEFORE ROW INSERT triggers
                 simple_heap_insert(rel, tuple);
                 ExecARInsertTriggers(estate, resultRelInfo, tuple,recheckIndexes, NULL); // fire AFTER ROW INSERT triggers
               AfterTriggerEndQuery(estate);  // process queued AFTER triggers
         
          ...
    send_feedback(last_received, false, false); // no new message to process: send position feedback to the publisher
    process_syncing_tables(last_received); // if not inside a transaction block, synchronize the table states
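
The filter applied by should_apply_changes_for_rel() in the trace above can be restated as a small standalone function. The Python version below is a paraphrase for illustration only: the struct fields are flattened into hypothetical parameters, LSNs are plain integers, and the state codes 's' and 'r' stand for SUBREL_STATE_SYNCDONE and SUBREL_STATE_READY.

```python
def should_apply_changes_for_rel(am_tablesync_worker, my_relid, rel_id,
                                 rel_state, rel_state_lsn, remote_final_lsn):
    """Decide whether a change for table rel_id should be applied by this worker."""
    if am_tablesync_worker:
        # A sync worker applies changes only to the table it is synchronizing.
        return my_relid == rel_id
    # The apply worker applies changes to ready tables, and to syncdone tables
    # only when the transaction ends at or after the syncdone position.
    return rel_state == "r" or (rel_state == "s" and rel_state_lsn <= remote_final_lsn)


# Apply worker, table still copying: the change is skipped.
print(should_apply_changes_for_rel(False, None, 16384, "d", 0, 500))
# Apply worker, table syncdone at LSN 100, transaction ends at LSN 120: applied.
print(should_apply_changes_for_rel(False, None, 16384, "s", 100, 120))
```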
           

Subscriber logical sync worker process

ApplyWorkerMain() // the apply worker and the sync worker share the same entry point
  LogicalRepSyncTableStart(&origin_startpos);
    GetSubscriptionRelState(MyLogicalRepWorker->subid,MyLogicalRepWorker->relid,&relstate_lsn, true); // fetch the table's sync state and its LSN from pg_subscription_rel
    walrcv_connect(MySubscription->conninfo, true, slotname, &err);
    switch (MyLogicalRepWorker->relstate)
    {
        case SUBREL_STATE_INIT:
        case SUBREL_STATE_DATASYNC:
            {
                MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
                MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
                SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                        MyLogicalRepWorker->relid,
                                        MyLogicalRepWorker->relstate,
                                        MyLogicalRepWorker->relstate_lsn,
                                        true);
                res = walrcv_exec(wrconn,                              // begin a transaction
                                  "BEGIN READ ONLY ISOLATION LEVEL "
                                  "REPEATABLE READ", 0, NULL);
                walrcv_create_slot(wrconn, slotname, true,             // create a temporary replication slot using the snapshot and record the snapshot position
                                   CRS_USE_SNAPSHOT, origin_startpos);
                copy_table(rel);                                       // copy the table data
                walrcv_exec(wrconn, "COMMIT", 0, NULL);
                MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;  // ★★★2 set the table's sync state to SUBREL_STATE_SYNCWAIT
                MyLogicalRepWorker->relstate_lsn = *origin_startpos;
                wait_for_worker_state_change(SUBREL_STATE_CATCHUP);    // wait for the apply worker to change the state to SUBREL_STATE_CATCHUP
                if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn) // if the sync worker is behind the apply worker, this block is skipped and the sync worker goes on to apply WAL
                {
                    /*
                     * Update the new state in catalog.  No need to bother
                     * with the shmem state as we are exiting for good.
                     */
                    SetSubscriptionRelState(MyLogicalRepWorker->subid,    // ★★★4 change the sync state from SUBREL_STATE_CATCHUP to SUBREL_STATE_SYNCDONE and exit
                                            MyLogicalRepWorker->relid,
                                            SUBREL_STATE_SYNCDONE,
                                            *origin_startpos,
                                            true);
                    finish_sync_worker();
                }
                break;
            }
        case SUBREL_STATE_SYNCDONE:
        case SUBREL_STATE_READY:
        case SUBREL_STATE_UNKNOWN:
            finish_sync_worker();
            break;    
    }
  options.startpoint = origin_startpos;
  walrcv_startstreaming(wrconn, &options); // start streaming replication, using the sync snapshot position as the start of the stream
  LogicalRepApplyLoop(origin_startpos);   // main loop of the apply process
    for(;;)
      len = walrcv_receive(wrconn, &buf, &fd);
      UpdateWorkerStats(last_received, send_time, false); // update worker statistics (last_lsn, last_send_time, last_recv_time)
      apply_dispatch(&s); // dispatch the logical replication command
        switch (action)
          case 'B': /* BEGIN */
             apply_handle_begin(s);
          case 'C': /* COMMIT */
             apply_handle_commit(s);
               process_syncing_tables(commit_data.end_lsn); // for tables still being synchronized, coordinate the sync state between the sync worker and the apply worker
                 process_syncing_tables_for_sync(current_lsn);
                    if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
                        current_lsn >= MyLogicalRepWorker->relstate_lsn)
                    {
                        TimeLineID    tli;
                
                        MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; // ★★★4 change the sync state from SUBREL_STATE_CATCHUP to SUBREL_STATE_SYNCDONE
                        MyLogicalRepWorker->relstate_lsn = current_lsn;
                
                        SpinLockRelease(&MyLogicalRepWorker->relmutex);
                
                        SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                                MyLogicalRepWorker->relid,
                                                MyLogicalRepWorker->relstate,
                                                MyLogicalRepWorker->relstate_lsn,
                                                true);
                
                        walrcv_endstreaming(wrconn, &tli);
                        finish_sync_worker();
                    }
          case 'I': /* INSERT */
             apply_handle_insert(s);
           

6.1 References