
FATE Study: Reading the Source Code by Following the Logs (8): The Task Finish Stage of an Upload Job

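Overview

When a task ends, TaskExecutor.report_task_update_to_driver updates the task's status in the local DB. Because this is an asynchronous request, the initiator is never notified that the task has ended; it only learns the task's status by querying the DB during its polling loop.

Once the initiator sees that the task has finished, it returns the resources and cleans up the related environment.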
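
The report-then-poll pattern described above can be sketched as follows. This is only an illustration, not FATE's code: the dict `task_db`, the functions `report_task_update` and `poll_until_finished`, and the set of end statuses are all hypothetical stand-ins for the real DB table, executor, and scheduler.

```python
import threading
import time

# Hypothetical in-memory stand-in for the task table in the local DB.
task_db = {"upload_0": {"party_status": "running"}}
db_lock = threading.Lock()

# Assumed set of terminal statuses, for illustration only.
END_STATUSES = {"success", "failed", "canceled", "timeout"}


def report_task_update(task_id, party_status):
    """Executor side: write the new status to the DB; nobody is notified."""
    with db_lock:
        task_db[task_id]["party_status"] = party_status


def poll_until_finished(task_id, interval=0.01, timeout=2.0):
    """Scheduler side: learn that the task ended only by querying the DB."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        with db_lock:
            status = task_db[task_id]["party_status"]
        if status in END_STATUSES:
            return status
        time.sleep(interval)
    return None  # still not finished when the timeout expired


# The "executor" reports success a little later, on another thread.
executor = threading.Timer(0.05, report_task_update, args=("upload_0", "success"))
executor.start()
final_status = poll_until_finished("upload_0")
executor.join()
print(final_status)
```

The point of the sketch is that the poller and the reporter never talk to each other directly; the DB row is the only channel between them.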

Execution details

  1. dag_scheduler.py: schedule_running_job runs the scheduling poll. This is similar to the previous articles, so it is not repeated in detail here. The call chain is

    DagScheduler.schedule_running_job() -> TaskScheduler.schedule -> JobSaver.get_tasks_asc -> JobSaver.collect_task_of_all_party -> JobSaver.federated_task_status

    The corresponding log in ${job_log_dir}/fate_flow_schedule.log is

[INFO] [2021-07-26 08:20:44,944] [1:140259369826048] - dag_scheduler.py[line:298]: scheduling job 202107260820309976351
[INFO] [2021-07-26 08:20:44,945] [1:140259369826048] - task_scheduler.py[line:28]: scheduling job 202107260820309976351 tasks
[INFO] [2021-07-26 08:20:45,010] [1:140259369826048] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:45,022] [1:140259369826048] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'elapsed': 11447, 'end_time': 1627287644838, 'job_id': '202107260820309976351', 'party_id': '0', 'party_status': 'success', 'role': 'local', 'start_time': 1627287632259, 'status': 'running', 'task_id': '202107260820309976351_upload_0', 'task_version': 0, 'update_time': 1627287631078}
[INFO] [2021-07-26 08:20:45,022] [1:140259369826048] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
[WARNING] [2021-07-26 08:20:45,036] [1:140259369826048] - job_saver.py[line:86]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update does not take effect
[INFO] [2021-07-26 08:20:45,041] [1:140259369826048] - task_scheduler.py[line:143]: job 202107260820309976351 task 202107260820309976351_upload_0 0 status is success, calculate by task party status list: ['success']
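
The last log line above says the task status is "calculate by task party status list". A simplified sketch of such an aggregation is shown below. The function name `federated_status`, the priority order, and the status names other than `success` and `running` are assumptions made for illustration; the real rule in task_scheduler.py may be richer.

```python
def federated_status(party_statuses):
    """Derive one task status from the statuses reported by every party.

    Simplified, assumed rule:
      1. if every party agrees, use that status;
      2. otherwise any failure-like status wins;
      3. otherwise the task is still running if any party is running.
    """
    if not party_statuses:
        raise ValueError("at least one party status is required")
    distinct = set(party_statuses)
    if len(distinct) == 1:
        return party_statuses[0]
    for bad in ("failed", "timeout", "canceled"):
        if bad in distinct:
            return bad
    if "running" in distinct:
        return "running"
    return "waiting"


# An upload task has a single party (role local, party 0), as in the log.
print(federated_status(["success"]))
```

With only one party in the list, as for this upload task, the federated status is simply that party's status.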
           
  2. task_scheduler.py: synchronize the status. As in the previous article, this calls FederatedScheduler.sync_task_status

    The resulting call chain is

    FederatedScheduler.sync_task_status() -> api_utils.federated_coordination_on_http() -> fate_flow_server (via Flask) -> party_app.task_status() -> TaskController.update_task_status

    The corresponding log in ${job_log_dir}/fate_flow_schedule.log is

[INFO] [2021-07-26 08:20:45,041] [1:140259369826048] - federated_scheduler.py[line:192]: job 202107260820309976351 task 202107260820309976351_upload_0 0 is success, sync to all party
[INFO] [2021-07-26 08:20:45,053] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:45,063] [1:140259119585024] - job_saver.py[line:74]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status successfully: {'job_id': '202107260820309976351', 'task_id': '202107260820309976351_upload_0', 'task_version': '0', 'role': 'local', 'party_id': '0', 'status': 'success'}
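
The logs distinguish between an update that finishes "successfully" and one that "does not take effect". One common way to get that behavior is a conditional UPDATE whose affected-row count tells the caller whether anything changed. The sketch below uses the standard-library sqlite3 module; the table `t_task`, the function `update_task_status`, and the exact guard conditions are assumptions, not FATE's actual schema or logic.

```python
import sqlite3

# Assumed terminal statuses; a task in one of these is never moved again.
END_STATUSES = ("success", "failed", "canceled", "timeout")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t_task (task_id TEXT PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO t_task VALUES ('upload_0', 'running')")
conn.commit()


def update_task_status(conn, task_id, new_status):
    """Return True if the row changed, False if the update had no effect."""
    placeholders = ",".join("?" for _ in END_STATUSES)
    cur = conn.execute(
        "UPDATE t_task SET status = ? "
        "WHERE task_id = ? AND status != ? "
        f"AND status NOT IN ({placeholders})",
        (new_status, task_id, new_status, *END_STATUSES),
    )
    conn.commit()
    return cur.rowcount > 0


results = [
    update_task_status(conn, "upload_0", "running"),  # same value: no effect
    update_task_status(conn, "upload_0", "success"),  # running -> success
    update_task_status(conn, "upload_0", "success"),  # repeated: no effect
    update_task_status(conn, "upload_0", "running"),  # already ended: no effect
]
print(results)
```

Under this reading, a "does not take effect" message is not an error; it just means the stored row already had the value being written, or could no longer change.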
           
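  3. resource_manager.py: return the resources by executing return_task_resource. This step updates the resource-related DB tables and increases the remaining resource count.

    The corresponding log in ${job_log_dir}/fate_flow_schedule.log is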

[INFO] [2021-07-26 08:20:45,070] [1:140259119585024] - resource_manager.py[line:285]: task 202107260820309976351_upload_0 0 return resource successfully
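
Returning resources amounts to adding the task's share back to a "remaining" counter without exceeding the total. A minimal sketch, assuming a hypothetical `EngineResource` record with core counts only (the real resource tables and the real return_task_resource signature are not shown in this article):

```python
from dataclasses import dataclass


@dataclass
class EngineResource:
    """Hypothetical resource record: total and currently free cores."""
    total_cores: int
    remaining_cores: int


def return_task_resource(pool, cores):
    """Give `cores` back to the pool; refuse if it would exceed the total."""
    if cores < 0:
        raise ValueError("cores must be non-negative")
    if pool.remaining_cores + cores > pool.total_cores:
        return False  # more returned than was ever applied for
    pool.remaining_cores += cores
    return True


pool = EngineResource(total_cores=8, remaining_cores=4)
ok = return_task_resource(pool, 4)         # the finished task held 4 cores
overflow = return_task_resource(pool, 1)   # nothing left to give back
print(ok, overflow, pool.remaining_cores)
```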
           
  4. task_controller.py: execute clean_task, which calls job_tracker.clean_task() to clean up the related content in the tracker tables

    The corresponding log in ${job_log_dir}/fate_flow_schedule.log is

[INFO] [2021-07-26 08:20:45,075] [1:140259119585024] - job_tracker.py[line:424]: clean task 202107260820309976351_upload_0 0 on local 0
[INFO] [2021-07-26 08:20:46,706] [1:140259119585024] - job_tracker.py[line:440]: clean table by namespace 202107260820309976351_upload_0_0_local_0 on local 0 done
[INFO] [2021-07-26 08:20:46,725] [1:140259119585024] - job_tracker.py[line:446]: clean table by namespace 202107260820309976351_upload_0_0 on local 0 done
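
The two "clean table by namespace" lines show two namespaces derived from the task: one of the form task id, task version, role, party id, and one with only task id and task version. The sketch below rebuilds those names and drops everything stored under them. The dict `storage` and both function names are hypothetical; only the namespace layout is taken from the log.

```python
def task_namespaces(task_id, task_version, role, party_id):
    """Build the two namespaces seen in the log for one task."""
    base = f"{task_id}_{task_version}"
    return [f"{base}_{role}_{party_id}", base]


def clean_task_tables(storage, task_id, task_version, role, party_id):
    """Remove every table stored under the task's namespaces.

    `storage` is a hypothetical dict mapping namespace -> list of table names.
    Returns the namespaces that actually held something.
    """
    cleaned = []
    for namespace in task_namespaces(task_id, task_version, role, party_id):
        if storage.pop(namespace, None) is not None:
            cleaned.append(namespace)
    return cleaned


storage = {
    "202107260820309976351_upload_0_0_local_0": ["tmp_table_a"],
    "202107260820309976351_upload_0_0": ["tmp_table_b"],
    "another_task_0": ["keep_me"],
}
cleaned = clean_task_tables(storage, "202107260820309976351_upload_0", 0, "local", 0)
print(cleaned)
```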
           
  5. federated_scheduler.py: return the execution result; this corresponds to step 2

    The corresponding log in ${job_log_dir}/fate_flow_schedule.log is

[INFO] [2021-07-26 08:20:47,714] [1:140259369826048] - federated_scheduler.py[line:195]: sync job 202107260820309976351 task 202107260820309976351_upload_0 0 status success to all party success
           
  6. task_scheduler.py: because the task's status already belongs to EndStatus, FederatedScheduler.stop_task is executed to kill the process by pid.

    The call chain is FederatedScheduler.stop_task() -> api_utils.federated_coordination_on_http() -> fate_flow_server (via Flask) -> party_app.stop_task() -> TaskController.stop_task() -> TaskController.kill_task() -> job_utils.kill_task_executor_process

    P1: kill the process by pid

    P2: stop the session

    The corresponding log in ${job_log_dir}/fate_flow_schedule.log is

[INFO] [2021-07-26 08:20:47,714] [1:140259369826048] - federated_scheduler.py[line:202]: try to stop job 202107260820309976351 task 202107260820309976351_upload_0 0
[INFO] [2021-07-26 08:20:47,739] [1:140259119585024] - job_utils.py[line:396]: try to stop job 202107260820309976351 task 202107260820309976351_upload_0 local 0 with success party status process pid:90
[INFO] [2021-07-26 08:20:47,740] [1:140259119585024] - job_utils.py[line:399]: can not found job 202107260820309976351 task 202107260820309976351_upload_0 local 0 with success party status process pid:90
[INFO] [2021-07-26 08:20:47,742] [1:140259119585024] - job_utils.py[line:423]: start run subprocess to stop task session 202107260820309976351_upload_0_0_local_0
[INFO] [2021-07-26 08:20:47,743] [1:140259119585024] - job_utils.py[line:310]: start process command: python3 /data/projects/fate/python/fate_flow/utils/session_utils.py -j 202107260820309976351_upload_0_0_local_0 --computing EGGROLL --federation EGGROLL --storage EGGROLL -c stop
[INFO] [2021-07-26 08:20:47,755] [1:140259119585024] - job_utils.py[line:333]: start process command: python3 /data/projects/fate/python/fate_flow/utils/session_utils.py -j 202107260820309976351_upload_0_0_local_0 --computing EGGROLL --federation EGGROLL --storage EGGROLL -c stop successfully, pid is 148
[INFO] [2021-07-26 08:20:47,756] [1:140259119585024] - task_controller.py[line:254]: job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0 process 90 kill success
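
Note that in the log above pid 90 can no longer be found, yet the step still ends with "kill success": a process that is already gone counts as stopped. A POSIX-only sketch of that idea follows. The helper names are hypothetical, and the real job_utils.kill_task_executor_process may well be implemented differently; `os.kill(pid, 0)` is used here only as an existence check and should not be used this way on Windows.

```python
import os
import signal
import subprocess
import sys


def process_exists(pid):
    """POSIX only: signal 0 checks for existence without sending anything."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def kill_task_process(pid):
    """Stop the process if it is still there; a missing process is fine too."""
    if not process_exists(pid):
        return "not found"
    os.kill(pid, signal.SIGTERM)
    return "killed"


# Demo with a throwaway child process standing in for the task executor.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
first = kill_task_process(child.pid)    # still alive: SIGTERM is sent
child.wait(timeout=10)                  # reap it so the pid really disappears
second = kill_task_process(child.pid)   # already gone: nothing left to kill
print(first, second)
```

Either result would be reported upstream as a successful stop, which matches the sequence of messages in the log.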
           
  7. task_controller.py: update the task information

    cls.update_task_status(task_info=task_info)

    cls.update_task(task_info=task_info)

[INFO] [2021-07-26 08:20:47,756] [1:140259119585024] - job_saver.py[line:71]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0 status
[INFO] [2021-07-26 08:20:47,762] [1:140259119585024] - job_saver.py[line:76]: update job 202107260820309976351 task 202107260820309976351_upload_0 0 status update does not take effect: {'job_id': '202107260820309976351', 'task_id': '202107260820309976351_upload_0', 'task_version': 0, 'role': 'local', 'party_id': '0', 'party_status': 'success'}
[INFO] [2021-07-26 08:20:47,767] [1:140259119585024] - job_saver.py[line:81]: try to update job 202107260820309976351 task 202107260820309976351_upload_0 0
[WARNING] [2021-07-26 08:20:47,771] [1:140259119585024] - job_saver.py[line:86]: job 202107260820309976351 task 202107260820309976351_upload_0 0 update does not take effect
           
  8. federated_scheduler.py: corresponds to step 6
[INFO] [2021-07-26 08:20:47,782] [1:140259369826048] - federated_scheduler.py[line:206]: stop job 202107260820309976351 task 202107260820309976351_upload_0 0 success
           
  9. task_scheduler.py: corresponds to step 1
[INFO] [2021-07-26 08:20:47,782] [1:140259369826048] - task_scheduler.py[line:75]: finish scheduling job 202107260820309976351 tasks
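
All of the log lines quoted in this walkthrough share one layout, so they are easy to parse when following the logs by script. The sketch below assumes the third bracket holds a process id and a thread id (that reading is a guess from the values, not something the logs state); the regular expression and `parse_line` are illustrative helpers, not part of FATE.

```python
import re
from collections import defaultdict
from datetime import datetime

# [LEVEL] [timestamp] [pid:thread] - file[line:N]: message
LOG_RE = re.compile(
    r"\[(?P<level>\w+)\] \[(?P<ts>[^\]]+)\] \[(?P<pid>\d+):(?P<thread>\d+)\] - "
    r"(?P<file>[\w.]+)\[line:(?P<line>\d+)\]: (?P<msg>.*)"
)


def parse_line(raw):
    """Parse one log line into a dict, or return None if it does not match."""
    match = LOG_RE.match(raw.strip())
    if match is None:
        return None
    record = match.groupdict()
    record["ts"] = datetime.strptime(record["ts"], "%Y-%m-%d %H:%M:%S,%f")
    record["line"] = int(record["line"])
    return record


sample = [
    "[INFO] [2021-07-26 08:20:45,041] [1:140259369826048] - federated_scheduler.py[line:192]: "
    "job 202107260820309976351 task 202107260820309976351_upload_0 0 is success, sync to all party",
    "[INFO] [2021-07-26 08:20:45,070] [1:140259119585024] - resource_manager.py[line:285]: "
    "task 202107260820309976351_upload_0 0 return resource successfully",
]

# Group source files by the (assumed) thread id to see which side logged what.
files_by_thread = defaultdict(list)
for raw in sample:
    record = parse_line(raw)
    if record is not None:
        files_by_thread[record["thread"]].append(record["file"])
print(dict(files_by_thread))
```

Grouping by the second number makes it easier to tell the scheduler's own lines apart from the lines written while the HTTP request is being handled.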
           

This concludes the task finish stage.
