重要資料結構
Rpl_info 的基類,儲存了一些錯誤資訊,如 IO/SQL thread last error
class Slave_reporting_capability
{
// 擷取last error
Error const& last_error() const { return m_last_error; }
}
Master_info 、Relay_log_info 的基類,很多重要的鎖和信号量都在這裡
class Rpl_info : public Slave_reporting_capability
{
/*
為了避免死鎖,需要按照以下順序加鎖
run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
run_lock, sleep_lock
run_lock, info_thd_lock
info_thd_lock 保護對 info_thd 的操作
讀操作需要擷取 info_thd_lock 或 run_lock
寫操作需要擷取 info_thd_lock 和 run_lock
data_lock : 保護對資料的讀寫
run_lock : 保護運作狀态,變量 slave_running, slave_run_id
sleep_lock: 對slave_sleep做互斥,防止同時進入多個slave_sleep
*/
mysql_mutex_t data_lock, run_lock, sleep_lock, info_thd_lock;
/*
data_cond: data_lock 保護的資料被修改時發出此信号樓,隻有 Relay_log_info 會使用
start_cond: sql thread start (start slave 先啟動 io thread,後啟動 sql thread,sql thread 啟動代表全部啟動成功)
stop_cond: sql/io thread stop
sleep_cond: slave被kill,目前隻發現5.6中有應用,5.7 中沒找到發出此信号量的代碼
*/
mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond;
/*
關聯 io/sql/worker thread 的 thd
Master_info 對應 io thread
Relay_log_info 對應 sql thread
Slave_worker 對應 worker thread
*/
THD *info_thd;
/* 已初始化 */
bool inited;
/* 已終止slave */
volatile bool abort_slave;
/*
目前slave運作狀态,io thread 即 mi->slave_running有三種狀态
MYSQL_SLAVE_NOT_RUN 0
MYSQL_SLAVE_RUN_NOT_CONNECT 1
MYSQL_SLAVE_RUN_CONNECT 2
sql thread 即 mi->slave_running 隻有 0 1兩種狀态
*/
volatile uint slave_running;
/* 用于判斷slave thread 是否啟動的變量,每次啟動時遞增1 */
volatile ulong slave_run_id;
/* repository's handler */
Rpl_info_handler *handler;
/*
唯一辨別一條 info 記錄的 id,辨別一行或一個檔案
Master_info 和 Relay_log_info 可以通過 channel 辨別唯一
Worker_info 需要通過 {id, channel} 辨別唯一
*/
uint internal_id;
/* 通道名,多源複制使用 */
char channel[CHANNEL_NAME_LENGTH+1];
/* 實作了兩個虛函數,讀寫 Rpl_info */
virtual bool read_info(Rpl_info_handler *from)= 0;
virtual bool write_info(Rpl_info_handler *to)= 0;
}
/*
Master_info 使用者 IO thread
主要儲存以下資訊:
連接配接到Master的使用者資訊
目前 master log name
目前 master log offset
一些其他控制變量
Master_info 讀取 master.info repository 初始化,通常是表或檔案
通過函數 mi_init_info() 進行初始化
調用 flush_info() 可以将 master.info 寫入磁盤,每次從master讀取資料都需要刷盤
*/
class Master_info : public Rpl_info
{
/* 前面儲存了 user、host 等連接配接資訊 */
/* 下面是連接配接資訊的 set/get 函數 */
/* 和 master 的連接配接 */
MYSQL* mysql;
/* 對應的 Relay_log_info */
Relay_log_info *rli;
/* IO 線程複制延遲 */
long clock_diff_with_master;
/* 心跳間隔 */
float heartbeat_period; // interface with CHANGE MASTER or master.info
/* 收到心跳次數 */
ulonglong received_heartbeats; // counter of received heartbeat events
/* 上次心跳時間 */
time_t last_heartbeat;
/* 上次收到event的時間 */
time_t last_recv_event_timestamp;
/* 忽略複制的server_id */
Server_ids *ignore_server_ids;
/* master server_id */
ulong master_id;
/* FORMAT_DESCRIPTION_EVENT前的checksum 算法 */
binary_log::enum_binlog_checksum_alg checksum_alg_before_fd;
/* 初始化 Master_info, 裡面調用read_info() 從 Rpl_info_handler 中讀取資訊 */
int mi_init_info();
/* 清理 Master_info,裡面會調用 Rpl_info_handler 的 eng_info() */
void end_info();
/* Master_info 資訊落盤,每次從主庫讀取資料後都會執行 */
int flush_info(bool force= FALSE);
/*
從master收到的 Format_description_log_event 寫在 relay log 末尾
IO thread 開始時建立,IO thread 結束時銷毀
IO thread 收到一個 Format_description_log_event 時更新
每次rotate時,IO thread 寫Format_description_log_event到新relay log
每次執行FLUSH LOGS,client 寫Format_description_log_event到新relay log
*/
Format_description_log_event *mi_description_event;
/* 最近一個GTID,可能是未完成事務的GTID,用于事務結束時寫入 Retrieved_Gtid_Set */
Gtid last_gtid_queued;
/* 用于判斷事務邊界 */
Transaction_boundary_parser transaction_parser;
/*
channel lock,以下操作需要持有寫鎖
START SLAVE;
STOP SLAVE;
CHANGE MASTER;
RESET SLAVE;
end_slave();
*/
Checkable_rwlock *m_channel_lock;
/* channel 被引用的次數,隻有為0時可以删除channel */
Atomic_int32 references;
}
/*
主要儲存以下資訊:
目前relay log
目前relay log offset
master log name
與上次更新對應的主庫日志序列
sql thread 其他資訊
初始化過程和 Master_info 類似
以下情況下 relay.info table/file 需要更新:
1. relay log file rotated
2. SQL thread stopped
3. while processing a Xid_log_event
4. after a Query_log_event(commit or rollback)
5. after processing any statement written to the binary log without a transaction context.
并行複制相關代碼留作以後分析,本次暫不涉及
*/
class Relay_log_info : public Rpl_info
{
/* 備份狀态标志位,用于标志是否在語句中 */
enum enum_state_flag {
/* 在語句中 */
IN_STMT,
/** Flag counter. Should always be last */
STATE_FLAGS_COUNT
};
/* 是否複制相同server_id的event,一般是false */
bool replicate_same_server_id;
/* 正在執行或最後一個執行的GTID,用來填充 performance_schema.replication_applier_status_by_worke 的 last_seen_transaction 列
*/
Gtid_specification currently_executing_gtid;
/* 讀取下面變量時,必須受data_lock保護 */
/* 目前relay log的檔案描述符 */
File cur_log_fd;
/* reay_log 對象,MYSQL_BIN_LOG類留作以後分析*/
MYSQL_BIN_LOG relay_log;
/* 主要用于查詢log_pos */
LOG_INFO linfo;
/*
cur_log 指向 relay_log.get_log_file() 或者 cache_buf
取決于relay_log_file是熱日志,還是需要打開冷日志
cache_buf 在打開冷日志時适用
*/
IO_CACHE cache_buf,*cur_log;
/* 辨別是否正在recovery */
bool is_relay_log_recovery;
/* 下面的變量可以不加鎖讀 */
/*
restart slave 時需要通路臨時表。
這個變量值在 init/end 時修改
SQL thread 隻讀
*/
TABLE *save_temporary_tables;
/* 對應的 Master_info */
Master_info *mi;
/* 打開臨時表的數量 */
Atomic_int32 channel_open_temp_tables;
/* 儲存 relay_log.get_open_count() */
uint32 cur_log_old_open_count;
/* init_info() 曾經失敗,RESET SLAVE 可以修複錯誤 */
bool error_on_rli_init_info;
/* 這裡跳過一些 Group replication 相關變量 */
/* received gtid set */
Gtid_set gtid_set;
/* 辨別此對象是否屬于SQL線程(屬于SQL線程為0) */
bool rli_fake;
/* 辨別 retrieved GTID set 是否已被初始化 */
bool gtid_retrieved_initialized;
/* 上一個錯誤的GTID */
Gtid last_sql_error_gtid;
/* 日志空間限制,日志空間總量(用于sys_var:relay_log_space,控制relay log空間) */
ulonglong log_space_limit,log_space_total;
/* 是否忽略日志空間限制 */
bool ignore_log_space_limit;
/* 需要清理空間時,SQL線程訓示IO線程rotate logs */
bool sql_force_rotate_relay;
/* 上次主庫記錄binlog的時間 */
time_t last_master_timestamp;
/* 上次執行event的時間 */
time_t last_exec_event_timestamp;
/* 跳過error event */
volatile uint32 slave_skip_counter;
/* 标記是否需要中斷pos_wait,change master 和 reset slave時需要中斷 */
volatile ulong abort_pos_wait;
/* log_space 相關信号量*/
mysql_mutex_t log_space_lock;
mysql_cond_t log_space_cond;
/* 這裡有一些 START SLAVE UNTIL 相關變量 */
/* 重試事務次數(trans_retries是重試次數上限),重試事務計數(retried_trans記錄重試了多少次) */
ulong trans_retries, retried_trans;
/*
延遲複制時間
CHANGE MASTER TO MASTER_DELAY=X.
由data_lock保護, SQL thread 讀取
SQL thread 運作時該變量不可寫
*/
time_t sql_delay;
/* sql_delay 結束時間 */
time_t sql_delay_end;
/* enum_state_flag 的标志位 */
uint32 m_flags;
}
函數分析
mysql_execute_command() 當做入口開始分析,可以看出change_master需要SUPER權限
case SQLCOM_CHANGE_MASTER:
{
if (check_global_access(thd, SUPER_ACL))
goto error;
res= change_master_cmd(thd);
break;
}
/*
函數具備以下功能
更改接收/執行日志的配置/位點
purge relay log
删除 worker info(并行複制使用)
*/
/* 分析 change_master 函數會略過一部分邏輯 */
int change_master(THD* thd, Master_info* mi, LEX_MASTER_INFO* lex_mi,
bool preserve_logs)
{
/*
如果SQL thread 和 IO thread已經停止,并且沒有指定 relay_log_pos和relay_log_file
會purge relay log
*/
bool need_relay_log_purge= 1;
/* 為了修改 mysql.slave_master_info,需要無視read_only和super_read_only */
thd->set_skip_readonly_check();
/* channel 加讀寫鎖,即将對channel做修改,函數結束時才會釋放鎖 */
mi->channel_wrlock();
/*
對 mi->run_lock 和 rli->run_lock 加鎖
防止線程運作狀态發生變化
*/
lock_slave_threads(mi);
/* 設定thread_mask,用來辨別 IO/SQL thread 的運作狀态 */
init_thread_mask(&thread_mask, mi, 0);
/* 設定auto_position=1需要IO/SQL thread 都不在運作狀态,否則報錯退出 */
if (thread_mask)
{
if (lex_mi->auto_position != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
{
error= ER_SLAVE_CHANNEL_MUST_STOP;
my_error(ER_SLAVE_CHANNEL_MUST_STOP, MYF(0), mi->get_channel());
goto err;
}
/* 如果 SQL thread 和 IO thread 沒有全部停止,不能purge relay log */
need_relay_log_purge= 0;
}
/*
下面是一些錯誤判斷,都是很明顯的錯誤
1. 如果設定了auto_position,同時又指定了複制位點,如 relay_log_pos,報錯退出
2. auto_position 需要 GTID_MODE != OFF
3. IO thread 運作時不能改變 IO thread 相關配置
4. SQL thread 運作時不能改變 SQL thread 相關配置
5. 如果指定了master_host,那麼master_host不能是空串
*/
/* 記錄目前狀态 */
THD_STAGE_INFO(thd, stage_changing_master);
/* 辨別停止的線程,給load_mi_and_rli_from_repositories()使用 */
init_thread_mask(&thread_mask_stopped_threads, mi, 1);
/*
從倉庫加載 mi 和 rli 的配置
隻有停止狀态的線程可以加載配置(SQL thread 對應 rli,IO thread 對應 mi)
*/
if (load_mi_and_rli_from_repositories(mi, false, thread_mask_stopped_threads))
{
error= ER_MASTER_INFO;
my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
goto err;
}
/*
修改mi相關配置,并儲存老配置
save_ 變量中儲存的老配置用于列印日志
*/
if (have_receive_option)
{
strmake(saved_host, mi->host, HOSTNAME_LENGTH);
strmake(saved_bind_addr, mi->bind_addr, HOSTNAME_LENGTH);
saved_port= mi->port;
strmake(saved_log_name, mi->get_master_log_name(), FN_REFLEN - 1);
saved_log_pos= mi->get_master_log_pos();
if ((error= change_receive_options(thd, lex_mi, mi)))
{
goto err;
}
}
/* 列印日志,change master 的源值和目标值 */
if (have_receive_option)
sql_print_information("'CHANGE MASTER TO%s executed'. "
"Previous state master_host='%s', master_port= %u, master_log_file='%s', "
"master_log_pos= %ld, master_bind='%s'. "
"New state master_host='%s', master_port= %u, master_log_file='%s', "
"master_log_pos= %ld, master_bind='%s'.",
mi->get_for_channel_str(true),
saved_host, saved_port, saved_log_name, (ulong) saved_log_pos,
saved_bind_addr, mi->host, mi->port, mi->get_master_log_name(),
(ulong) mi->get_master_log_pos(), mi->bind_addr);
/* 修改rli相關配置 */
if (have_execute_option)
change_execute_options(lex_mi, mi);
/* 持久化master_info */
if ((thread_mask & SLAVE_IO) == 0 && flush_master_info(mi, true))
{
error= ER_RELAY_LOG_INIT;
my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
goto err;
}
if ((thread_mask & SLAVE_SQL) == 0)
{
/* 記錄全局變量 relay_log_purge */
bool save_relay_log_purge= relay_log_purge;
if (need_relay_log_purge)
{
const char* errmsg= 0;
/* purge relay logs */
relay_log_purge= 1;
THD_STAGE_INFO(thd, stage_purging_old_relay_logs);
if (mi->rli->purge_relay_logs(thd,
0 /* not only reset, but also reinit */,
&errmsg))
{
error= ER_RELAY_LOG_FAIL;
my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
goto err;
}
}
else
{
const char* msg;
relay_log_purge= 0;
DBUG_ASSERT(mi->rli->inited);
/*初始化 relay_log_pos */
if (mi->rli->init_relay_log_pos(mi->rli->get_group_relay_log_name(),
mi->rli->get_group_relay_log_pos(),
true/*we do need mi->rli->data_lock*/,
&msg, 0))
{
error= ER_RELAY_LOG_INIT;
my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
goto err;
}
}
/* 恢複全局變量 relay_log_purge 的值 */
relay_log_purge= save_relay_log_purge;
/* 清理until condition */
mi->rli->clear_until_condition();
/* relay_log_info 持久化到磁盤 */
if (mi->rli->flush_info(true))
{
error= ER_RELAY_LOG_INIT;
my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush relay info file.");
goto err;
}
}
/* 出錯後跳到次數,釋放之前申請的鎖 */
err:
unlock_slave_threads(mi);
mi->channel_unlock();
DBUG_RETURN(error);
}
總結
change master 主要功能是修改 SQL 和 IO 線程的配置資訊,執行時可能會purge relay log
沒有特殊情況,建議指定auto_position=1,不要自己指定複制位點,避免資料丢失風險
如需對change master 做修改,需要注意在鎖保護下修改變量,同時注意加鎖順序,避免死鎖