天天看點

MySQL · 源碼分析 · change master to

重要資料結構

MySQL · 源碼分析 · change master to
Rpl_info 的基類,儲存了一些錯誤資訊,如 IO/SQL thread last error

class Slave_reporting_capability
{

  // 擷取last error
  Error const& last_error() const { return m_last_error; }

}


Master_info 、Relay_log_info 的基類,很多重要的鎖和信号量都在這裡


class Rpl_info : public Slave_reporting_capability
{
  /*
    為了避免死鎖,需要按照以下順序加鎖
    run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
    run_lock, sleep_lock
    run_lock, info_thd_lock
    
    info_thd_lock 保護對 info_thd 的操作
    讀操作需要擷取 info_thd_lock 或 run_lock
    寫操作需要擷取 info_thd_lock 和 run_lock
    
    data_lock : 保護對資料的讀寫
    run_lock  : 保護運作狀态,變量 slave_running, slave_run_id
    sleep_lock: 對slave_sleep做互斥,防止同時進入多個slave_sleep
  */
  mysql_mutex_t data_lock, run_lock, sleep_lock, info_thd_lock;

  /*
    data_cond: data_lock 保護的資料被修改時發出此信号樓,隻有 Relay_log_info 會使用
    start_cond: sql thread start (start slave 先啟動 io thread,後啟動 sql thread,sql thread 啟動代表全部啟動成功)
    stop_cond: sql/io thread stop
    sleep_cond: slave被kill,目前隻發現5.6中有應用,5.7 中沒找到發出此信号量的代碼
  */
  mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond;
  
  /* 
    關聯 io/sql/worker thread 的 thd
    Master_info 對應 io thread
    Relay_log_info 對應 sql thread
    Slave_worker 對應 worker thread 
  */
  THD *info_thd;
  
  
  /* 已初始化 */
  bool inited;
  
  /* 已終止slave */
  volatile bool abort_slave;
  
  /*
    目前slave運作狀态,io thread 即 mi->slave_running有三種狀态
    MYSQL_SLAVE_NOT_RUN         0
	MYSQL_SLAVE_RUN_NOT_CONNECT 1
	MYSQL_SLAVE_RUN_CONNECT     2 
	
	sql thread 即 mi->slave_running 隻有 0 1兩種狀态
  */
  volatile uint slave_running;
  
  /* 用于判斷slave thread 是否啟動的變量,每次啟動時遞增1 */
  volatile ulong slave_run_id;

  /* repository's handler */
  Rpl_info_handler *handler;
  
  /* 
    唯一辨別一條 info 記錄的 id,辨別一行或一個檔案
    Master_info 和 Relay_log_info 可以通過 channel 辨別唯一
    Worker_info 需要通過 {id, channel} 辨別唯一
  */
  uint internal_id;

  /* 通道名,多源複制使用 */
  char channel[CHANNEL_NAME_LENGTH+1];
  
  /* 實作了兩個虛函數,讀寫 Rpl_info */
  virtual bool read_info(Rpl_info_handler *from)= 0;
  virtual bool write_info(Rpl_info_handler *to)= 0;
}

/*
  Master_info 使用者 IO thread
  主要儲存以下資訊:
    連接配接到Master的使用者資訊
    目前 master log name
    目前 master log offset
    一些其他控制變量
    
  Master_info 讀取 master.info repository 初始化,通常是表或檔案
  通過函數 mi_init_info() 進行初始化
  
  調用 flush_info() 可以将 master.info 寫入磁盤,每次從master讀取資料都需要刷盤
*/
class Master_info : public Rpl_info
{
  /* 前面儲存了 user、host 等連接配接資訊 */
  /* 下面是連接配接資訊的 set/get 函數 */

  /* 和 master 的連接配接 */
  MYSQL* mysql;
  
  
  /* 對應的 Relay_log_info */        
  Relay_log_info *rli;
  
  /* IO 線程複制延遲 */
  long clock_diff_with_master;
  
  /* 心跳間隔 */
  float heartbeat_period;         // interface with CHANGE MASTER or master.info
  
  /* 收到心跳次數 */
  ulonglong received_heartbeats;  // counter of received heartbeat events

  /* 上次心跳時間 */
  time_t last_heartbeat;

  /* 上次收到event的時間 */
  time_t last_recv_event_timestamp;

