大家好,又見面了,我是你們的朋友全棧君。
繼續深挖 datax裡的container,任何一個任務進入datax都會判斷是jobContainer還是TaskGroupContainer。那麼後者(TaskGroupContainer)要做哪些事情呢?
一、TaskGroupContainer概述
JobContainer将所有的task分配到TaskGroup中執行,TaskGroup啟動5個線程去消費所有的task
TaskGroupContainer裡的主入口為start方法,實作自AbstractContainer.start
TaskGroupContainer.start方法主要做了如下幾件事
1、初始化task執行相關的狀态資訊,分别是taskId->Configuration的map、待運作的任務隊列taskQueue、運作失敗任務taskFailedExecutorMap、運作中的任務runTasks、任務開始時間taskStartTimeMap
2、循環檢測所有任務的執行狀态
1)判斷是否有失敗的task,如果有則放入失敗隊列中,并檢視目前的執行是否支援重跑和failOver,如果支援則重新放回執行隊列中;如果沒有失敗,則标記任務執行成功,并從狀态輪詢map中移除
2)如果發現有失敗的任務,則彙報目前TaskGroup的狀态,并抛出異常
3)檢視目前執行隊列的長度,如果發現執行隊列還有通道,則建構TaskExecutor加入執行隊列,并從待運作移除
4)檢查執行隊列和所有的任務狀态,如果所有的任務都執行成功,則彙報taskGroup的狀态并從循環中退出
5)檢查目前時間是否超過彙報時間檢測,如果是,則彙報目前狀态
6)當所有的執行完成從while中退出之後,再次全局彙報目前的任務狀态
複制
二、主要方法
三、主入口start的時序圖
四、源碼解讀
package com.alibaba.datax.core.taskgroup;
/**
* JobContainer assigns all tasks to TaskGroups for execution; a TaskGroup starts worker threads (5 per the original note) to consume its tasks.
*/
public class TaskGroupContainer extends AbstractContainer {
// Logger shared by the container and its nested TaskExecutor.
private static final Logger LOG = LoggerFactory.getLogger(TaskGroupContainer.class);
/**
 * Id of the job this task group belongs to.
 */
private long jobId;
/**
 * Id of this task group.
 */
private int taskGroupId;
/**
 * Name of the Channel implementation class; instantiated reflectively for every TaskExecutor.
 */
private String channelClz;
/**
 * Name of the task plugin collector class; instantiated reflectively for each reader/writer runner.
 */
private String taskCollectClz;
// Monitor that running tasks are registered with, reported to, and removed from in start().
private TaskMonitor taskMonitor = TaskMonitor.getInstance();
/**
 * Creates the container for one task group, installs its communicator and caches the
 * settings it needs later (job id, task group id, channel class, task collector class).
 *
 * @param cfg configuration of this task group; passed to the superclass and to the communicator
 */
public TaskGroupContainer(Configuration cfg) {
    super(cfg);
    initCommunicator(cfg);

    // Values are read through the inherited configuration field
    // (presumably populated by super(cfg) -- confirm in AbstractContainer).
    this.jobId = this.configuration.getLong(DATAX_CORE_CONTAINER_JOB_ID);
    this.taskGroupId = this.configuration.getInt(DATAX_CORE_CONTAINER_TASKGROUP_ID);
    this.channelClz = this.configuration.getString(DATAX_CORE_TRANSPORT_CHANNEL_CLASS);
    this.taskCollectClz =
            this.configuration.getString(DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_TASKCLASS);
}
/**
 * Installs the communicator this container uses to exchange state with its tasks.
 *
 * @param configuration configuration handed to the new StandaloneTGContainerCommunicator
 */
private void initCommunicator(Configuration configuration) {
    StandaloneTGContainerCommunicator communicator =
            new StandaloneTGContainerCommunicator(configuration);
    super.setContainerCommunicator(communicator);
}
/**
 * @return id of the job this task group belongs to
 */
public long getJobId() {
    return this.jobId;
}
/**
 * @return id of this task group
 */
public int getTaskGroupId() {
    return this.taskGroupId;
}
/**
 * Runs every task of this task group and blocks until all of them succeeded or one failed for good.
 *
 * <p>Steps:
 * <ol>
 *   <li>Initialise the bookkeeping: the taskId-to-configuration map, the queue of tasks waiting to
 *       run ({@code taskQueue}), the last failed executor per task ({@code taskFailedExecutorMap}),
 *       the running executors ({@code runTasks}) and the start time per task
 *       ({@code taskStartTimeMap}).</li>
 *   <li>Poll in a loop:
 *     <ol>
 *       <li>Inspect every finished task. A failed task is re-queued when its executor supports
 *           fail-over and has attempts left; otherwise the group is marked failed. A succeeded task
 *           is recorded in PerfRecord and dropped from the bookkeeping maps.</li>
 *       <li>If the group is failed or killed, report the group state and throw.</li>
 *       <li>While there are queued tasks and fewer running executors than channels, start new
 *           executors and remove the started tasks from the queue.</li>
 *       <li>If the queue is empty, all executors are done and the collected state is SUCCEEDED,
 *           report once more and leave the loop.</li>
 *       <li>If the report interval has elapsed, report the current state.</li>
 *     </ol>
 *   </li>
 *   <li>After the loop, report the final state once more.</li>
 * </ol>
 *
 * <p>Any Throwable is reported as a FAILED group state and rethrown as a DataXException. If the
 * polling thread was interrupted, its interrupt flag is restored before rethrowing.
 */
@Override
public void start() {
    try {
        // Interval of the state check; kept short so queued tasks get a free channel quickly.
        int sleepMsec = configuration.getInt(DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL, 100);
        // Interval of the state report; longer, to avoid a flood of reports.
        long reportMsec = configuration.getLong(DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL, 10000);
        // Number of channels, i.e. the upper bound of concurrently running tasks.
        int channelNum = configuration.getInt(DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
        int taskMaxRetrys = configuration.getInt(DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES, 1);
        long taskRetryMsec = configuration
                .getLong(DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000);
        long taskMaxWaitInMsec = configuration
                .getLong(DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000);

        List<Configuration> taskCfgs = configuration.getListConfiguration(DATAX_JOB_CONTENT);
        if (LOG.isDebugEnabled()) {
            LOG.debug("taskGroup[{}]'s task configs[{}]", taskGroupId, JSON.toJSONString(taskCfgs));
        }
        int taskCntInThisTaskGroup = taskCfgs.size();
        LOG.info("taskGroupId=[{}] start [{}] channels for [{}] tasks.",
                this.taskGroupId, channelNum, taskCntInThisTaskGroup);
        this.containerCommunicator.registerCommunication(taskCfgs);

        // taskId -> task configuration
        Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskCfgs);
        // Tasks waiting to run
        List<Configuration> taskQueue = buildRemainTasks(taskCfgs);
        // taskId -> executor of the last failed attempt
        Map<Integer, TaskExecutor> taskFailedExecutorMap = new HashMap<>();
        // Executors currently running
        List<TaskExecutor> runTasks = new ArrayList<>(channelNum);
        // taskId -> start time in milliseconds
        Map<Integer, Long> taskStartTimeMap = new HashMap<>();

        long lastReportTimeStamp = 0;
        Communication lastTaskGroupContainerComm = new Communication();

        while (true) {
            // 1. Inspect the state of every task.
            boolean failedOrKilled = false;
            // The communicator returns a map keyed by taskId with the task's Communication as value.
            Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
            for (Map.Entry<Integer, Communication> entry : communicationMap.entrySet()) {
                Integer taskId = entry.getKey();
                Communication taskCommunication = entry.getValue();
                // Tasks that have not finished yet need no handling in this round.
                if (!taskCommunication.isFinished()) {
                    continue;
                }
                // The task has finished: drop its executor from the running list (null if none is tracked) ...
                TaskExecutor taskExecutor = removeTask(runTasks, taskId);
                // ... and unregister it from the monitor accordingly.
                taskMonitor.removeTask(taskId);

                if (taskCommunication.getState() == State.FAILED) {
                    if (taskExecutor == null) {
                        // No running executor is tracked for this failed task, so fail-over is
                        // impossible; fail the group instead of hitting a NullPointerException.
                        failedOrKilled = true;
                        break;
                    }
                    taskFailedExecutorMap.put(taskId, taskExecutor);
                    // Re-queue the task when fail-over is supported and attempts are left.
                    if (taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetrys) {
                        // Shut the old executor down.
                        taskExecutor.shutdown();
                        // Reset the task's communication state.
                        containerCommunicator.resetCommunication(taskId);
                        Configuration taskConfig = taskConfigMap.get(taskId);
                        // Put the task back into the waiting queue.
                        taskQueue.add(taskConfig);
                    } else {
                        failedOrKilled = true;
                        break;
                    }
                } else if (taskCommunication.getState() == State.KILLED) {
                    failedOrKilled = true;
                    break;
                } else if (taskCommunication.getState() == State.SUCCEEDED) {
                    // Record the successful task in PerfRecord (used to print the longest-running task).
                    Long start = taskStartTimeMap.get(taskId);
                    if (start != null) {
                        long cost = System.currentTimeMillis() - start;
                        LOG.info("taskGroup[{}] taskId[{}] is succeed,used[{}]ms", taskGroupId, taskId, cost);
                        // PerfRecord expects nanoseconds: ms * 1000 * 1000.
                        long ns = cost * 1000 * 1000L;
                        PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL, start, ns);
                        taskStartTimeMap.remove(taskId);
                        taskConfigMap.remove(taskId);
                    }
                }
            }

            // 2. A task failed for good or was killed: report the group state and abort.
            if (failedOrKilled) {
                lastTaskGroupContainerComm = reportTaskGroupCommunication(lastTaskGroupContainerComm,
                        taskCntInThisTaskGroup);
                throw DataXException.asDataXException(
                        FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerComm.getThrowable());
            }

            // 3. Start queued tasks while fewer executors are running than there are channels.
            Iterator<Configuration> iterator = taskQueue.iterator();
            while (iterator.hasNext() && runTasks.size() < channelNum) {
                Configuration taskConfig = iterator.next();
                Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
                int attemptCount = 1;
                TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
                if (lastExecutor != null) {
                    attemptCount = lastExecutor.getAttemptCount() + 1;
                    long now = System.currentTimeMillis();
                    long failedTime = lastExecutor.getTimeStamp();
                    // Retry interval not reached yet: leave the task in the queue.
                    if (now - failedTime < taskRetryMsec) {
                        continue;
                    }
                    // The previous failed attempt is still shutting down.
                    if (!lastExecutor.isShutdown()) {
                        if (now - failedTime > taskMaxWaitInMsec) {
                            markCommunicationFailed(taskId);
                            reportTaskGroupCommunication(lastTaskGroupContainerComm, taskCntInThisTaskGroup);
                            throw DataXException.asDataXException(WAIT_TIME_EXCEED, "task failover等待逾時");
                        } else {
                            // Try to shut it down again and revisit the task in a later round.
                            lastExecutor.shutdown();
                            continue;
                        }
                    } else {
                        LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown",
                                this.taskGroupId, taskId, lastExecutor.getAttemptCount());
                    }
                }
                // When retries are enabled each attempt runs on its own copy of the configuration.
                Configuration taskConfigForRun = taskMaxRetrys > 1 ? taskConfig.clone() : taskConfig;
                TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
                taskStartTimeMap.put(taskId, System.currentTimeMillis());
                taskExecutor.doStart();

                iterator.remove();
                runTasks.add(taskExecutor);
                // The task is now running, so register it with the monitor as well.
                taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));
                taskFailedExecutorMap.remove(taskId);
                LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started",
                        this.taskGroupId, taskId, attemptCount);
            }

            // 4. Queue empty, all executors done and collected state SUCCEEDED: the group succeeded.
            if (taskQueue.isEmpty() && isAllTaskDone(runTasks)
                    && containerCommunicator.collectState() == State.SUCCEEDED) {
                // Report once on success too; otherwise very short tasks would yield inaccurate statistics.
                lastTaskGroupContainerComm = reportTaskGroupCommunication(lastTaskGroupContainerComm,
                        taskCntInThisTaskGroup);
                LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId);
                break;
            }

            // 5. Report right away once the report interval has elapsed.
            long now = System.currentTimeMillis();
            if (now - lastReportTimeStamp > reportMsec) {
                lastTaskGroupContainerComm = reportTaskGroupCommunication(lastTaskGroupContainerComm,
                        taskCntInThisTaskGroup);
                lastReportTimeStamp = now;
                // The monitor checks every running task once per report interval.
                for (TaskExecutor taskExecutor : runTasks) {
                    taskMonitor.report(taskExecutor.getTaskId(),
                            this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
                }
            }
            Thread.sleep(sleepMsec);
        }

        // 6. Report one last time after leaving the loop.
        reportTaskGroupCommunication(lastTaskGroupContainerComm, taskCntInThisTaskGroup);
    } catch (Throwable e) {
        Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
        if (nowTaskGroupContainerCommunication.getThrowable() == null) {
            nowTaskGroupContainerCommunication.setThrowable(e);
        }
        nowTaskGroupContainerCommunication.setState(State.FAILED);
        this.containerCommunicator.report(nowTaskGroupContainerCommunication);
        if (e instanceof InterruptedException) {
            // Thread.sleep cleared the interrupt flag when it threw; restore it so the
            // caller can still observe that this thread was interrupted.
            Thread.currentThread().interrupt();
        }
        throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
    } finally {
        if (!PerfTrace.getInstance().isJob()) {
            // Finally print the average CPU consumption and the GC statistics.
            VMInfo vmInfo = VMInfo.getVmInfo();
            if (vmInfo != null) {
                vmInfo.getDelta(false);
                LOG.info(vmInfo.totalString());
            }
            LOG.info(PerfTrace.getInstance().summarizeNoException());
        }
    }
}
/**
 * Indexes the given task configurations by the task id stored under CoreConstant.TASK_ID.
 *
 * @param configurations task configurations of this task group
 * @return a new HashMap from task id to the corresponding configuration
 */
