天天看點

Postgres-xl GTM(全局事務管理器 Global Transaction Manager)GTM Master & Standby 啟動流程

GTM Master & Standby 啟動流程

Postgres-xc 啟動流程

Postgres-xl GTM(全局事務管理器 Global Transaction Manager)GTM Master & Standby 啟動流程
Postgres-xl GTM(全局事務管理器 Global Transaction Manager)GTM Master & Standby 啟動流程

Postgres-xl 啟動流程

GTM Master啟動流程:

main()

--> InitializeGTMOptions() --> InitGTMProcess() --> Parse command line options and load configuration file --> BaseInit()

--> GTM_RestoreStart(ctlf, &restoreContext) --> GTM_RestoreTxnInfo(ctlf, next_gxid, &restoreContext, force_xid) --> GTM_RestoreSeqInfo(ctlf, &restoreContext)

--> GTM_SetNeedBackup() --> GTM_WriteRestorePoint()

--> Establish input sockets --> Setup signal handlers

--> ServerLoop()

GTM Standby啟動流程:

main()

--> InitializeGTMOptions() --> InitGTMProcess() --> Parse command line options and load configuration file --> BaseInit()

--Recovery_IsStandby()--> Recovery_StandbySetConnInfo(active_addr, active_port) --Recovery_IsStandby()--> gtm_standby_start_startup() --Recovery_IsStandby()--> gtm_standby_begin_backup() --> gtm_standby_restore_next_gxid() --> gtm_standby_restore_gxid() --> gtm_standby_restore_sequence()

--> GTM_SetNeedBackup() --> GTM_WriteRestorePoint()

--Recovery_IsStandby()--> gtm_standby_register_self(NodeName, GTMPortNumber, GTMDataDir) --Recovery_IsStandby()--> gtm_standby_restore_node()

--> Establish input sockets --> Setup signal handlers

--Recovery_IsStandby()--> gtm_standby_activate_self() --> gtm_standby_end_backup() --> gtm_standby_finish_startup()

--> ServerLoop()

共有函數分析1

InitGTMProcess函數調用MainThreadInit函數建立GTM主線程,并向全局所有子線程資訊結構清單添加,初始化互斥鎖變量control_lock。MainThreadInit函數初始化全局所有子線程資訊結構清單鎖和全局backup鎖gtm_bkup_lock以及用戶端client identifier。

/*
 * Bootstrap the GTM process: build the main thread's info record, record the
 * main thread id, set up memory contexts, add the record to the global thread
 * array and initialise control_lock.
 */
static void InitGTMProcess() {
  GTM_ThreadInfo *main_thrinfo;

  main_thrinfo = MainThreadInit();
  MyThreadID = pthread_self();
  MemoryContextInit();

  /* Memory contexts are available from here on, so the record can join the global array. */
  if (GTM_ThreadAdd(main_thrinfo) == -1) {
    /* The failure is only reported; start-up carries on. */
    fprintf(stderr, "GTM_ThreadAdd for main thread failed: %d", errno);
    fflush(stdout);
    fflush(stderr);
  }

  GTM_MutexLockInit(&control_lock);
}

/*
 * Create and return the GTM_ThreadInfo record for the main thread.
 *
 * Also creates the thread-info key, initialises the lock on the global thread
 * list and the backup lock, and seeds the client-identifier counters.
 * The record is malloc'ed and zeroed, flagged as the main thread, installed
 * via SetMyThreadInfo(), and returned with its thr_lock held in write mode.
 * Exits the process if allocation or SetMyThreadInfo() fails.
 */