  /* 忽略複制的server_id */
  Server_ids *ignore_server_ids;

  /* master server_id */
  ulong master_id;
  
  /* FORMAT_DESCRIPTION_EVENT前的checksum 算法 */
  binary_log::enum_binlog_checksum_alg checksum_alg_before_fd;
  
  /* 初始化 Master_info, 裡面調用read_info() 從 Rpl_info_handler 中讀取資訊 */
  int mi_init_info();
  
  /* 清理 Master_info,裡面會調用 Rpl_info_handler 的 eng_info() */
  void end_info();
  
  /* Master_info 資訊落盤,每次從主庫讀取資料後都會執行 */
  int flush_info(bool force= FALSE);
  
  /*
    從master收到的 Format_description_log_event 寫在 relay log 末尾
    
    IO thread 開始時建立,IO thread 結束時銷毀
    IO thread 收到一個 Format_description_log_event 時更新
    每次rotate時,IO thread 寫Format_description_log_event到新relay log
    每次執行FLUSH LOGS,client 寫Format_description_log_event到新relay log
  */
  Format_description_log_event *mi_description_event;

  /* 最近一個GTID,可能是未完成事務的GTID,用于事務結束時寫入 Retrieved_Gtid_Set */
  Gtid last_gtid_queued;
  
  /* 用于判斷事務邊界 */
  Transaction_boundary_parser transaction_parser;
  
  /* 
    channel lock,以下操作需要持有寫鎖
    START SLAVE;
	STOP SLAVE;
	CHANGE MASTER;
	RESET SLAVE;
	end_slave();
  */
  Checkable_rwlock *m_channel_lock;
	
  /* channel 被引用的次數,隻有為0時可以删除channel */
  Atomic_int32 references;
}

           
/*
  主要儲存以下資訊:
    目前relay log
    目前relay log offset
    master log name
    與上次更新對應的主庫日志序列
    sql thread 其他資訊

	初始化過程和 Master_info 類似
	
	以下情況下 relay.info table/file 需要更新:
	1. relay log file rotated
	2. SQL thread stopped
	3. while processing a Xid_log_event
	4. after a Query_log_event(commit or rollback)
	5. after processing any statement written to the binary log without a transaction context.
	
	并行複制相關代碼留作以後分析,本次暫不涉及
*/
class Relay_log_info : public Rpl_info
{
  /* 備份狀态标志位,用于标志是否在語句中 */
  enum enum_state_flag {
    /* 在語句中 */
    IN_STMT,

    /** Flag counter.  Should always be last */
    STATE_FLAGS_COUNT
  };
  
  /* 是否複制相同server_id的event,一般是false */
  bool replicate_same_server_id;
  
  /* 正在執行或最後一個執行的GTID,用來填充 performance_schema.replication_applier_status_by_worke 的 last_seen_transaction 列
  */
  Gtid_specification currently_executing_gtid;
  
  
  /* 讀取下面變量時,必須受data_lock保護 */
  
  /* 目前relay log的檔案描述符 */
  File cur_log_fd;
  
  /* reay_log 對象,MYSQL_BIN_LOG類留作以後分析*/
  MYSQL_BIN_LOG relay_log;
  
  /* 主要用于查詢log_pos */
  LOG_INFO linfo;
  
  /*
    cur_log 指向 relay_log.get_log_file() 或者 cache_buf
    取決于relay_log_file是熱日志,還是需要打開冷日志
    
    cache_buf 在打開冷日志時适用
  */
  IO_CACHE cache_buf,*cur_log;
  
  /* 辨別是否正在recovery */
  bool is_relay_log_recovery;
  
  /* 下面的變量可以不加鎖讀 */
  
  /*
    restart slave 時需要通路臨時表。
    這個變量值在 init/end 時修改
    SQL thread 隻讀
  */
  TABLE *save_temporary_tables;
  
  /* 對應的 Master_info */
  Master_info *mi;
  
  /* 打開臨時表的數量 */
  Atomic_int32 channel_open_temp_tables;
  
  /* 儲存 relay_log.get_open_count() */
  uint32 cur_log_old_open_count;
  
  /* init_info() 曾經失敗,RESET SLAVE 可以修複錯誤 */
  bool error_on_rli_init_info;
  
  /* 這裡跳過一些 Group replication 相關變量 */
  
