一、分發調用流程
->ev->apply_event(rli); Log_event::apply_event 這裡如果是非MTS進行應用 如果MTS 如果是GTID event 進行WORKER線程的配置設定 ,如果不是則擷取WORKER線程
-> 是否是進行 MTS recovery if (rli->is_mts_recovery())
根據 bitmap 設定進行跳過處理
if (rli->is_mts_recovery())//如果是恢複 這個地方就是前面恢複掃描出來的位置
{
bool skip=
bitmap_is_set(&rli->recovery_groups, rli->mts_recovery_index) &&
(get_mts_execution_mode(::server_id,
rli->mts_group_status ==
Relay_log_info::MTS_IN_GROUP,
rli->current_mts_submode->get_type() ==
MTS_PARALLEL_TYPE_DB_NAME)
== EVENT_EXEC_PARALLEL);
if (skip)
{
DBUG_RETURN(0);
}
else
{
DBUG_RETURN(do_apply_event(rli));
}
}
-> 如果是單線程直接調用 do_apply_event
-> 如果是多線程MTS !!!!!!!!!!!!!!!!
->Log_event::get_slave_worker 主要是根據不同的EVENT進行不同的操作 包含1、判定是否可以并發 2、判定由哪一個worker進行執行
->如果是GTID event !!!匿名GTID Event也可以
->is_gtid_event
->初始化一個組 Slave_job_group
->在GAQ中配置設定隊列序号
->rli->mts_groups_assigned++ 增加
->使用GTID-event的位置和mts_groups_assigned将GROUP中的master_log_pos位置 和 total_seqno初始化
->将GROUP加到 GAQ并且配置設定的 序号 gaq->assigned_group_index= gaq->en_queue(&group);
->初始化一個Slave_job_item
->加入到rli->curr_group_da.push_back中
->進行GTID 模式下 判定是否可以并發
->schedule_next_event
->Mts_submode_logical_clock::schedule_next_event 基于COMMIT_ORDER和WRITE SET的都使用這個方法
主要判斷是否可以進行并發并且進行等待
->擷取GTID EVENT中的last commit和seq number
->如果不能進行并發則需要等待last commit > LWM SEQ NUMBER(最新一次除沒有送出事物之前的一個事物的seq number)
->wait_for_last_committed_trx 進入等待 他會設定一個min_waited_timestamp 作為
其他事物送出時更新LWM SEQ NUMBER的标記,等待直到last commit<=LWM SEQ NUMBER
等待标記為
stage_worker_waiting_for_commit_parent
Waiting for dependent transaction to commit
同時還會更新 mts_total_wait_overlap
my_atomic_add64(&rli->mts_total_wait_overlap, diff_timespec(&ts[1], &ts[0]));
擷取 LWM SEQ NUMBER 的源碼注釋:
the last time index containg lwm
+------+
| LWM |
| | |
V V V
GAQ:x xoooooxxxxxXXXXX...X
^ ^
| | LWM+1
|
+- tne new current_lwm
<---- logical (commit) time ----
here `x' stands for committed, `X' for committed and discarded from
the running range of the queue, `o' for not committed.
->如果 query event !!!
->初始化一個Slave_job_item
->将其加入到rli->curr_group_da.push_back(job_item);中
->設定 rli->curr_group_seen_begin= true; 說明找到了query event
->進行DATABASE模式的配置設定 不考慮
->如果是MAP EVENT
->開始擷取WORKER線程到這裡已經可以并發執行了,需要進行WORKER線程的擷取
ret_worker=rli->current_mts_submode->get_least_occupied_worker(rli, &rli->workers,this);
Mts_submode_logical_clock::get_least_occupied_worker
-> 第一次rli->last_assigned_worker為空 這需要新配置設定
-> Mts_submode_logical_clock::get_free_worker 進行配置設定
->循環每一個worker線程,看是否有正在等待處理的event,找到一個沒有任何工作的worker線程
這裡也能出是輪詢每一個worker線程找到空閑的worker線程就可以了。判斷标準就是
if (w_i->jobs.len == 0)
-> 如果沒有找到,配置設定失敗,進行等待等待為
stage_slave_waiting_for_workers_to_process_queue
Waiting for slave workers to process their queues
-> 循環擷取work線程,直到成功
-> 擷取成功後更新資訊
等待的時間:rli->mts_total_wait_worker_avail += diff_timespec(&ts[1], &ts[0]);
增加一次等待次數:rli->mts_wq_no_underrun_cnt++;
->如果開啟了參數 slave_preserve_commit_order=1 注冊事物
rli->get_commit_order_manager()->register_trx(worker);
->ptr_group->worker_id= ret_worker->id;//設定本次事物組的worker_id 就是配置設定的工作線程
->伴随着Woker線程的配置設定,如果是開啟了參數slave_preserve_commit_order需要注冊這個事務
if (rli->get_commit_order_manager() != NULL && worker != NULL)
rli->get_commit_order_manager()->register_trx(worker);//注冊事物
->如果是DEL event
步驟同上 隻是不需要配置設定work線程了因為已經配置設定了
->如果是XID event
步驟同上 不過還需要更新group 的checkpoint資訊 如下:
if (!ret_worker->checkpoint_notified) //将GROUP中填寫 checkpoint資訊
{
if (!ptr_group)
ptr_group= gaq->get_job_group(rli->gaq->assigned_group_index);
ptr_group->checkpoint_log_name=
my_strdup(key_memory_log_event, rli->get_group_master_log_name(), MYF(MY_WME));
ptr_group->checkpoint_log_pos= rli->get_group_master_log_pos();
ptr_group->checkpoint_relay_log_name=
my_strdup(key_memory_log_event, rli->get_group_relay_log_name(), MYF(MY_WME));
ptr_group->checkpoint_relay_log_pos= rli->get_group_relay_log_pos();
ptr_group->shifted= ret_worker->bitmap_shifted; //checkpoint 後 移動的個數 用于後面送出的時候改變參考Slave_worker::commit_positions 設定參考mts_checkpoint_routine()
ret_worker->bitmap_shifted= 0;//重置移動量
ret_worker->checkpoint_notified= TRUE;
}
ptr_group->checkpoint_seqno= rli->checkpoint_seqno; //擷取seqno 這個值會在chkpt後減去偏移量
ptr_group->ts= common_header->when.tv_sec + (time_t) exec_time; // Seconds_behind_master related //checkpoint的時候會将這個值再次傳遞 mts_checkpoint_routine()
rli->checkpoint_seqno++;//增加seqno
到這裡 Log_event::get_slave_worker 每個event的處理流程完成,每次都會回到
Log_event::apply_event
->Log_event::apply_event 傳回到 apply_event_and_update_pos
->回到apply_event_and_update_pos 下面邏輯MTS才進行 也就是入隊到woker中去
開始進入worker 隊列,GTID和QUERY EVNET會跟随 MAP EVENT一起進入隊列加入了li->curr_group_da中
初始化map event的Slave_job_item
設定ev屬于在GAP中的位置 ev->mts_group_idx= rli->gaq->assigned_group_index;
如果是map event的話還會幫助GTID和QUERY event入隊
然後自己入隊(append_item_to_jobs(job_item, w, rli))
其他event 比如delete event和xid event則自己調用(append_item_to_jobs(job_item, w, rli))
入隊
-> append_item_to_jobs(job_item, w, rli)
->如果入隊的event 因為worker線程的隊列已經滿了則等待:
進入狀态stage_slave_waiting_worker_queue
Waiting for Slave Worker queue
wroker隊列的大小為:mts_slave_worker_queue_len_max= 16384;
每次等待增加一次
worker->jobs.overfill= TRUE;
worker->jobs.waited_overfill++;
rli->mts_wq_overfill_cnt++;
(rli->is_parallel_exec() && rli->mts_events_assigned % 1024 == 1)
如果每個event的前面的操作操作120秒 則會出現通知 這個警告經常遇到:
從上面我們看到的等待來講超過120秒的可能有3種
1、由于上一組并發有大事物沒有送出
導緻不能并發worker線程的等待時間
2、worker線程都在完成工作及在應用上一個事物的event,沒有新的worker線程以供新配置設定
3、worker線程已經配置設定,但是由于worker線程的配置設定隊列為16384,如果應用比較慢則可能入不了
配置設定隊列,一般也是大事物造成的。
sql_print_information("Multi-threaded slave statistics%s: "
"seconds elapsed = %lu; "
"events assigned = %llu; "
"worker queues filled over overrun level = %lu; "
"waited due a Worker queue full = %lu; "
"waited due the total size = %lu; "
"waited at clock conflicts = %llu "
"waited (count) when Workers occupied = %lu "
"waited when Workers occupied = %llu",
rli->get_for_channel_str(),
static_cast<unsigned long>
(my_now - rli->mts_last_online_stat),//消耗總時間 機關秒
rli->mts_events_assigned,//總的event配置設定的個數
rli->mts_wq_overrun_cnt,// worker線程配置設定隊列大于 90%的次數 目前寫死 14746
rli->mts_wq_overfill_cnt, //由于work 配置設定隊列已滿造成的等待次數 目前寫死 16384
rli->wq_size_waits_cnt, //大Event的個數 一般不會存在
rli->mts_total_wait_overlap,//由于上一組并行有大事物沒有送出導緻不能配置設定worker線程的等待時間 機關納秒
rli->mts_wq_no_underrun_cnt, //work線程由于沒有空閑的而等待的次數
rli->mts_total_wait_worker_avail);//work線程由于沒有空閑的而等待的時間 機關納秒
->回到apply_event_and_update_pos 下面 進行pos的更新 這個pos是 event_relay_log_pos ,不會出現在show slave或者其他地方
更新内部變量讀取到的relay log位置和名字 這個不用于外部通路
uint event_relay_log_number; 這兩個是正在執行的relay log的位置
ulonglong event_relay_log_pos;