static GTM_ThreadInfo *MainThreadInit(){
  GTM_ThreadInfo *main_info;

  pthread_key_create(&threadinfo_key, NULL);

  /* Locks guarding the global thread list and the backup state. */
  GTM_RWLockInit(&GTMThreads->gt_lock);
  GTM_RWLockInit(&gtm_bkup_lock);

  /* First client identifier handed out once a connection is established. */
  GTMThreads->gt_starting_client_id = 0;
  GTMThreads->gt_next_client_id = 1;

  /* Memory context management is not set up yet, so plain malloc is required. */
  main_info = (GTM_ThreadInfo *)malloc(sizeof (GTM_ThreadInfo));
  if (!main_info){
    fprintf(stderr, "malloc failed: %d", errno);
    fflush(stdout);
    fflush(stderr);
    exit(1);
  }
  memset(main_info, 0, sizeof(GTM_ThreadInfo));

  main_info->is_main_thread = true;
  GTM_RWLockInit(&main_info->thr_lock);

  if (SetMyThreadInfo(main_info) != 0){
    fprintf(stderr, "SetMyThreadInfo failed: %d", errno);
    fflush(stdout);
    fflush(stderr);
    exit(1);
  }

  /* Only valid once the thread-info has been installed above. */
  GTM_RWLockAcquire(&main_info->thr_lock, GTM_LOCKMODE_WRITE);
  TopMostThreadID = pthread_self();

  return main_info;
}

BaseInit函數首先初始化全局standby鎖,處理資料檔案路徑,初始化TxnManager、SeqManager、NodeManager。

/*
 * Basic GTM initialisation shared by master and standby.
 *
 * Initialises the standby lock, validates and enters the data directory,
 * creates its lock file, builds the control/log file paths, records the node
 * register file location, opens the debug file and initialises the
 * transaction, sequence and node managers.
 * Exits the process if the log-file path buffer cannot be allocated.
 */
static void BaseInit() {
  /* Initialize standby lock before doing anything else */
  Recovery_InitStandbyLock();
  checkDataDir();
  SetDataDir();
  ChangeToDataDir();
  CreateDataDirLockFile();
  /* NOTE(review): these two writes are unbounded; the buffer sizes are not
   * visible here, so they are left as-is — confirm they can hold
   * GTMDataDir plus the file name. */
  sprintf(GTMControlFile, "%s/%s", GTMDataDir, GTM_CONTROL_FILE);
  sprintf(GTMControlFileTmp, "%s/%s", GTMDataDir, GTM_CONTROL_FILE_TMP);
  if (GTMLogFile == NULL){
    GTMLogFile = (char *) malloc(GTM_MAX_PATH);
    if (GTMLogFile == NULL){
      /* Same failure handling as MainThreadInit(): report and give up. */
      fprintf(stderr, "malloc failed: %d", errno);fflush(stdout);fflush(stderr);exit(1);
    }
    /* Bounded by the size just allocated, so a long data directory cannot overflow it. */
    snprintf(GTMLogFile, GTM_MAX_PATH, "%s/%s", GTMDataDir, GTM_LOG_FILE);
  }
  /* Save Node Register File in register.c */
  Recovery_SaveRegisterFileName(GTMDataDir);
  DebugFileOpen();
  GTM_InitTxnManager();
  GTM_InitSeqManager();
  GTM_InitNodeManager();
}

GTM Standby特有函數1

Recovery_StandbySetConnInfo擷取全局standby鎖,初始化GTM_ActiveAddress和GTM_ActivePort變量,釋放全局standby鎖。gtm_standby_start_startup函數利用gtm_standby_connectToActiveGTM函數連接配接GTM Master。

/*
 * Record the address and port of the active GTM.
 *
 * addr is duplicated with strdup(); both globals are assigned while
 * StandbyLock is held in write mode.
 */
void Recovery_StandbySetConnInfo(const char *addr, int port){
  /* The copy does not touch shared state, so it is made before taking the lock. */
  char *addr_copy = strdup(addr);

  GTM_RWLockAcquire(&StandbyLock, GTM_LOCKMODE_WRITE);
  GTM_ActivePort = port;
  GTM_ActiveAddress = addr_copy;
  GTM_RWLockRelease(&StandbyLock);
}

/*
 * Open the start-up connection to the active GTM and store it in
 * GTM_ActiveConn.
 *
 * Returns 1 when a connection was obtained, 0 otherwise.
 */
