
PG Daemon (Postmaster): Main Flow of the PgStat Auxiliary Process

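The PgStat auxiliary process is PostgreSQL's statistics collector. It gathers statistics about database activity while the system runs: for example, how many inserts and updates have been performed on each table and index, the number of disk blocks and tuples, when each table was last vacuumed and analyzed, and how much time each user-defined function has spent executing. The collected statistics are exposed through the pg_stat_* views. The system catalog pg_statistic is something different: it holds the planner statistics produced by ANALYZE. The data directory also contains files related to the collector. At shutdown the permanent statistics are saved under the pg_stat subdirectory (global.stat plus one db_<oid>.stat file per database in recent releases; older releases kept a single global/pgstat.stat file), and the pg_stat_tmp directory holds the temporary files through which the PgStat process hands statistics to the backend processes. This post focuses on the main flow of the PgStat statistics collector process.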

How postmaster sets up resources for PgStat

[Figure: PG daemon (Postmaster): main flow of the PgStat auxiliary process]

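As the figure above shows, before it launches the startup process, the postmaster calls pgstat_init to create and initialize the resources the PgStat process needs. pgstat_init performs the following steps:

  • Resolve "localhost" and, for each returned address in turn, create a UDP socket (pgStatSock), bind it to a kernel-assigned port, and connect it to its own address. Backends send statistics messages through this socket, and the PgStat process waits for data arriving on it.
  • Send and receive a one-byte test message on the socket, to catch sockets that can be created but will not actually pass data. The first address that passes every check becomes the collector's socket.
  • Put the socket into non-blocking mode, so that if the collector falls behind, statistics messages are discarded instead of making backends block while sending.
  • Try to make the socket's receive buffer at least PGSTAT_MIN_RCVBUF bytes.
  • If no working socket can be set up, log "disabling statistics collector for lack of working socket" and force track_counts off.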

void pgstat_init(void) {
  ACCEPT_TYPE_ARG3 alen;
  struct addrinfo *addrs = NULL, *addr, hints;
  fd_set    rset;
  struct timeval tv;
  char    test_byte;
  int      sel_res, ret, tries = 0;
#define TESTBYTEVAL ((char) 199)
  /* This static assertion verifies that we didn't mess up the calculations involved in selecting maximum payload sizes for our UDP messages. Because the only consequence of overrunning PGSTAT_MAX_MSG_SIZE would be silent performance loss from fragmentation, it seems worth having a compile-time cross-check that we didn't. */
  StaticAssertStmt(sizeof(PgStat_Msg) <= PGSTAT_MAX_MSG_SIZE,"maximum stats message size exceeds PGSTAT_MAX_MSG_SIZE");
  
  /* Create the UDP socket for sending and receiving statistic messages */
  hints.ai_flags = AI_PASSIVE;
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_DGRAM;
  hints.ai_protocol = 0;
  hints.ai_addrlen = 0;
  hints.ai_addr = NULL;
  hints.ai_canonname = NULL;
  hints.ai_next = NULL;
  ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
  if (ret || !addrs){
    ereport(LOG,(errmsg("could not resolve \"localhost\": %s",gai_strerror(ret))));
    goto startup_failed;
  }

  /* On some platforms, pg_getaddrinfo_all() may return multiple addresses only one of which will actually work (eg, both IPv6 and IPv4 addresses when kernel will reject IPv6).  Worse, the failure may occur at the bind() or perhaps even connect() stage.  So we must loop through the results till we find a working combination. We will generate LOG messages, but no error, for bogus combinations. */
  for (addr = addrs; addr; addr = addr->ai_next)
  {
#ifdef HAVE_UNIX_SOCKETS    
    if (addr->ai_family == AF_UNIX)  continue;  /* Ignore AF_UNIX sockets, if any are returned. */      
#endif
    if (++tries > 1) ereport(LOG,(errmsg("trying another address for the statistics collector")));    
    if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) == PGINVALID_SOCKET){ /* Create the socket. */
      ereport(LOG,(errcode_for_socket_access(),errmsg("could not create socket for statistics collector: %m")));
      continue;
    }
    /* Bind it to a kernel assigned port on localhost and get the assigned port via getsockname(). */
    if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0) {
      ereport(LOG,(errcode_for_socket_access(),errmsg("could not bind socket for statistics collector: %m")));
      closesocket(pgStatSock);
      pgStatSock = PGINVALID_SOCKET;
      continue;
    }
    alen = sizeof(pgStatAddr);
    if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0){
      ereport(LOG,(errcode_for_socket_access(),errmsg("could not get address of socket for statistics collector: %m")));
      closesocket(pgStatSock);
      pgStatSock = PGINVALID_SOCKET;
      continue;
    }

    /* Connect the socket to its own address.  This saves a few cycles by not having to respecify the target address on every send. This also provides a kernel-level check that only packets from this same address will be received. */
    if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0){
      ereport(LOG,(errcode_for_socket_access(),errmsg("could not connect socket for statistics collector: %m")));
      closesocket(pgStatSock);
      pgStatSock = PGINVALID_SOCKET;
      continue;
    }

    /* Try to send and receive a one-byte test message on the socket. This is to catch situations where the socket can be created but will not actually pass data (for instance, because kernel packet filtering rules prevent it). */
    test_byte = TESTBYTEVAL;
retry1:
    if (send(pgStatSock, &test_byte, 1, 0) != 1){
      if (errno == EINTR) goto retry1;  /* if interrupted, just retry */
      ereport(LOG,(errcode_for_socket_access(),errmsg("could not send test message on socket for statistics collector: %m")));
      closesocket(pgStatSock);
      pgStatSock = PGINVALID_SOCKET;
      continue;
    }

    /* There could possibly be a little delay before the message can be received.  We arbitrarily allow up to half a second before deciding it's broken. */
    for (;;){        /* need a loop to handle EINTR */
      FD_ZERO(&rset);
      FD_SET(pgStatSock, &rset);
      tv.tv_sec = 0;
      tv.tv_usec = 500000;
      sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv); // block until the test byte arrives or the timeout expires
      if (sel_res >= 0 || errno != EINTR) break;
    }
    if (sel_res < 0){
      ereport(LOG,(errcode_for_socket_access(),errmsg("select() failed in statistics collector: %m")));
      closesocket(pgStatSock);
      pgStatSock = PGINVALID_SOCKET;
      continue;
    }
    if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset)){
      /* This is the case we actually think is likely, so take pains to give a specific message for it. errno will not be set meaningfully here, so don't use it. */
      ereport(LOG,(errcode(ERRCODE_CONNECTION_FAILURE),errmsg("test message did not get through on socket for statistics collector")));
      closesocket(pgStatSock);
      pgStatSock = PGINVALID_SOCKET;
      continue;
    }
    test_byte++;      /* just make sure variable is changed */

retry2:
    if (recv(pgStatSock, &test_byte, 1, 0) != 1) { // receive the test byte back
      if (errno == EINTR) goto retry2;  /* if interrupted, just retry */
      ereport(LOG,(errcode_for_socket_access(),errmsg("could not receive test message on socket for statistics collector: %m")));
      closesocket(pgStatSock);
      pgStatSock = PGINVALID_SOCKET;
      continue;
    }
    if (test_byte != TESTBYTEVAL){  /* strictly paranoia ... */
      ereport(LOG,(errcode(ERRCODE_INTERNAL_ERROR),errmsg("incorrect test message transmission on socket for statistics collector")));
      closesocket(pgStatSock);
      pgStatSock = PGINVALID_SOCKET;
      continue;
    }    
    break; /* If we get here, we have a working socket */
  }  
  if (!addr || pgStatSock == PGINVALID_SOCKET) goto startup_failed; /* Did we find a working address? */

  /* Set the socket to non-blocking IO.  This ensures that if the collector falls behind, statistics messages will be discarded; backends won't block waiting to send messages to the collector. */
  if (!pg_set_noblock(pgStatSock)){
    ereport(LOG,(errcode_for_socket_access(),errmsg("could not set statistics collector socket to nonblocking mode: %m")));
    goto startup_failed;
  }

  /* Try to ensure that the socket's receive buffer is at least PGSTAT_MIN_RCVBUF bytes, so that it won't easily overflow and lose data.  Use of UDP protocol means that we are willing to lose data under heavy load, but we don't want it to happen just because of ridiculously small default buffer sizes (such as 8KB on older Windows versions). */
  {
    int      old_rcvbuf, new_rcvbuf;
    ACCEPT_TYPE_ARG3 rcvbufsize = sizeof(old_rcvbuf);
    if (getsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,(char *) &old_rcvbuf, &rcvbufsize) < 0){
      ereport(LOG,(errmsg("%s(%s) failed: %m", "getsockopt", "SO_RCVBUF")));    
      old_rcvbuf = 0; /* if we can't get existing size, always try to set it */
    }
    new_rcvbuf = PGSTAT_MIN_RCVBUF;
    if (old_rcvbuf < new_rcvbuf){
      if (setsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,(char *) &new_rcvbuf, sizeof(new_rcvbuf)) < 0)
        ereport(LOG,(errmsg("%s(%s) failed: %m", "setsockopt", "SO_RCVBUF")));
    }
  }
  pg_freeaddrinfo_all(hints.ai_family, addrs);  
  ReserveExternalFD(); /* Now that we have a long-lived socket, tell fd.c about it. */
  return;

startup_failed:
  ereport(LOG,(errmsg("disabling statistics collector for lack of working socket")));
  if (addrs) pg_freeaddrinfo_all(hints.ai_family, addrs);
  if (pgStatSock != PGINVALID_SOCKET) closesocket(pgStatSock);
  pgStatSock = PGINVALID_SOCKET;

  /* Adjust GUC variables to suppress useless activity, and for debugging purposes (seeing track_counts off is a clue that we failed here). We use PGC_S_OVERRIDE because there is no point in trying to turn it back on from postgresql.conf without a restart. */
  SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE);
}      
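To try the same socket setup outside the server, here is a minimal standalone sketch of the pattern pgstat_init follows: resolve localhost, bind a UDP socket to a kernel-assigned port, connect it to itself, check that a one-byte message makes the round trip within half a second, then switch to non-blocking mode and enlarge a small receive buffer. It uses plain POSIX calls (getaddrinfo, fcntl) in place of PostgreSQL's pg_getaddrinfo_all and pg_set_noblock wrappers and omits logging; the function name open_selftested_udp_socket and the MIN_RCVBUF value are hypothetical.

```c
#define _POSIX_C_SOURCE 200112L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

#define TEST_BYTE ((char) 199)
#define MIN_RCVBUF (100 * 1024)   /* hypothetical stand-in for PGSTAT_MIN_RCVBUF */

/* Returns a self-connected, non-blocking UDP socket that passed the loopback
 * self-test, or -1 if no address worked (PostgreSQL would then disable stats). */
static int open_selftested_udp_socket(void)
{
    struct addrinfo hints, *addrs = NULL, *ai;
    int fd = -1;

    memset(&hints, 0, sizeof(hints));
    hints.ai_flags = AI_PASSIVE;
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_DGRAM;
    if (getaddrinfo("localhost", NULL, &hints, &addrs) != 0 || addrs == NULL)
        return -1;

    /* Try each address until one passes every step. */
    for (ai = addrs; ai != NULL; ai = ai->ai_next)
    {
        struct sockaddr_storage self;
        socklen_t alen = sizeof(self);
        fd_set rset;
        struct timeval tv;
        char byte = TEST_BYTE;
        ssize_t n;
        int sel;

        if ((fd = socket(ai->ai_family, SOCK_DGRAM, 0)) < 0)
            continue;
        /* Port 0 lets the kernel choose; getsockname() tells us which. */
        if (bind(fd, ai->ai_addr, ai->ai_addrlen) < 0 ||
            getsockname(fd, (struct sockaddr *) &self, &alen) < 0 ||
            connect(fd, (struct sockaddr *) &self, alen) < 0)
            goto next;

        do n = send(fd, &byte, 1, 0); while (n < 0 && errno == EINTR);
        if (n != 1)
            goto next;

        for (;;)   /* wait up to 0.5 s for the byte, retrying on EINTR */
        {
            FD_ZERO(&rset);
            FD_SET(fd, &rset);
            tv.tv_sec = 0;
            tv.tv_usec = 500000;
            sel = select(fd + 1, &rset, NULL, NULL, &tv);
            if (sel >= 0 || errno != EINTR)
                break;
        }
        if (sel <= 0 || !FD_ISSET(fd, &rset))
            goto next;

        byte++;    /* make sure recv() really overwrites it */
        do n = recv(fd, &byte, 1, 0); while (n < 0 && errno == EINTR);
        if (n == 1 && byte == TEST_BYTE)
            break; /* working socket found */
next:
        close(fd);
        fd = -1;
    }
    freeaddrinfo(addrs);
    if (fd < 0)
        return -1;

    /* Non-blocking: a sender never waits for a slow collector. */
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
    {
        close(fd);
        return -1;
    }

    /* Best effort: enlarge a small receive buffer; failure is not fatal. */
    int rcvbuf = 0;
    socklen_t optlen = sizeof(rcvbuf);
    if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &optlen) < 0)
        rcvbuf = 0;
    if (rcvbuf < MIN_RCVBUF)
    {
        int wanted = MIN_RCVBUF;
        (void) setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &wanted, sizeof(wanted));
    }
    return fd;
}
```

A return value of -1 corresponds to the startup_failed path, where the server would turn track_counts off.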

Starting the PgStat process and keeping it alive

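pgstat_start first checks whether pgstat_init managed to create the UDP socket. If it did not, pgstat_start returns 0 without starting anything. It also declines to start a collector if the previous start was less than PGSTAT_RESTART_INTERVAL seconds ago. Otherwise it forks the PgStat process, and the child enters PgstatCollectorMain. So when is pgstat_start called, or in other words, how is the collector kept alive?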

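  • By the postmaster daemon at startup (in ServerLoop, when PgStatPID is 0 and pmState is PM_RUN or PM_HOT_STANDBY).
  • After the PgStat process dies. Here "dies" means that, when the postmaster's SIGCHLD handler (reaper) runs, it finds PgStatPID to be 0 or finds that the PID returned by waitpid is PgStatPID.
  • When the startup process sends SIGUSR1 to the postmaster to request PMSIGNAL_BEGIN_HOT_STANDBY, while pmState is PM_RECOVERY and Shutdown is NoShutdown.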
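The three triggers above can be modeled in a few lines. This is a simplified sketch, not the postmaster's code: the ServerState enum and every function name are hypothetical stand-ins for pmState, ServerLoop, the SIGCHLD handler, and the SIGUSR1 handler; the child simply exits instead of running a collector loop; and the restart throttle that pgstat_start also applies is left out.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdbool.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical stand-ins for the postmaster's pmState values. */
typedef enum { STATE_RECOVERY, STATE_HOT_STANDBY, STATE_RUN, STATE_SHUTDOWN } ServerState;

/* Hypothetical collector body; a real collector would loop until told to exit. */
static void collector_main(void)
{
    _exit(0);
}

/* Fork a collector. Returns its PID, or 0 on failure, like pgstat_start(). */
static pid_t start_collector(void)
{
    pid_t pid = fork();
    if (pid < 0)
        return 0;
    if (pid == 0)
        collector_main();   /* child: never returns */
    return pid;
}

static bool wants_collector(ServerState state)
{
    return state == STATE_RUN || state == STATE_HOT_STANDBY;
}

/* Trigger 1: periodic check from the main loop. */
static pid_t server_loop_tick(pid_t collector_pid, ServerState state)
{
    if (collector_pid == 0 && wants_collector(state))
        collector_pid = start_collector();
    return collector_pid;
}

/* Trigger 2: called for each PID that waitpid() reports after SIGCHLD. */
static pid_t handle_child_exit(pid_t exited_pid, pid_t collector_pid, ServerState state)
{
    if (exited_pid != collector_pid)
        return collector_pid;           /* some other child exited */
    return wants_collector(state) ? start_collector() : 0;
}

/* Trigger 3: the startup process asked to begin hot standby. */
static pid_t begin_hot_standby(ServerState *state, bool shutting_down, pid_t collector_pid)
{
    if (*state == STATE_RECOVERY && !shutting_down)
    {
        collector_pid = start_collector();
        *state = STATE_HOT_STANDBY;
    }
    return collector_pid;
}
```

Returning 0 for "no collector" mirrors the convention that PgStatPID == 0 means the next ServerLoop pass should try again.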

pgstat_start returns the PID of the PgStat process on success and 0 otherwise.

int pgstat_start(void) {
  time_t    curtime;
  pid_t    pgStatPid;

  /* Check that the socket is there, else pgstat_init failed and we can do nothing useful. */
  if (pgStatSock == PGINVALID_SOCKET) return 0;
  /* Do nothing if too soon since last collector start.  This is a safety valve to protect against continuous respawn attempts if the collector is dying immediately at launch.  Note that since we will be re-called from the postmaster main loop, we will get another chance later. */
  curtime = time(NULL);
  if ((unsigned int) (curtime - last_pgstat_start_time) < (unsigned int) PGSTAT_RESTART_INTERVAL) return 0;
  last_pgstat_start_time = curtime;

  /* Okay, fork off the collector. */
#ifdef EXEC_BACKEND
  switch ((pgStatPid = pgstat_forkexec()))
#else
  switch ((pgStatPid = fork_process()))
#endif
  {
    case -1:
      ereport(LOG,(errmsg("could not fork statistics collector: %m")));
      return 0;
#ifndef EXEC_BACKEND
    case 0:    // PgStat child process
      InitPostmasterChild(); /* in postmaster child ... */      
      ClosePostmasterPorts(false); /* Close the postmaster's sockets */    
      dsm_detach_all(); /* Drop our connection to postmaster's shared memory, as well */
      PGSharedMemoryDetach();
      PgstatCollectorMain(0, NULL);
      break;
#endif
    default: return (int) pgStatPid;
  }  
  return 0; /* shouldn't get here */
}      
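The restart throttle at the top of pgstat_start is easy to get subtly wrong, so here it is in isolation. This sketch assumes a 60-second interval for the hypothetical RESTART_INTERVAL constant (standing in for PGSTAT_RESTART_INTERVAL) and, like the original, compares the difference as unsigned int.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

#define RESTART_INTERVAL 60   /* seconds; assumed stand-in for PGSTAT_RESTART_INTERVAL */

static time_t last_start_time = 0;   /* plays the role of last_pgstat_start_time */

/* Returns true, and records curtime, if a new collector may be started now. */
static bool may_start_collector(time_t curtime)
{
    /* Unsigned comparison: a clock that jumped backwards yields a huge
     * difference, which permits the start instead of blocking it. */
    if ((unsigned int) (curtime - last_start_time) < (unsigned int) RESTART_INTERVAL)
        return false;
    last_start_time = curtime;
    return true;
}
```

For example, with start attempts at t = 1000, 1030, and 1060, only the first and the last succeed. If the clock is then set back to t = 1000, the negative difference wraps to a very large unsigned value, so a start is allowed instead of being blocked until the clock catches up.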

Main flow of the PgStat process: PgstatCollectorMain

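During initialization, the collector reads the permanent statistics files from their default location in the data directory (pg_stat/global.stat plus the per-database db_<oid>.stat files in recent releases; older releases used global/pgstat.stat) and builds the database statistics hash table from the database, table, and function statistics they contain. In the call below, InvalidOid asks for all databases, and the two true arguments select the permanent files and a deep read that includes the per-database table and function entries:

pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);

If the read fails, or this is the first startup, all statistics counters start at zero.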
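The read-or-start-from-zero behavior can be sketched with a much simpler file layout. Everything here is hypothetical: a single struct of counters stands in for PostgreSQL's per-database, per-table, and per-function entries and hash tables, and STATS_FORMAT_ID stands in for the format marker the real file begins with.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define STATS_FORMAT_ID 0x53544154U   /* hypothetical format marker ("STAT") */

typedef struct {                      /* hypothetical, far smaller than the real entries */
    int64_t tuples_inserted;
    int64_t tuples_updated;
    int64_t blocks_fetched;
} TableCounters;

/* Write the format marker followed by the counters. */
static bool write_stats(const char *path, const TableCounters *c)
{
    uint32_t id = STATS_FORMAT_ID;
    FILE *f = fopen(path, "wb");
    if (f == NULL)
        return false;
    bool ok = fwrite(&id, sizeof(id), 1, f) == 1 && fwrite(c, sizeof(*c), 1, f) == 1;
    return fclose(f) == 0 && ok;
}

/* Load the counters; on any failure leave every counter at zero. */
static bool read_stats(const char *path, TableCounters *c)
{
    uint32_t id;
    FILE *f;

    memset(c, 0, sizeof(*c));             /* default: start from zero */
    if ((f = fopen(path, "rb")) == NULL)  /* first start: no file yet */
        return false;
    if (fread(&id, sizeof(id), 1, f) != 1 || id != STATS_FORMAT_ID ||
        fread(c, sizeof(*c), 1, f) != 1)
    {
        memset(c, 0, sizeof(*c));         /* corrupt or truncated: drop partial data */
        fclose(f);
        return false;
    }
    fclose(f);
    return true;
}
```

In this sketch, zeroing the counters again after a failed read means a truncated or corrupted file never leaves half-loaded values behind.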

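If the statistics file was last written earlier than the most recent request to read it, the collector writes its latest statistics out to the statistics file and then sets both the last-write time and the last-request time to the current time (this is the "write the stats file if needed" step). The last-request time is set by PGSTAT_MTYPE_INQUIRY messages.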
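The write-if-needed rule reduces to comparing two timestamps. This is a minimal sketch of the rule as described above; the struct, the function names, and the writes counter are hypothetical, and the actual file write is left as a comment.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef int64_t TimestampTz;   /* microseconds, as in PostgreSQL's TimestampTz */

typedef struct {               /* hypothetical collector bookkeeping */
    TimestampTz last_statwrite;    /* when the stats file was last written */
    TimestampTz last_statrequest;  /* newest read request seen */
    int         writes;            /* demo counter: how many writes happened */
} CollectorState;

/* Handle an inquiry message: remember only the newest request time. */
static void recv_inquiry(CollectorState *st, TimestampTz request_time)
{
    if (request_time > st->last_statrequest)
        st->last_statrequest = request_time;
}

/* One pass of the main loop: write only if a request arrived after the last write. */
static bool maybe_write_statsfile(CollectorState *st, TimestampTz now)
{
    if (st->last_statwrite >= st->last_statrequest)
        return false;
    /* ... a real collector would write the statistics file here ... */
    st->writes++;
    st->last_statwrite = now;
    st->last_statrequest = now;
    return true;
}
```

Because only the newest request time is kept, several inquiries arriving between two passes of the main loop cause a single write rather than one write each.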