  /* received gtid set */
  Gtid_set gtid_set;
  
  /* 辨別此對象是否屬于SQL線程(屬于SQL線程為0) */
  bool rli_fake;
  
  /* 辨別 retrieved GTID set 是否已被初始化 */
  bool gtid_retrieved_initialized;
  
  /* 上一個錯誤的GTID */
  Gtid last_sql_error_gtid;
  
  
  /* 日志空間限制,日志空間總量(用于sys_var:relay_log_space,控制relay log空間) */
  ulonglong log_space_limit,log_space_total;
  
  /* 是否忽略日志空間限制 */
  bool ignore_log_space_limit;

  
  /* 需要清理空間時,SQL線程訓示IO線程rotate logs */
  bool sql_force_rotate_relay;
  
  /* 上次主庫記錄binlog的時間 */
  time_t last_master_timestamp;
  
  /* 上次執行event的時間 */
  time_t last_exec_event_timestamp;
  
  /* 跳過error event */
  volatile uint32 slave_skip_counter;
  
  /* 标記是否需要中斷pos_wait,change master 和 reset slave時需要中斷 */
  volatile ulong abort_pos_wait;
  
  /* log_space 相關信号量*/
  mysql_mutex_t log_space_lock;
  mysql_cond_t log_space_cond;

  /* 這裡有一些 START SLAVE UNTIL 相關變量 */
  
  /* 重試事務次數(trans_retries是重試次數上限),重試事務計數(retried_trans記錄重試了多少次) */
  ulong trans_retries, retried_trans;
 
  /* 
    延遲複制時間
    CHANGE MASTER TO MASTER_DELAY=X.
    由data_lock保護, SQL thread 讀取
    SQL thread 運作時該變量不可寫
  */
  time_t sql_delay;
  
  /* sql_delay 結束時間 */
  time_t sql_delay_end;

  /* enum_state_flag 的标志位 */
  uint32 m_flags;  
}

           

函數分析

mysql_execute_command() 當做入口開始分析,可以看出change_master需要SUPER權限

case SQLCOM_CHANGE_MASTER:
  {
    if (check_global_access(thd, SUPER_ACL))
      goto error;
    res= change_master_cmd(thd);
    break;
  }
  
/*
  函數具備以下功能
    更改接收/執行日志的配置/位點
    purge relay log
    删除 worker info(并行複制使用)
*/