int gtm_standby_start_startup(void) {
  GTM_ActiveConn = gtm_standby_connectToActiveGTM();

  if (GTM_ActiveConn != NULL){
    elog(LOG, "Connection established to the GTM active.");
    return 1;
  }

  elog(DEBUG3, "Error in connection");
  return 0;
}

gtm_standby_begin_backup函數調用set_begin_end_backup(GTM_ActiveConn, true),該函數向GTM Master發送開始備份指令(‘C’+MSG_BEGIN_BACKUP)。

/*
 * Send the begin-backup request over GTM_ActiveConn.
 *
 * Returns 1 when set_begin_end_backup() yields 0, and 0 for any other result.
 */
int gtm_standby_begin_backup(void) {
  if (set_begin_end_backup(GTM_ActiveConn, true) != 0){
    return 0;
  }
  return 1;
}

/*
 * Backup to Standby.
 *
 * Sends a 'C' message carrying MSG_BEGIN_BACKUP (begin == true) or
 * MSG_END_BACKUP (begin == false) on conn, then waits up to
 * CLIENT_GTM_TIMEOUT seconds for the reply.
 *
 * Returns the reply's gr_status, or -1 on any send/receive failure, in which
 * case conn->result is marked GTM_RESULT_COMM_ERROR.
 */
int set_begin_end_backup(GTM_Conn *conn, bool begin) {
  GTM_Result *reply = NULL;
  time_t deadline;

  /* Build and push out the request; the first failing step aborts the rest. */
  if (gtmpqPutMsgStart('C', true, conn) ||
      gtmpqPutInt(begin ? MSG_BEGIN_BACKUP : MSG_END_BACKUP, sizeof(GTM_MessageType), conn) ||
      gtmpqPutMsgEnd(conn) ||
      gtmpqFlush(conn)){
    goto comm_failed;
  }

  deadline = time(NULL) + CLIENT_GTM_TIMEOUT;
  if (gtmpqWaitTimed(true, false, conn, deadline) || gtmpqReadData(conn) < 0){
    goto comm_failed;
  }

  reply = GTMPQgetResult(conn);
  if (reply == NULL){
    goto comm_failed;
  }
  return reply->gr_status;

comm_failed:
  /* Make sure there is a result object to carry the error status. */
  conn->result = makeEmptyResultIfIsNull(conn->result);
  conn->result->gr_status = GTM_RESULT_COMM_ERROR;
  return -1;
}

gtm_standby_restore_next_gxid函數調用get_next_gxid(GTM_ActiveConn)函數,該函數向GTM Master發送擷取next全局事務号(‘C’+MSG_TXN_GET_NEXT_GXID)。

/*
 * Fetch the next GXID over GTM_ActiveConn and pass it to
 * GTM_RestoreTxnInfo() with no control file, no restore context and the
 * force flag set.
 *
 * Always returns 1.
 */
int gtm_standby_restore_next_gxid(void) {
  GlobalTransactionId fetched_gxid = get_next_gxid(GTM_ActiveConn);

  GTM_RestoreTxnInfo(NULL, fetched_gxid, NULL, true);
  elog(DEBUG1, "Restoring the next GXID done.");
  return 1;
}

/*
 * Ask the GTM on the other end of conn for its next GXID.
 *
 * Sends a 'C' message carrying MSG_TXN_GET_NEXT_GXID and waits up to
 * CLIENT_GTM_TIMEOUT seconds for the reply.
 *
 * Returns the grd_next_gxid field of the reply. On any send/receive failure
 * returns InvalidGlobalTransactionId and marks conn->result with
 * GTM_RESULT_COMM_ERROR.
 */