private Map<Integer, Configuration> buildTaskConfigMap(List<Configuration> configurations) {
    Map<Integer, Configuration> configById = new HashMap<>();
    Iterator<Configuration> cursor = configurations.iterator();
    while (cursor.hasNext()) {
        Configuration current = cursor.next();
        int id = current.getInt(CoreConstant.TASK_ID);
        configById.put(id, current);
    }
    return configById;
}
/**
 * Builds the queue of tasks that have not been started yet.
 *
 * @param configurations task configurations of this task group
 * @return a new LinkedList holding the configurations in their original order
 */
private List<Configuration> buildRemainTasks(List<Configuration> configurations) {
    // The copy constructor appends the elements in the iteration order of the source list.
    return new LinkedList<>(configurations);
}
/**
 * Removes the first executor with the given task id from the list.
 *
 * @param taskList executors to search; modified in place
 * @param taskId   id of the task whose executor should be removed
 * @return the removed executor, or {@code null} when no executor has that task id
 */
private TaskExecutor removeTask(List<TaskExecutor> taskList, int taskId) {
    for (Iterator<TaskExecutor> cursor = taskList.iterator(); cursor.hasNext(); ) {
        TaskExecutor candidate = cursor.next();
        if (candidate.getTaskId() != taskId) {
            continue;
        }
        cursor.remove();
        return candidate;
    }
    return null;
}
/**
 * Tells whether every executor in the list has finished its task.
 *
 * @param taskList executors to check
 * @return {@code true} when the list is empty or all executors report finished
 */