/* 分析 change_master 函數會略過一部分邏輯 */
int change_master(THD* thd, Master_info* mi, LEX_MASTER_INFO* lex_mi,
                  bool preserve_logs)
{
  /*
    如果SQL thread 和 IO thread已經停止,并且沒有指定 relay_log_pos和relay_log_file
    會purge relay log
  */
  bool need_relay_log_purge= 1;

  /* 為了修改 mysql.slave_master_info,需要無視read_only和super_read_only */
  thd->set_skip_readonly_check();
  
  /* channel 加讀寫鎖,即将對channel做修改,函數結束時才會釋放鎖 */
  mi->channel_wrlock();
  
  /*
    對 mi->run_lock 和 rli->run_lock 加鎖
    防止線程運作狀态發生變化
  */
  lock_slave_threads(mi);

  /* 設定thread_mask,用來辨別 IO/SQL thread 的運作狀态 */
  init_thread_mask(&thread_mask, mi, 0);

  /* 設定auto_position=1需要IO/SQL thread 都不在運作狀态,否則報錯退出 */
  if (thread_mask)
  {
    if (lex_mi->auto_position != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
    {
      error= ER_SLAVE_CHANNEL_MUST_STOP;
      my_error(ER_SLAVE_CHANNEL_MUST_STOP, MYF(0), mi->get_channel());
      goto err;
    }
    
    /* 如果 SQL thread 和 IO thread 沒有全部停止,不能purge relay log */
    need_relay_log_purge= 0;
  }

  
  /*
    下面是一些錯誤判斷,都是很明顯的錯誤
      1. 如果設定了auto_position,同時又指定了複制位點,如 relay_log_pos,報錯退出
      2. auto_position 需要 GTID_MODE != OFF
      3. IO thread 運作時不能改變 IO thread 相關配置
      4. SQL thread 運作時不能改變 SQL thread 相關配置
      5. 如果指定了master_host,那麼master_host不能是空串  
  */

  /* 記錄目前狀态 */
  THD_STAGE_INFO(thd, stage_changing_master); 

  /* 辨別停止的線程,給load_mi_and_rli_from_repositories()使用 */
  init_thread_mask(&thread_mask_stopped_threads, mi, 1);

  /* 
    從倉庫加載 mi 和 rli 的配置
    隻有停止狀态的線程可以加載配置(SQL thread 對應 rli,IO thread 對應 mi)
  */
  if (load_mi_and_rli_from_repositories(mi, false, thread_mask_stopped_threads))                                                                                                                            
  {
    error= ER_MASTER_INFO;
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
    goto err;
  }

  /* 
    修改mi相關配置,并儲存老配置
    save_ 變量中儲存的老配置用于列印日志
  */
  if (have_receive_option)
  {
    strmake(saved_host, mi->host, HOSTNAME_LENGTH);
    strmake(saved_bind_addr, mi->bind_addr, HOSTNAME_LENGTH);
    saved_port= mi->port;
    strmake(saved_log_name, mi->get_master_log_name(), FN_REFLEN - 1);
    saved_log_pos= mi->get_master_log_pos();

    if ((error= change_receive_options(thd, lex_mi, mi)))                                                                                                                                                   
    {
      goto err;
    }
  }

  /* 列印日志,change master 的源值和目标值 */
  if (have_receive_option)
    sql_print_information("'CHANGE MASTER TO%s executed'. "
      "Previous state master_host='%s', master_port= %u, master_log_file='%s', "
      "master_log_pos= %ld, master_bind='%s'. "
      "New state master_host='%s', master_port= %u, master_log_file='%s', "
      "master_log_pos= %ld, master_bind='%s'.",
      mi->get_for_channel_str(true),
      saved_host, saved_port, saved_log_name, (ulong) saved_log_pos,
      saved_bind_addr, mi->host, mi->port, mi->get_master_log_name(),
      (ulong) mi->get_master_log_pos(), mi->bind_addr);

  /* 修改rli相關配置 */
  if (have_execute_option)
    change_execute_options(lex_mi, mi);
    
  /* 持久化master_info */
  if ((thread_mask & SLAVE_IO) == 0 && flush_master_info(mi, true))
  {
    error= ER_RELAY_LOG_INIT;
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
    goto err;
  }
  
  if ((thread_mask & SLAVE_SQL) == 0)
  {

    /* 記錄全局變量 relay_log_purge */                                                                                                                                                            
    bool save_relay_log_purge= relay_log_purge;

    if (need_relay_log_purge)
    {
      const char* errmsg= 0;

      /* purge relay logs */
      relay_log_purge= 1;
      THD_STAGE_INFO(thd, stage_purging_old_relay_logs);
      if (mi->rli->purge_relay_logs(thd,
                                    0 /* not only reset, but also reinit */,
                                    &errmsg))
      {
        error= ER_RELAY_LOG_FAIL;
        my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
        goto err;
      }
    }
    else
    {

      const char* msg;
      relay_log_purge= 0;

      DBUG_ASSERT(mi->rli->inited);
      
      /*初始化 relay_log_pos */
      if (mi->rli->init_relay_log_pos(mi->rli->get_group_relay_log_name(),
                                      mi->rli->get_group_relay_log_pos(),
                                      true/*we do need mi->rli->data_lock*/,
                                      &msg, 0))
      {
        error= ER_RELAY_LOG_INIT;
        my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
        goto err;
      }
    }
    
    /* 恢複全局變量 relay_log_purge 的值 */
    relay_log_purge= save_relay_log_purge;

    /* 清理until condition */
    mi->rli->clear_until_condition();

    /* relay_log_info 持久化到磁盤 */
    if (mi->rli->flush_info(true))
    {
      error= ER_RELAY_LOG_INIT;
      my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush relay info file.");
      goto err;
    }

  }

/* 出錯後跳到次數,釋放之前申請的鎖 */
err:

  unlock_slave_threads(mi);
  mi->channel_unlock();
  DBUG_RETURN(error);

}
           

總結

change master 主要功能是修改 SQL 和 IO 線程的配置資訊,執行時可能會purge relay log

沒有特殊情況,建議指定auto_position=1,不要自己指定複制位點,避免資料丢失風險

如需對change master 做修改,需要注意在鎖保護下修改變量,同時注意加鎖順序,避免死鎖