GlobalTransactionId get_next_gxid(GTM_Conn *conn) {
  GTM_Result *res = NULL;
  GlobalTransactionId next_gxid;
  time_t finish_time;
   /* Start the message. */
  if (gtmpqPutMsgStart('C', true, conn) ||
      gtmpqPutInt(MSG_TXN_GET_NEXT_GXID, sizeof (GTM_MessageType), conn))
    goto send_failed;
  /* Finish the message. */
  if (gtmpqPutMsgEnd(conn))
    goto send_failed;
  /* Flush to ensure backend gets it. */
  if (gtmpqFlush(conn))
    goto send_failed;
  finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
  if (gtmpqWaitTimed(true, false, conn, finish_time) ||
    gtmpqReadData(conn) < 0)
    goto receive_failed;
  if ((res = GTMPQgetResult(conn)) == NULL)
    goto receive_failed;
  fprintf(stderr, "GTMPQgetResult() done.\n");
  fflush(stderr);
  next_gxid = res->gr_resdata.grd_next_gxid;
  if (res->gr_status == GTM_RESULT_OK)
    Assert(res->gr_type == TXN_GET_NEXT_GXID_RESULT);
  /* FIXME: should be a number of gxids */
  return next_gxid;
receive_failed:
send_failed:
  /*
   * conn->result may still be NULL when the failure happens before any reply
   * was parsed; allocate an empty one first, as set_begin_end_backup() does,
   * instead of dereferencing a NULL pointer.
   */
  conn->result = makeEmptyResultIfIsNull(conn->result);
  conn->result->gr_status = GTM_RESULT_COMM_ERROR;
  return InvalidGlobalTransactionId;
}

gtm_standby_restore_gxid函數是GTM Standby恢複gxid的函數,GTM_RestoreTxnInfo函數則負責将GTM Standby從GTM Master擷取的Txn資訊存儲下來。gtm_standby_restore_sequence函數是GTM Standby從GTM Master恢複sequence的函數。