private boolean isAllTaskDone(List<TaskExecutor> taskList) {
    boolean allDone = true;
    Iterator<TaskExecutor> cursor = taskList.iterator();
    // Stop at the first executor that is still working.
    while (allDone && cursor.hasNext()) {
        allDone = cursor.next().isTaskFinished();
    }
    return allDone;
}
/**
 * Collects the current state of this task group, derives the report from it and the
 * previous report via CommunicationTool, and sends that report through the communicator.
 *
 * @param lastTaskGroupContainerCommunication the previously reported communication
 * @param taskCount                           number of tasks in this task group
 * @return the communication that was just reported
 */
private Communication reportTaskGroupCommunication(
        Communication lastTaskGroupContainerCommunication, int taskCount) {
    Communication current = this.containerCommunicator.collect();
    current.setTimestamp(System.currentTimeMillis());

    Communication toReport = CommunicationTool.getReportCommunication(
            current, lastTaskGroupContainerCommunication, taskCount);
    this.containerCommunicator.report(toReport);
    return toReport;
}
/**
 * Sets the state of the given task's communication to FAILED.
 *
 * @param taskId id of the task to mark as failed
 */
private void markCommunicationFailed(Integer taskId) {
    containerCommunicator.getCommunication(taskId).setState(State.FAILED);
}
/**
* TaskExecutor is the executor of one complete task.
* It holds exactly one reader and one writer (1:1).
*/
class TaskExecutor {
// Configuration of the task this executor runs.
private Configuration taskConfig;
// Id of the task, read from the task configuration.
private int taskId;
// Attempt number passed in by the container; start() passes 1 for a first run.
private int attemptCount;
// Channel shared by the reader-side and writer-side record exchangers.
private Channel channel;
// Threads running readerRunner and writerRunner respectively.
private Thread readerThread;
private Thread writerThread;
private ReaderRunner readerRunner;
private WriterRunner writerRunner;
/**
 * This taskCommunication is shared by several collaborators:
 * 1. the channel
 * 2. readerRunner and writerRunner
 * 3. the taskPluginCollector of the reader and of the writer
 */
private Communication taskCommunication;
/**
 * Prepares everything needed to run one task: validates the configuration, looks up the
 * task's Communication, creates the channel, and builds the writer and reader runners
 * together with their (not yet started) threads.
 *
 * @param taskConf     configuration of the task; must contain both a reader and a writer section
 * @param attemptCount attempt number of this execution
 */
public TaskExecutor(Configuration taskConf, int attemptCount) {
    this.taskConfig = taskConf;
    boolean hasReaderAndWriter = null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER)
            && null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER);
    Validate.isTrue(hasReaderAndWriter, "[reader|writer]的插件參數不能為空!");

    this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID);
    this.attemptCount = attemptCount;

    // The Communication registered for this task id is handed to both runners and to the
    // channel, which uses it for statistics.
    this.taskCommunication = containerCommunicator.getCommunication(taskId);
    Validate.notNull(this.taskCommunication,
            String.format("taskId[%d]的Communication沒有注冊過", taskId));

    this.channel = ClassUtil.instantiate(channelClz, Channel.class, configuration);
    this.channel.setCommunication(this.taskCommunication);

    // Transformer settings of this task; only the reader side consumes them.
    List<TransformerExecution> transformerInfoExecs =
            TransformerUtil.buildTransformerInfo(taskConfig);

    // Writer runner and thread. The thread gets the writer plugin's own class loader as its
    // context class loader, so the plugin is loaded separately from the main program.
    writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
    String writerThreadName = String.format("%d-%d-%d-writer", jobId, taskGroupId, this.taskId);
    this.writerThread = new Thread(writerRunner, writerThreadName);
    ClassLoader writerLoader = LoadUtil.getJarLoader(
            PluginType.WRITER, this.taskConfig.getString(JOB_WRITER_NAME));
    this.writerThread.setContextClassLoader(writerLoader);

    // Reader runner and thread, set up the same way with the reader plugin's class loader.
    readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerInfoExecs);
    String readerThreadName = String.format("%d-%d-%d-reader", jobId, taskGroupId, this.taskId);
    this.readerThread = new Thread(readerRunner, readerThreadName);
    ClassLoader readerLoader = LoadUtil.getJarLoader(
            PluginType.READER, this.taskConfig.getString(JOB_READER_NAME));
    this.readerThread.setContextClassLoader(readerLoader);
}
/**
 * Starts the writer thread and then the reader thread, throwing as soon as either side is
 * found broken right after its start.
 *
 * NOTE(review): the writer check combines its two conditions with OR while the reader check
 * uses AND; this asymmetry is kept exactly as found -- confirm it is intended.
 */
