一、調用流程大概如下
handle_slave_sql
->是否開啟了slave_preserve_commit_order和log_slave_updates參數,開啟的話需要設定送出順序管理器
if (opt_slave_preserve_commit_order && rli->opt_slave_parallel_workers > 0 &&
opt_bin_log && opt_log_slave_updates)
commit_order_mngr= new Commit_order_manager(rli->opt_slave_parallel_workers); //order commit 管理器
rli->set_commit_order_manager(commit_order_mngr);
->如果是MTS則需要啟動worker線程
if (slave_start_workers(rli, rli->opt_slave_parallel_workers, &mts_inited) != 0)//啟動worker線程
{
mysql_cond_broadcast(&rli->start_cond);
mysql_mutex_unlock(&rli->run_lock);
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
"Failed during slave workers initialization");
goto err;
->檢查rep table是否是事務類型的如果不是則報警告
if (!rli->is_transactional()) //是否是 table或者是file類型是table類型則支援事物
rli->report(WARNING_LEVEL, 0,
"If a crash happens this configuration does not guarantee that the relay "
"log info will be consistent");
-> 初始化 relay log 的通路位置
if (rli->init_relay_log_pos(rli->get_group_relay_log_name(),
rli->get_group_relay_log_pos(),
true/*need_data_lock=true*/, &errmsg,
1 /*look for a description_event*/)) //初始化 relay log 的通路位置
這個位置比較關鍵也就是從哪裡開始讀取我們的relay log。如果出現錯誤将會導緻讀取的relay log錯誤。
是以我們需要保證rep info的安全,如果設定了recover relay log 那麼将會初始化為最新一個relay log的
開始位置,因為所有的未執行的binlog event将會從新拉取,老的relay log 已經不重要了。後面再說。
-> GTID event沒有辦法使用sql_slave_skip_counter 其具體含義參考:
Log_event::do_shall_skip
mysql> set global sql_slave_skip_counter=1;
ERROR 1858 (HY000): sql_slave_skip_counter can not be set when the server is running with
@@GLOBAL.GTID_MODE = ON. Instead, for each transaction that you want to skip, generate an
empty transaction with the same GTID as the transaction
進入循環 知道SQL線程被殺死
-> 進入狀态stage_reading_event_from_the_relay_log
-> 進行一段skip event的判斷和日志輸出
GTID event沒有辦法使用sql_slave_skip_counter 其具體含義參考:
Log_event::do_shall_skip
mysql> set global sql_slave_skip_counter=1;
ERROR 1858 (HY000): sql_slave_skip_counter can not be set when the server is running with
@@GLOBAL.GTID_MODE = ON. Instead, for each transaction that you want to skip, generate an
empty transaction with the same GTID as the transaction
-> exec_relay_log_event 讀取應用 一個event的上層接口
->next_event 讀取下一個Event 完成MTS的檢查點
->擷取開始位置 rli->set_event_start_pos(my_b_tell(cur_log));
->Log_event::read_log_event
->如果是MTS 是否需要進行檢查點
1、是否超過檢查點周期
周期檢查在函數mts_checkpoint_routine内部
set_timespec_nsec(&curr_clock, 0);
ulonglong diff= diff_timespec(&curr_clock, &rli->last_clock);
if (!force && diff < period)
{
/*
We do not need to execute the checkpoint now because
the time elapsed is not enough.
*/
DBUG_RETURN(FALSE);
}
2、是否已經GAQ已經滿了
bool force= (rli->checkpoint_seqno > (rli->checkpoint_group - 1)); //如果達到了 GAQ的大小 設定為force 強制checkpoint
->是否relay log 大小已經達到最大 是否需要relay log切換
但是需要注意如果本事物沒有結束不能進行切換
/*
If we have reached the limit of the relay space and we 如果我們達到 relay_log_space_limit 上限 需要通知IO THREAD進行切換 清理空間```
are going to sleep, waiting for more events:
1. If outside a group, SQL thread asks the IO thread
to force a rotation so that the SQL thread purges
logs next time it processes an event (thus space is
freed).
2. If in a group, SQL thread asks the IO thread to
ignore the limit and queues yet one more event
so that the SQL thread finishes the group and
is are able to rotate and purge sometime soon.
*/
if (rli->log_space_limit &&
rli->log_space_limit < rli->log_space_total)
{
/* force rotation if not in an unfinished group */
if (!rli->is_parallel_exec())
{
rli->sql_force_rotate_relay= !rli->is_in_group(); //如果不是一組就需要切換
}
else
{
rli->sql_force_rotate_relay=
(rli->mts_group_status != Relay_log_info::MTS_IN_GROUP);
}
/* ask for one more event */
rli->ignore_log_space_limit= true;//是一組 不能切換
}
->如果讀取了目前relay log的全部的relay log event,
->如果是目前relay log
->空閑狀态下等待io 線程的喚醒,如果是MTS還需要定期醒來進行檢查點,如下:
if (rli->is_parallel_exec() && (opt_mts_checkpoint_period != 0 ||
DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0)))
{
int ret= 0;
struct timespec waittime;
ulonglong period= static_cast<ulonglong>(opt_mts_checkpoint_period * 1000000ULL);
ulong signal_cnt= rli->relay_log.signal_cnt;
mysql_mutex_unlock(log_lock);
do
{
/*
At this point the coordinator has no job to delegate to workers.
However, workers are executing their assigned jobs and as such
the checkpoint routine must be periodically invoked.
*/
(void) mts_checkpoint_routine(rli, period, false, true/*need_data_lock=true*/); // TODO: ALFRANIO ERROR
mysql_mutex_lock(log_lock);
if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0))
period= 10000000ULL;
set_timespec_nsec(&waittime, period);
ret= rli->relay_log.wait_for_update_relay_log(thd, &waittime);
} while ((ret == ETIMEDOUT || ret == ETIME) /* todo:remove */ &&
signal_cnt == rli->relay_log.signal_cnt && !thd->killed);
}
else
{
rli->relay_log.wait_for_update_relay_log(thd, NULL); //等待relay log 更改的信号 SQL THREAD 會等待在這裡
}
-> 如果不是目前relay log 那麼 SQL線程應用或者分發完成完成後就可以清理了
并且參數relay_log_purge需要設定為1
if (rli->relay_log.purge_first_log
(rli,
rli->get_group_relay_log_pos() == rli->get_event_relay_log_pos()
&& !strcmp(rli->get_group_relay_log_name(),rli->get_event_relay_log_name())))//做relay log的清理
-> 如果是單SQL現成 擷取event的時間
這一步 就是擷取計算延遲的重要因素,但是注意MTS不是在這裡實在檢查點裡面
last_master_timestamp
rli->last_master_timestamp= ev->common_header->when.tv_sec + //event header 的timestamp
(time_t) ev->exec_time; //擷取event的 timestamp作為 計算last_master_timestamp的基礎資料 query event才有的執行時間
DBUG_ASSERT(rli->last_master_timestamp >= 0); //但是對于MTS來講應該注意是最後一個XID EVENT的 時間不是這裡設定的 在mts_checkpoint_routine裡面
-> 如果GITD_MODE 且AUTO_POSITION 且是MTS需要由協調線程進行半事物的恢複 (partial transaction)
構造復原EVENT進行恢複,而對已非MTS會在gtid event做復原。
這種情況可能出現在:
- AUTO_POSITION情況下如果重連,會重新發送已經傳輸的Event。
- AUTO_POSITION情況下如果從庫異常當機重新開機,并且recovery_relay_log=0的情況下,會重新發送已經傳輸的Event,并且relay log pos不會重置
是以我們前面在IO線程和DUMP線程中已經讨論了,每次sql線程的啟動都會通過GTID去重新尋找需要拉取的
位置。
coord_handle_partial_binlogged_transaction(rli, ev)
-> apply_event_and_update_pos 非MTS 完成 應用 MTS完成分發
-> 進行skip event操作
-> 維護skip counter計數器
if (reason == Log_event::EVENT_SKIP_COUNT)
{
--rli->slave_skip_counter;//維護skip count
skip_event= TRUE;
}
我們看到slave_skip_counter是以event為機關的,但是對于最後一個event如果跨事務了
那麼整個事物都需要跳過。但是skip在GTID模式下是不能用的。
-> 如果不能跳過的事務 就需要應用了。MTS則完成分發
->完成延遲應用邏輯
sql_delay_event(ev, thd, rli)
->ev->apply_event(rli); 這裡單SQL線程應用 MTS完成分發,分發方式參考前面
->是否是進行 MTS recovery if (rli->is_mts_recovery())
根據 bitmap 設定進行跳過處理
if (rli->is_mts_recovery())//如果是恢複 這個地方就是前面恢複掃描出來的位置
{
bool skip=
bitmap_is_set(&rli->recovery_groups, rli->mts_recovery_index) &&
(get_mts_execution_mode(::server_id,
rli->mts_group_status ==
Relay_log_info::MTS_IN_GROUP,
rli->current_mts_submode->get_type() ==
MTS_PARALLEL_TYPE_DB_NAME)
== EVENT_EXEC_PARALLEL);
if (skip)
{
DBUG_RETURN(0);
}
else
{
DBUG_RETURN(do_apply_event(rli));
}
}