int gtm_standby_restore_gxid(void) {
  int num_txn;
  GTM_Transactions txn;
  int i;
  /* Restore gxid data. */
  num_txn = get_txn_gxid_list(GTM_ActiveConn, &txn);
  GTM_RWLockAcquire(&GTMTransactions.gt_XidGenLock, GTM_LOCKMODE_WRITE);
  GTM_RWLockAcquire(&GTMTransactions.gt_TransArrayLock, GTM_LOCKMODE_WRITE);
  GTMTransactions.gt_txn_count = txn.gt_txn_count;
  GTMTransactions.gt_gtm_state = txn.gt_gtm_state;
  GTMTransactions.gt_nextXid = txn.gt_nextXid;
  GTMTransactions.gt_oldestXid = txn.gt_oldestXid;
  GTMTransactions.gt_xidVacLimit = txn.gt_xidVacLimit;
  GTMTransactions.gt_xidWarnLimit = txn.gt_xidWarnLimit;
  GTMTransactions.gt_xidStopLimit = txn.gt_xidStopLimit;
  GTMTransactions.gt_xidWrapLimit = txn.gt_xidWrapLimit;
  GTMTransactions.gt_latestCompletedXid = txn.gt_latestCompletedXid;
  GTMTransactions.gt_snapid = txn.gt_snapid;
  GTMTransactions.gt_recent_global_xmin = txn.gt_recent_global_xmin;
  GTMTransactions.gt_lastslot = txn.gt_lastslot;
  for (i = 0; i < num_txn; i++){
    int handle = txn.gt_transactions_array[i].gti_handle;
    GTMTransactions.gt_transactions_array[handle].gti_handle = txn.gt_transactions_array[i].gti_handle;
    GTMTransactions.gt_transactions_array[handle].gti_client_id = txn.gt_transactions_array[i].gti_client_id;
    GTMTransactions.gt_transactions_array[handle].gti_in_use = txn.gt_transactions_array[i].gti_in_use;
    GTMTransactions.gt_transactions_array[handle].gti_gxid = txn.gt_transactions_array[i].gti_gxid;
    GTMTransactions.gt_transactions_array[handle].gti_state = txn.gt_transactions_array[i].gti_state;
    GTMTransactions.gt_transactions_array[handle].gti_xmin = txn.gt_transactions_array[i].gti_xmin;
    GTMTransactions.gt_transactions_array[handle].gti_isolevel = txn.gt_transactions_array[i].gti_isolevel;
    GTMTransactions.gt_transactions_array[handle].gti_readonly = txn.gt_transactions_array[i].gti_readonly;
    GTMTransactions.gt_transactions_array[handle].gti_proxy_client_id = txn.gt_transactions_array[i].gti_proxy_client_id;
    if (txn.gt_transactions_array[i].nodestring == NULL )
      GTMTransactions.gt_transactions_array[handle].nodestring = NULL;
    else
      GTMTransactions.gt_transactions_array[handle].nodestring = txn.gt_transactions_array[i].nodestring;
    /* GID */
    if (txn.gt_transactions_array[i].gti_gid == NULL )
      GTMTransactions.gt_transactions_array[handle].gti_gid = NULL;
    else
      GTMTransactions.gt_transactions_array[handle].gti_gid = txn.gt_transactions_array[i].gti_gid;
    /* copy GTM_SnapshotData */
    GTMTransactions.gt_transactions_array[handle].gti_current_snapshot.sn_xmin = txn.gt_transactions_array[i].gti_current_snapshot.sn_xmin;
    GTMTransactions.gt_transactions_array[handle].gti_current_snapshot.sn_xmax = txn.gt_transactions_array[i].gti_current_snapshot.sn_xmax;
    GTMTransactions.gt_transactions_array[handle].gti_current_snapshot.sn_xcnt = txn.gt_transactions_array[i].gti_current_snapshot.sn_xcnt;
    GTMTransactions.gt_transactions_array[handle].gti_current_snapshot.sn_xip = txn.gt_transactions_array[i].gti_current_snapshot.sn_xip;
    /* end of copying GTM_SnapshotData */
    GTMTransactions.gt_transactions_array[handle].gti_snapshot_set = txn.gt_transactions_array[i].gti_snapshot_set;
    GTMTransactions.gt_transactions_array[handle].gti_vacuum = txn.gt_transactions_array[i].gti_vacuum;
    /* Is this correct? Is GTM_TXN_COMMITTED transaction categorized as "open"? */
    if (GTMTransactions.gt_transactions_array[handle].gti_state != GTM_TXN_ABORTED) {
      GTMTransactions.gt_open_transactions = gtm_lappend(GTMTransactions.gt_open_transactions, &GTMTransactions.gt_transactions_array[handle]);
    }
  }
  dump_transactions_elog(&GTMTransactions, num_txn);
  GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
  GTM_RWLockRelease(&GTMTransactions.gt_XidGenLock);
  elog(DEBUG1, "Restoring %d gxid(s) done.", num_txn);
  return 1;
}

int gtm_standby_restore_sequence(void) {
  GTM_SeqInfo *seq_list;
  int num_seq;
  int i;
  /* Restore sequence data. */
  num_seq = get_sequence_list(GTM_ActiveConn, &seq_list);
  for (i = 0; i < num_seq; i++) {
    GTM_SeqRestore(seq_list[i].gs_key,seq_list[i].gs_increment_by,seq_list[i].gs_min_value,seq_list[i].gs_max_value,seq_list[i].gs_init_value,seq_list[i].gs_value,seq_list[i].gs_state,seq_list[i].gs_cycle,seq_list[i].gs_called);
  }
  elog(DEBUG1, "Restoring sequences done.");
  return 1;
}      

GTM Master特有函數1

如下代碼是GTM Master從control file恢複Txn和Seq的過程。

/*
 * Master-side restore: read transaction and sequence state back from the
 * control file. The whole sequence runs with control_lock held.
 */
GTM_RestoreContext restoreContext;
GTM_MutexLockAcquire(&control_lock);
/* NOTE(review): fopen may return NULL; the restore calls below are still
 * made with that value, and only fclose is guarded — presumably the restore
 * functions accept a NULL stream; confirm. */
ctlf = fopen(GTMControlFile, "r");
GTM_RestoreStart(ctlf, &restoreContext);
GTM_RestoreTxnInfo(ctlf, next_gxid, &restoreContext, force_xid);
GTM_RestoreSeqInfo(ctlf, &restoreContext);
if (ctlf)
  fclose(ctlf);