public void doStart() {
    this.writerThread.start();
    // The reader is not running yet, so the writer cannot have finished normally.
    boolean writerBroken = !this.writerThread.isAlive()
            || this.taskCommunication.getState() == State.FAILED;
    if (writerBroken) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, this.taskCommunication.getThrowable());
    }

    this.readerThread.start();
    // The reader may legitimately finish very quickly; only a dead reader together with a
    // FAILED state means it crashed on start-up, which must be raised immediately.
    boolean readerCrashed = !this.readerThread.isAlive()
            && this.taskCommunication.getState() == State.FAILED;
    if (readerCrashed) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, this.taskCommunication.getThrowable());
    }
}
/**
 * Creates the runner for the given plugin type without any transformers.
 *
 * @param pluginType READER or WRITER
 * @return the configured runner
 */
private AbstractRunner generateRunner(PluginType pluginType) {
    return generateRunner(pluginType, null);
}

/**
 * Creates and wires the runner (ReaderRunner or WriterRunner) for the given plugin type.
 *
 * @param pluginType       READER or WRITER; any other value is rejected
 * @param transformerExecs transformers applied on the reader side; may be null or empty
 * @return the runner with job configuration, record exchanger, plugin collector, ids and
 *         communication set
 */
private AbstractRunner generateRunner(PluginType pluginType,
        List<TransformerExecution> transformerExecs) {
    AbstractRunner runner;
    TaskPluginCollector collector;

    switch (pluginType) {
        case READER: {
            runner = LoadUtil.loadPluginRunner(pluginType, taskConfig.getString(JOB_READER_NAME));
            runner.setJobConf(this.taskConfig.getConfiguration(JOB_READER_PARAMETER));
            collector = ClassUtil.instantiate(taskCollectClz, AbstractTaskPluginCollector.class,
                    configuration, this.taskCommunication, PluginType.READER);

            // With transformers configured, records pass through the transforming exchanger.
            boolean hasTransformers = transformerExecs != null && !transformerExecs.isEmpty();
            RecordSender sender;
            if (hasTransformers) {
                sender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId,
                        this.channel, this.taskCommunication, collector, transformerExecs);
            } else {
                sender = new BufferedRecordExchanger(this.channel, collector);
            }
            ((ReaderRunner) runner).setRecordSender(sender);
            break;
        }
        case WRITER: {
            runner = LoadUtil.loadPluginRunner(pluginType, taskConfig.getString(JOB_WRITER_NAME));
            runner.setJobConf(this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));
            collector = ClassUtil.instantiate(taskCollectClz, AbstractTaskPluginCollector.class,
                    configuration, this.taskCommunication, PluginType.WRITER);
            ((WriterRunner) runner).setRecordReceiver(
                    new BufferedRecordExchanger(this.channel, collector));
            break;
        }
        default:
            throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR,
                    "Cant generateRunner for:" + pluginType);
    }

    // The plugin collector handles dirty records and job/task communication for the plugin.
    runner.setTaskPluginCollector(collector);
    runner.setTaskGroupId(taskGroupId);
    runner.setTaskId(this.taskId);
    runner.setRunnerCommunication(this.taskCommunication);
    return runner;
}
/**
 * Checks whether this task has finished.
 *
 * @return {@code true} only when neither the reader nor the writer thread is alive and the
 *         task's communication exists and reports finished
 */