GTM_MutexLockRelease(&control_lock);

共有函數分析2

備份恢複點,GTM_SetNeedBackup擷取全局gtm_bkup_lock鎖,将gtm_need_bkup設定為True,然後調用GTM_WriteRestorePoint向control file中寫入PointVersion、PointXid和PointSeq。

/*
 * Flag that a restore point needs to be written: sets gtm_need_bkup to TRUE
 * while holding gtm_bkup_lock.
 */
void GTM_SetNeedBackup(void) {
  /* NOTE(review): the flag is written under a READ-mode acquisition, while
   * GTM_WriteRestorePoint() takes the same lock in WRITE mode — presumably
   * intentional so that concurrent setters do not block each other; confirm. */
  GTM_RWLockAcquire(&gtm_bkup_lock, GTM_LOCKMODE_READ);
  gtm_need_bkup = TRUE;
  GTM_RWLockRelease(&gtm_bkup_lock);
}

/*
 * Write a restore point (version, xid and sequence sections) to the GTM
 * control file, but only when gtm_need_bkup is set.
 *
 * The flag is tested BEFORE the file is opened: opening with "w" truncates
 * the file, so opening first would wipe the existing control file on every
 * call that has nothing to write.
 *
 * The flag is cleared only once the file has been opened successfully, so a
 * failed open leaves the backup request pending, as before.
 */
void GTM_WriteRestorePoint(void) {
  FILE *f;
  GTM_RWLockAcquire(&gtm_bkup_lock, GTM_LOCKMODE_WRITE);
  if (!gtm_need_bkup){
    /* Nothing to back up: leave the existing control file untouched. */
    GTM_RWLockRelease(&gtm_bkup_lock);
    return;
  }
  f = fopen(GTMControlFile, "w");
  if (f == NULL){
    /* Keep fopen's errno across the unlock so the report below is accurate. */
    int saved_errno = errno;
    GTM_RWLockRelease(&gtm_bkup_lock);
    errno = saved_errno;
    ereport(LOG, (errno,errmsg("Cannot open control file"), errhint("%s", strerror(errno))));
    return;
  }
  gtm_need_bkup = FALSE;
  GTM_RWLockRelease(&gtm_bkup_lock);
  GTM_WriteRestorePointVersion(f);
  GTM_WriteRestorePointXid(f);
  GTM_WriteRestorePointSeq(f);
  fclose(f);
}

GTM Standby特有函數2

Standby向GTM Master将自己注冊為disconnected節點,在Restoring結束後狀态會被更改(見下文的gtm_standby_activate_self)。

/*
 * Register this standby with the active GTM as a NODE_DISCONNECTED node.
 *
 * node_name: name to register under; copied into standbyNodeName.
 * port:      port number to register; stored in standbyPortNumber.
 * datadir:   data directory to register; the pointer itself is stored in
 *            standbyDataDir (not copied), so it must stay valid afterwards.
 *
 * The local host address is obtained from the active connection into
 * standbyHostName. Returns 1 on success, 0 on failure.
 */
int gtm_standby_register_self(const char *node_name, int port, const char *datadir) {
  int rc;
  elog(DEBUG1, "Registering standby-GTM status...");
  /* The length passed must describe the buffer being filled (standbyHostName),
   * not the unrelated standbyNodeName buffer. */
  node_get_local_addr(GTM_ActiveConn, standbyHostName, sizeof(standbyHostName), &rc);
  if (rc != 0)
    return 0;
  /* Zero-fill first so the bounded copy below is always NUL-terminated. */
  memset(standbyNodeName, 0, NI_MAXHOST);
  strncpy(standbyNodeName, node_name, NI_MAXHOST - 1);
  standbyPortNumber = port;
  standbyDataDir= (char *)datadir;
  rc = node_register_internal(GTM_ActiveConn, GTM_NODE_GTM, standbyHostName, standbyPortNumber,standbyNodeName, standbyDataDir,NODE_DISCONNECTED);
  if (rc < 0){
    elog(DEBUG1, "Failed to register a standby-GTM status.");
    return 0;
  }
  elog(DEBUG1, "Registering standby-GTM done.");
  return 1;
}

gtm_standby_restore_node函數從GTM Master上擷取Node節點資訊,并在standby上恢複這些節點的資訊。

/*
 * Fetch the registered-node list over GTM_ActiveConn and register each
 * entry locally with Recovery_PGXCNodeRegister().
 *
 * Returns the number of nodes fetched on success, or 0 if allocation, the
 * fetch, or any registration fails. Note that a successful fetch of zero
 * nodes also returns 0.
 */
int gtm_standby_restore_node(void) {
  /* Capacity of the scratch buffer handed to get_node_list(). */
  enum { MAX_RESTORED_NODES = 128 };
  GTM_PGXCNodeInfo *data;
  int rc, i;
  int num_node;
  elog(LOG, "Copying node information from the GTM active...");
  /* calloc zero-fills and checks the count*size multiplication. */
  data = calloc(MAX_RESTORED_NODES, sizeof *data);
  if (data == NULL){
    /* Without this check the buffer would be used through a NULL pointer. */
    elog(LOG, "Out of memory while copying node information.");
    return 0;
  }
  rc = get_node_list(GTM_ActiveConn, data, MAX_RESTORED_NODES);
  if (rc < 0){
    elog(DEBUG3, "get_node_list() failed.");
    rc = 0;
    goto finished;
  }
  num_node = rc;
  for (i = 0; i < num_node; i++){
    elog(DEBUG1, "get_node_list: nodetype=%d, nodename=%s, datafolder=%s",data[i].type, data[i].nodename, data[i].datafolder);
    if (Recovery_PGXCNodeRegister(data[i].type, data[i].nodename, data[i].port, data[i].proxyname, data[i].status, data[i].ipaddress, data[i].datafolder, true, -1 /* dummy socket */, false) != 0){
      rc = 0;
      goto finished;
    }
  }
  elog(LOG, "Copying node information from GTM active done.");
finished:
  free(data);
  return rc;
}

GTM Standby特有函數3

gtm_standby_activate_self函數将standby狀态從disconnected更新為connected

/*
 * Switch this standby's registration on the active GTM to NODE_CONNECTED by
 * unregistering the existing entry and registering a new one.
 *
 * Returns 1 on success, 0 if either step reports a negative result.
 */
int gtm_standby_activate_self(void) {
  elog(DEBUG1, "Updating the standby-GTM status to \"CONNECTED\"...");

  /* Step 1: drop the existing registration. */
  if (node_unregister(GTM_ActiveConn, GTM_NODE_GTM, standbyNodeName) < 0){
    elog(DEBUG1, "Failed to unregister old standby-GTM status.");
    return 0;
  }

  /* Step 2: register again with the same details, now as connected. */
  if (node_register_internal(GTM_ActiveConn, GTM_NODE_GTM, standbyHostName, standbyPortNumber,
                             standbyNodeName, standbyDataDir, NODE_CONNECTED) < 0){
    elog(DEBUG1, "Failed to register a new standby-GTM status.");
    return 0;
  }

  elog(DEBUG1, "Updating the standby-GTM status done.");
  return 1;
}
/*
 * Send the end-backup request over GTM_ActiveConn.
 *
 * Returns 1 when set_begin_end_backup() yields 0, and 0 for any other result.
 */
int gtm_standby_end_backup(void) {
  if (set_begin_end_backup(GTM_ActiveConn, false) != 0){
    return 0;
  }
  return 1;
}

/*
 * Close the start-up connection to the active GTM.
 *
 * Always returns 1.
 */
int gtm_standby_finish_startup(void) {
  elog(DEBUG1, "Closing a startup connection...");
  GTMPQfinish(GTM_ActiveConn);
  /* Clear the global so the finished connection is not referenced again. */
  GTM_ActiveConn = NULL;
  elog(DEBUG1, "A startup connection closed.");
  return 1;
}

Failover