private boolean isTaskFinished() {
    // Even with both threads gone the task only counts as finished once its communication
    // (which must not be null) says so.
    return !readerThread.isAlive()
            && !writerThread.isAlive()
            && taskCommunication != null
            && taskCommunication.isFinished();
}
/**
 * @return id of the task this executor runs
 */
private int getTaskId() {
    return this.taskId;
}
/**
 * @return the timestamp currently stored on this task's communication
 */
private long getTimeStamp() {
    return this.taskCommunication.getTimestamp();
}
/**
 * @return the attempt number this executor was created with
 */
private int getAttemptCount() {
    return this.attemptCount;
}
/**
 * @return whether the writer runner reports that it supports fail-over
 */
private boolean supportFailOver() {
    return this.writerRunner.supportFailOver();
}
/**
 * Shuts down both runners (writer first) and interrupts whichever of their threads is
 * still alive.
 */
private void shutdown() {
    writerRunner.shutdown();
    readerRunner.shutdown();
    // Same order as the runners: writer thread first, then reader thread.
    for (Thread worker : new Thread[] {writerThread, readerThread}) {
        if (worker.isAlive()) {
            worker.interrupt();
        }
    }
}
/**
 * @return {@code true} when neither the reader thread nor the writer thread is alive
 */
private boolean isShutdown() {
    return !(readerThread.isAlive() || writerThread.isAlive());
}
}
}
複制
注:
- 對源碼進行略微改動,主要修改為 1 阿裡代碼規約掃描出來的,2 clean code;
- 所有代碼都已經上傳到github(master分支和dev),可以免費白嫖
釋出者:全棧程式員棧長,轉載請注明出處:https://javaforall.cn/145348.html 原文連結:https://javaforall.cn