
datax(8): TaskGroupContainer source code walkthrough

Hello everyone, we meet again. I'm your friend 全棧君.

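Let's keep digging into the containers in DataX. Every job that enters DataX is checked to decide whether it runs in a JobContainer or a TaskGroupContainer. So what does the latter actually have to do?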

I. TaskGroupContainer overview

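JobContainer assigns all tasks to TaskGroups for execution. Each TaskGroup then works through its tasks with a fixed number of channels (5 by default), running at most one task per channel at a time. Each running task uses its own reader thread and writer thread.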
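To make the "channels per TaskGroup" idea concrete, the sketch below computes how many TaskGroups a job needs and deals the tasks out to them. It assumes the group count is ceil(needChannel / channelsPerTaskGroup) with 5 channels per group. The round-robin split is a deliberate simplification: DataX's real assignment (JobAssignUtil) is more involved. All class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class TaskGroupAssignSketch {

    /** Task groups needed so that each group has at most channelsPerGroup channels. */
    static int taskGroupCount(int needChannel, int channelsPerGroup) {
        return (int) Math.ceil(1.0 * needChannel / channelsPerGroup);
    }

    /** Simplified round-robin split (not DataX's algorithm): task i goes to group i % groupCount. */
    static List<List<Integer>> assignRoundRobin(int taskCount, int groupCount) {
        List<List<Integer>> groups = new ArrayList<>();
        for (int g = 0; g < groupCount; g++) {
            groups.add(new ArrayList<>());
        }
        for (int taskId = 0; taskId < taskCount; taskId++) {
            groups.get(taskId % groupCount).add(taskId);
        }
        return groups;
    }

    public static void main(String[] args) {
        int channelsPerGroup = 5;  // assumed default channels per TaskGroup
        int needChannel = 12;      // hypothetical job-level concurrency
        int taskCount = 12;        // hypothetical number of split tasks
        int groups = taskGroupCount(needChannel, channelsPerGroup);
        System.out.println("taskGroups=" + groups);              // taskGroups=3
        System.out.println(assignRoundRobin(taskCount, groups));
        // [[0, 3, 6, 9], [1, 4, 7, 10], [2, 5, 8, 11]]
    }
}
```

In this example each group receives 4 tasks and has 5 channels, so a TaskGroupContainer could run all four at once. A group with more tasks than channels would leave the extra ones waiting in its pending queue.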

The main entry point of TaskGroupContainer is the start method, which implements AbstractContainer.start.

TaskGroupContainer.start does the following:

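1. Initialize the task execution state: the taskId -> Configuration map, the pending task queue taskQueue, the failed-task map taskFailedExecutorMap, the running tasks runTasks, and the task start times taskStartTimeMap
2. Loop, checking the execution state of all tasks:
    1) For each finished task, check whether it failed. A failed task is put into the failed-task map. If it supports failover and still has retries left, it is put back into the pending queue. A task that succeeded has its run time recorded and is removed from the state-tracking maps
    2) If a task failed with no retries left, or was killed, report the current TaskGroup state and throw an exception
    3) Check how many tasks are running. While a channel is free, build a TaskExecutor for the next pending task, start it, add it to the running list, and remove it from the pending queue
    4) If the pending queue is empty and all tasks have finished successfully, report the taskGroup state and exit the loop
    5) If the report interval has elapsed since the last report, report the current state
    6) After the while loop exits, report the overall task state once more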
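The interaction between the pending queue, the running list capped at channelNum, and the attempt counter in step 2 can be simulated without DataX. The sketch below is a single-threaded stand-in, and several parts are assumptions. FakeExecutor and run() are hypothetical. Every task is assumed to support failover; in DataX, supportFailOver() delegates to the writer runner. The retry interval, periodic reporting and sleeping are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class TaskGroupLoopSketch {

    enum State { RUNNING, SUCCEEDED, FAILED }

    /** Hypothetical stand-in for TaskExecutor: reports its outcome after a fixed number of polls. */
    static final class FakeExecutor {
        final int taskId;
        final int attemptCount;
        final State outcome;
        int remainingTicks;

        FakeExecutor(int taskId, int attemptCount, int ticks, State outcome) {
            this.taskId = taskId;
            this.attemptCount = attemptCount;
            this.remainingTicks = ticks;
            this.outcome = outcome;
        }

        /** Polled once per loop round; RUNNING until the ticks are used up. */
        State poll() {
            return --remainingTicks > 0 ? State.RUNNING : outcome;
        }
    }

    /**
     * Simplified polling loop. Tasks in failFirstAttempt fail on attempt 1 and succeed afterwards.
     * Returns the started attempts as "taskId#attempt", in start order.
     */
    static List<String> run(int taskCount, int channelNum, int maxRetries, List<Integer> failFirstAttempt) {
        List<Integer> taskQueue = new LinkedList<>();           // pending tasks
        for (int i = 0; i < taskCount; i++) {
            taskQueue.add(i);
        }
        Map<Integer, Integer> attempts = new HashMap<>();       // taskId -> attempts started so far
        List<FakeExecutor> runTasks = new ArrayList<>(channelNum);
        List<String> started = new ArrayList<>();

        while (true) {
            // Step 1: collect finished tasks; requeue failures that still have retries left
            Iterator<FakeExecutor> running = runTasks.iterator();
            while (running.hasNext()) {
                FakeExecutor ex = running.next();
                State state = ex.poll();
                if (state == State.RUNNING) {
                    continue;
                }
                running.remove();
                if (state == State.FAILED) {
                    if (ex.attemptCount < maxRetries) {
                        taskQueue.add(ex.taskId);
                    } else {
                        // Step 2: the real loop reports the TaskGroup state and throws here
                        throw new IllegalStateException("task " + ex.taskId + " failed");
                    }
                }
            }
            // Step 3: start pending tasks while a channel is free
            Iterator<Integer> pending = taskQueue.iterator();
            while (pending.hasNext() && runTasks.size() < channelNum) {
                int taskId = pending.next();
                int attempt = attempts.merge(taskId, 1, Integer::sum);
                State outcome = (attempt == 1 && failFirstAttempt.contains(taskId))
                        ? State.FAILED : State.SUCCEEDED;
                runTasks.add(new FakeExecutor(taskId, attempt, 2, outcome));
                started.add(taskId + "#" + attempt);
                pending.remove();
            }
            // Step 4: nothing pending and nothing running -> done
            if (taskQueue.isEmpty() && runTasks.isEmpty()) {
                return started;
            }
        }
    }

    public static void main(String[] args) {
        // 4 tasks, 2 channels, up to 2 attempts; task 1 fails on its first attempt
        System.out.println(run(4, 2, 2, List.of(1)));   // [0#1, 1#1, 2#1, 3#1, 1#2]
    }
}
```

Task 1 is retried only after tasks 2 and 3, because a requeued task goes to the back of the queue. With maxRetries = 1 (the fallback value start() passes for the max-retry setting), attemptCount < maxRetries is false on the first attempt. The same scenario therefore throws instead of retrying.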


II. Main methods

[Figure: TaskGroupContainer's main methods]

III. Sequence diagram of the main entry point start

[Figure: sequence diagram of start()]

IV. Source code walkthrough

package com.alibaba.datax.core.taskgroup;

/**
 * JobContainer assigns all tasks to TaskGroups for execution; a TaskGroup runs up to channelNum
 * tasks at a time (5 by default) until all of its tasks are consumed
 */
public class TaskGroupContainer extends AbstractContainer {

  private static final Logger LOG = LoggerFactory.getLogger(TaskGroupContainer.class);

  /**
   * jobId of the job this taskGroup belongs to
   */
  private long jobId;

  /**
   * id of the current taskGroup
   */
  private int taskGroupId;

  /**
   * Channel implementation class to use
   */
  private String channelClz;

  /**
   * Class used as the task plugin collector
   */
  private String taskCollectClz;

  private TaskMonitor taskMonitor = TaskMonitor.getInstance();


  public TaskGroupContainer(Configuration cfg) {
    super(cfg);
    initCommunicator(cfg);

    this.jobId = configuration.getLong(DATAX_CORE_CONTAINER_JOB_ID);
    this.taskGroupId = configuration.getInt(DATAX_CORE_CONTAINER_TASKGROUP_ID);
    this.channelClz = configuration.getString(DATAX_CORE_TRANSPORT_CHANNEL_CLASS);
    this.taskCollectClz = configuration.getString(DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_TASKCLASS);
  }

  /**
   * Initialize the communicator that exchanges state between containers
   *
   * @param configuration
   */
  private void initCommunicator(Configuration configuration) {
    super.setContainerCommunicator(new StandaloneTGContainerCommunicator(configuration));
  }

  public long getJobId() {
    return jobId;
  }

  public int getTaskGroupId() {
    return taskGroupId;
  }

  /**
   * 1. Initialize the task execution state: the taskId -> Configuration map, the pending task queue taskQueue,
   *    the failed tasks taskFailedExecutorMap, the running tasks runTasks, and the task start times taskStartTimeMap
   * 2. Loop, checking the execution state of all tasks:
   * 1) For each finished task that failed, put it into the failed map and check whether it supports failover
   * and still has retries left; if so, put it back into the pending queue.
   * A task that succeeded has its run time recorded and is removed from the state-tracking maps
   * 2) If a task failed with no retries left, or was killed, report the TaskGroup state and throw an exception
   * 3) While a channel is free, build a TaskExecutor for the next pending task, start it,
   * and remove it from the pending queue
   * 4) If the pending queue is empty and all tasks have succeeded, report the taskGroup state and exit the loop
   * 5) If the report interval has elapsed, report the current state
   * 6) After the while loop exits, report the overall task state once more
   */
  @Override
  public void start() {
    try {
      // Interval between state checks; short, so tasks are dispatched to channels promptly
      int sleepMsec = configuration.getInt(DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL, 100);
      // Interval between state reports; longer, to avoid flooding reports
      long reportMsec = configuration.getLong(DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL, 10000);
      /**
       * Report performance statistics every 2 minutes
       */
      // Number of channels
      int channelNum = configuration.getInt(DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
      int taskMaxRetrys = configuration.getInt(DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES, 1);
      long taskRetryMsec = configuration
          .getLong(DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000);
      long taskMaxWaitInMsec = configuration
          .getLong(DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000);

      List<Configuration> taskCfgs = configuration.getListConfiguration(DATAX_JOB_CONTENT);

      if (LOG.isDebugEnabled()) {
        LOG.debug("taskGroup[{}]'s task configs[{}]", taskGroupId, JSON.toJSONString(taskCfgs));
      }

      int taskCntInThisTaskGroup = taskCfgs.size();
      LOG.info(String.format("taskGroupId=[%d] start [%d] channels for [%d] tasks.",
          this.taskGroupId, channelNum, taskCntInThisTaskGroup));

      this.containerCommunicator.registerCommunication(taskCfgs);
      // taskId -> task configuration
      Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskCfgs);
      // Pending tasks
      List<Configuration> taskQueue = buildRemainTasks(taskCfgs);
      // taskId -> executor of its last failed attempt
      Map<Integer, TaskExecutor> taskFailedExecutorMap = new HashMap<>();
      // Running tasks
      List<TaskExecutor> runTasks = new ArrayList<>(channelNum);
      // Task start times
      Map<Integer, Long> taskStartTimeMap = new HashMap<>();

      long lastReportTimeStamp = 0;
      Communication lastTaskGroupContainerComm = new Communication();
      while (true) {
        // 1. Check task states
        boolean failedOrKilled = false;
        // The communicator is a TGContainerCommunicator, so this returns Map: key=taskId, value=Communication
        Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
        for (Map.Entry<Integer, Communication> entry : communicationMap.entrySet()) {
          Integer taskId = entry.getKey();
          Communication taskCommunication = entry.getValue();
          // Task not finished yet: skip to the next one
          if (!taskCommunication.isFinished()) {
            continue;
          }
          // The task has finished, so remove it from runTasks by taskId
          TaskExecutor taskExecutor = removeTask(runTasks, taskId);
          // Removed from runTasks above, so remove it from the monitor as well
          taskMonitor.removeTask(taskId);

          // Failed: check whether the task supports failover and is still under the retry limit
          if (taskCommunication.getState() == State.FAILED) {
            taskFailedExecutorMap.put(taskId, taskExecutor);
            // If the task supports failover and its attempt count is below the maximum, requeue it
            if (taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetrys) {
              // Shut down the old executor
              taskExecutor.shutdown();
              // Reset the task's state
              containerCommunicator.resetCommunication(taskId);
              Configuration taskConfig = taskConfigMap.get(taskId);
              // Put it back into the pending task list
              taskQueue.add(taskConfig);
            } else {
              failedOrKilled = true;
              break;
            }
          } else if (taskCommunication.getState() == State.KILLED) {
            failedOrKilled = true;
            break;
          } else if (taskCommunication.getState() == State.SUCCEEDED) {
            // The task succeeded: record it in PerfRecord (so the longest-running task can be reported)
            Long start = taskStartTimeMap.get(taskId);
            if (start != null) {
              Long cost = System.currentTimeMillis() - start;
              LOG.info("taskGroup[{}] taskId[{}] succeeded, used [{}]ms", taskGroupId, taskId, cost);
              // cost * 1000 * 1000 converts ms to the ns that PerfRecord records. This is only a simple
              // registration used to print the longest task, hence the dedicated static method
              long ns = cost * 1000 * 1000L;
              PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL, start, ns);
              taskStartTimeMap.remove(taskId);
              taskConfigMap.remove(taskId);
            }
          }
        }

        // 2. If the overall state of this taskGroup's executors is failed, report the error
        if (failedOrKilled) {
          lastTaskGroupContainerComm = reportTaskGroupCommunication(lastTaskGroupContainerComm,
              taskCntInThisTaskGroup);

          throw DataXException.asDataXException(
              FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerComm.getThrowable());
        }

        // 3. Tasks are waiting, and the number of running tasks is below the channel limit
        Iterator<Configuration> iterator = taskQueue.iterator();
        while (iterator.hasNext() && runTasks.size() < channelNum) {
          Configuration taskConfig = iterator.next();
          Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
          int attemptCount = 1;
          TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
          if (lastExecutor != null) {
            attemptCount = lastExecutor.getAttemptCount() + 1;
            long now = System.currentTimeMillis();
            long failedTime = lastExecutor.getTimeStamp();
            // Retry interval not reached yet: keep it in the queue
            if (now - failedTime < taskRetryMsec) {
              continue;
            }
            // The previously failed attempt has not shut down yet
            if (!lastExecutor.isShutdown()) {
              if (now - failedTime > taskMaxWaitInMsec) {
                markCommunicationFailed(taskId);
                reportTaskGroupCommunication(lastTaskGroupContainerComm, taskCntInThisTaskGroup);
                throw DataXException.asDataXException(WAIT_TIME_EXCEED, "task failover wait timed out");
              } else {
                lastExecutor.shutdown(); // try shutting it down again
                continue;
              }
            } else {
              LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown",
                  this.taskGroupId, taskId, lastExecutor.getAttemptCount());
            }
          }
          Configuration taskConfigForRun = taskMaxRetrys > 1 ? taskConfig.clone() : taskConfig;
          TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
          taskStartTimeMap.put(taskId, System.currentTimeMillis());
          taskExecutor.doStart();

          iterator.remove();
          runTasks.add(taskExecutor);

          // A task was just added to runTasks, so register it with the monitor
          taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));

          taskFailedExecutorMap.remove(taskId);
          LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started",
              this.taskGroupId, taskId, attemptCount);
        }

        // 4. Task queue empty, all executors finished, collected state SUCCEEDED ---> success
        if (taskQueue.isEmpty() && isAllTaskDone(runTasks)
            && containerCommunicator.collectState() == State.SUCCEEDED) {
          // Report once on success too; otherwise the collected statistics are inaccurate when tasks finish very quickly
          lastTaskGroupContainerComm = reportTaskGroupCommunication(lastTaskGroupContainerComm,
              taskCntInThisTaskGroup);

          LOG.info("taskGroup[{}] completed its tasks.", this.taskGroupId);
          break;
        }

        // 5. If the report interval has elapsed since the last report, report right away
        long now = System.currentTimeMillis();
        if (now - lastReportTimeStamp > reportMsec) {
          lastTaskGroupContainerComm = reportTaskGroupCommunication(lastTaskGroupContainerComm,
              taskCntInThisTaskGroup);

          lastReportTimeStamp = now;

          // Every reportMsec, taskMonitor checks the tasks that are still running
          for (TaskExecutor taskExecutor : runTasks) {
            taskMonitor.report(taskExecutor.getTaskId(),
                this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
          }
        }
        Thread.sleep(sleepMsec);
      }

      // 6. Report one last time
      reportTaskGroupCommunication(lastTaskGroupContainerComm, taskCntInThisTaskGroup);

    } catch (Throwable e) {
      Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
      if (nowTaskGroupContainerCommunication.getThrowable() == null) {
        nowTaskGroupContainerCommunication.setThrowable(e);
      }
      nowTaskGroupContainerCommunication.setState(State.FAILED);
      this.containerCommunicator.report(nowTaskGroupContainerCommunication);

      throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
    } finally {
      if (!PerfTrace.getInstance().isJob()) {
        // Finally print the average CPU usage and GC statistics
        VMInfo vmInfo = VMInfo.getVmInfo();
        if (vmInfo != null) {
          vmInfo.getDelta(false);
          LOG.info(vmInfo.totalString());
        }
        LOG.info(PerfTrace.getInstance().summarizeNoException());
      }
    }
  }

  private Map<Integer, Configuration> buildTaskConfigMap(List<Configuration> configurations) {
    Map<Integer, Configuration> map = new HashMap<>();
    for (Configuration taskConfig : configurations) {
      int taskId = taskConfig.getInt(CoreConstant.TASK_ID);
      map.put(taskId, taskConfig);
    }
    return map;
  }

  /**
   * Build the list of tasks that have not run yet, used as a queue
   *
   * @param configurations
   * @return List<Configuration> a LinkedList, which keeps the tasks in order and makes iterator.remove() cheap
   */
  private List<Configuration> buildRemainTasks(List<Configuration> configurations) {
    List<Configuration> remainTasks = new LinkedList<>();
    for (Configuration taskConfig : configurations) {
      remainTasks.add(taskConfig);
    }
    return remainTasks;
  }

  private TaskExecutor removeTask(List<TaskExecutor> taskList, int taskId) {
    Iterator<TaskExecutor> iterator = taskList.iterator();
    while (iterator.hasNext()) {
      TaskExecutor taskExecutor = iterator.next();
      if (taskExecutor.getTaskId() == taskId) {
        iterator.remove();
        return taskExecutor;
      }
    }
    return null;
  }

  private boolean isAllTaskDone(List<TaskExecutor> taskList) {
    for (TaskExecutor taskExecutor : taskList) {
      if (!taskExecutor.isTaskFinished()) {
        return false;
      }
    }
    return true;
  }

  private Communication reportTaskGroupCommunication(
      Communication lastTaskGroupContainerCommunication, int taskCount) {
    Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
    nowTaskGroupContainerCommunication.setTimestamp(System.currentTimeMillis());
    Communication reportCommunication = CommunicationTool
        .getReportCommunication(nowTaskGroupContainerCommunication,
            lastTaskGroupContainerCommunication, taskCount);
    this.containerCommunicator.report(reportCommunication);
    return reportCommunication;
  }

  private void markCommunicationFailed(Integer taskId) {
    Communication communication = containerCommunicator.getCommunication(taskId);
    communication.setState(State.FAILED);
  }

  /**
   * TaskExecutor runs one complete task,
   * which consists of exactly one reader and one writer (1:1)
   */
  class TaskExecutor {

    private Configuration taskConfig;

    private int taskId;

    private int attemptCount;

    private Channel channel;

    private Thread readerThread;

    private Thread writerThread;

    private ReaderRunner readerRunner;

    private WriterRunner writerRunner;

    /**
     * This taskCommunication is used in several places:
     * 1. the channel
     * 2. readerRunner and writerRunner
     * 3. the taskPluginCollector of the reader and of the writer
     */
    private Communication taskCommunication;

    public TaskExecutor(Configuration taskConf, int attemptCount) {
      // This TaskExecutor's configuration
      this.taskConfig = taskConf;
      Validate.isTrue(null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER)
              && null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER),
          "[reader|writer] plugin parameters must not be empty!");

      // Get the taskId
      this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID);
      this.attemptCount = attemptCount;

      /**
       * Look up this TaskExecutor's Communication by taskId.
       * It is passed to readerRunner and writerRunner, and to the channel for statistics
       */
      this.taskCommunication = containerCommunicator
          .getCommunication(taskId);
      Validate.notNull(this.taskCommunication,
          String.format("Communication for taskId[%d] has not been registered", taskId));
      this.channel = ClassUtil.instantiate(channelClz,
          Channel.class, configuration);
      this.channel.setCommunication(this.taskCommunication);

      /**
       * Get the transformer parameters
       */

      List<TransformerExecution> transformerInfoExecs = TransformerUtil
          .buildTransformerInfo(taskConfig);

      /**
       * Create writerThread
       */
      writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
      this.writerThread = new Thread(writerRunner,
          String.format("%d-%d-%d-writer", jobId, taskGroupId, this.taskId));
      // Setting the thread's contextClassLoader lets the plugin use a class loader different from the main program's
      this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
          PluginType.WRITER, this.taskConfig.getString(JOB_WRITER_NAME)));

      /**
       * Create readerThread
       */
      readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerInfoExecs);
      this.readerThread = new Thread(readerRunner,
          String.format("%d-%d-%d-reader", jobId, taskGroupId, this.taskId));
      /**
       * Setting the thread's contextClassLoader lets the plugin use a class loader different from the main program's
       */
      this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
          PluginType.READER, this.taskConfig.getString(JOB_READER_NAME)));
    }

    /**
     * Actually starts the task: the writer thread first, then the reader thread
     */
    public void doStart() {
      this.writerThread.start();
      // The reader has not started yet, so the writer cannot have finished; if it is not alive, it failed
      if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
        throw DataXException.asDataXException(
            FrameworkErrorCode.RUNTIME_ERROR, this.taskCommunication.getThrowable());
      }

      this.readerThread.start();
      // The reader may finish very quickly here
      if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
        // In production the reader can die right after starting; in that case throw immediately
        throw DataXException.asDataXException(
            FrameworkErrorCode.RUNTIME_ERROR, this.taskCommunication.getThrowable());
      }
    }


    private AbstractRunner generateRunner(PluginType pluginType) {
      return generateRunner(pluginType, null);
    }

    /**
     * Generate an abstract runner (ReaderRunner or WriterRunner) from the plugin type and the transformer executions
     *
     * @param pluginType       PluginType
     * @param transformerExecs List<TransformerExecution>
     * @return AbstractRunner
     */
    private AbstractRunner generateRunner(PluginType pluginType,
        List<TransformerExecution> transformerExecs) {

      AbstractRunner newRunner;
      TaskPluginCollector pluginCollector;

      switch (pluginType) {
        case READER:
          newRunner = LoadUtil.loadPluginRunner(pluginType, taskConfig.getString(JOB_READER_NAME));
          newRunner.setJobConf(this.taskConfig.getConfiguration(JOB_READER_PARAMETER));

          pluginCollector = ClassUtil.instantiate(taskCollectClz, AbstractTaskPluginCollector.class,
              configuration, this.taskCommunication, PluginType.READER);

          RecordSender recordSender;
          if (transformerExecs != null && transformerExecs.size() > 0) {
            recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId,
                this.channel, this.taskCommunication, pluginCollector, transformerExecs);
          } else {
            recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
          }
          ((ReaderRunner) newRunner).setRecordSender(recordSender);
          /**
           * Set the taskPlugin's collector, which handles dirty data and job/task communication
           */
          newRunner.setTaskPluginCollector(pluginCollector);
          break;
        case WRITER:
          newRunner = LoadUtil.loadPluginRunner(pluginType, taskConfig.getString(JOB_WRITER_NAME));
          newRunner.setJobConf(this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));

          pluginCollector = ClassUtil.instantiate(taskCollectClz, AbstractTaskPluginCollector.class,
              configuration, this.taskCommunication, PluginType.WRITER);
          ((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(this.channel, pluginCollector));
          /**
           * Set the taskPlugin's collector, which handles dirty data and job/task communication
           */
          newRunner.setTaskPluginCollector(pluginCollector);
          break;
        default:
          throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR,
              "Cant generateRunner for:" + pluginType);
      }

      newRunner.setTaskGroupId(taskGroupId);
      newRunner.setTaskId(this.taskId);
      newRunner.setRunnerCommunication(this.taskCommunication);

      return newRunner;
    }


    /**
     * Check whether the task has finished
     *
     * @return boolean
     */
    private boolean isTaskFinished() {
      // If the reader or writer thread is still running, the task has not finished
      if (readerThread.isAlive() || writerThread.isAlive()) {
        return false;
      }
      // If the task's Communication is null or not finished, the task has not finished either
      // (both threads are done, but the task state has not been finalized yet)
      if (taskCommunication == null || !taskCommunication.isFinished()) {
        return false;
      }
      return true;
    }

    private int getTaskId() {
      return taskId;
    }

    private long getTimeStamp() {
      return taskCommunication.getTimestamp();
    }

    private int getAttemptCount() {
      return attemptCount;
    }

    private boolean supportFailOver() {
      return writerRunner.supportFailOver();
    }

    private void shutdown() {
      writerRunner.shutdown();
      readerRunner.shutdown();
      if (writerThread.isAlive()) {
        writerThread.interrupt();
      }
      if (readerThread.isAlive()) {
        readerThread.interrupt();
      }
    }

    private boolean isShutdown() {
      return !readerThread.isAlive() && !writerThread.isAlive();
    }
  }
}
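The relaunch check in step 3 of start() (the lastExecutor != null branch) involves two time limits and is easy to misread. The sketch below pulls that check out into a pure function so each branch can be exercised. The enum and method names are hypothetical. The 10000 ms and 60000 ms values match the fallbacks start() passes to configuration.getLong. failedTime stands for lastExecutor.getTimeStamp(), the failed attempt's Communication timestamp.

```java
public class FailoverDecisionSketch {

    enum Decision { RELAUNCH, WAIT, SHUTDOWN_AGAIN, TIMEOUT }

    /**
     * Mirrors the checks start() makes before relaunching a previously failed task.
     *
     * @param now          current time in ms
     * @param failedTime   timestamp of the failed attempt in ms
     * @param lastShutdown whether the old reader and writer threads have both exited
     */
    static Decision decide(long now, long failedTime, boolean lastShutdown,
                           long taskRetryMsec, long taskMaxWaitInMsec) {
        long elapsed = now - failedTime;
        if (elapsed < taskRetryMsec) {
            return Decision.WAIT;            // retry interval not reached: stay in the queue
        }
        if (!lastShutdown) {
            return elapsed > taskMaxWaitInMsec
                ? Decision.TIMEOUT           // real code marks FAILED, reports, throws WAIT_TIME_EXCEED
                : Decision.SHUTDOWN_AGAIN;   // real code calls shutdown() again and skips this round
        }
        return Decision.RELAUNCH;            // new TaskExecutor with attemptCount + 1
    }

    public static void main(String[] args) {
        long retry = 10_000;
        long maxWait = 60_000;
        System.out.println(decide(5_000, 0, true, retry, maxWait));   // WAIT
        System.out.println(decide(15_000, 0, false, retry, maxWait)); // SHUTDOWN_AGAIN
        System.out.println(decide(70_000, 0, false, retry, maxWait)); // TIMEOUT
        System.out.println(decide(15_000, 0, true, retry, maxWait));  // RELAUNCH
    }
}
```

The order of the checks matters. The retry interval is checked first, so even an executor that has already exited is not relaunched before taskRetryMsec has passed. The max-wait timeout only applies while the old reader or writer thread is still alive.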

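TaskExecutor's constructor gives the reader and writer threads their own context class loader (LoadUtil.getJarLoader), so a plugin can use classes that are not visible to the main program's loader. The sketch below shows only the JDK mechanism, Thread.setContextClassLoader. It uses an empty URLClassLoader as a stand-in for a plugin jar loader and does not reproduce DataX's own loader. The thread name follows the jobId-taskGroupId-taskId-writer pattern from the constructor.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.atomic.AtomicReference;

public class PluginClassLoaderSketch {

    /** Starts a thread with the given context class loader and returns the loader it observed. */
    static ClassLoader observedLoader(ClassLoader pluginLoader) throws InterruptedException {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        Thread worker = new Thread(
            () -> seen.set(Thread.currentThread().getContextClassLoader()),
            "1-0-0-writer");                        // jobId-taskGroupId-taskId-writer naming
        worker.setContextClassLoader(pluginLoader); // set before start() so run() sees it from the first line
        worker.start();
        worker.join();
        return seen.get();
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a plugin jar loader: no URLs, parent = this class's loader
        try (URLClassLoader pluginLoader = new URLClassLoader(new URL[0],
                PluginClassLoaderSketch.class.getClassLoader())) {
            System.out.println(observedLoader(pluginLoader) == pluginLoader);                      // true
            System.out.println(Thread.currentThread().getContextClassLoader() == pluginLoader);   // false
        }
    }
}
```

Only the plugin thread sees the plugin loader; the main thread keeps its own. This isolation matters for libraries that look up classes through the thread context class loader.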

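Notes:

  1. The source code has been lightly modified, mainly (1) changes flagged by the Alibaba Java Coding Guidelines scan and (2) clean-code refactoring;
  2. All the code has been uploaded to GitHub (master and dev branches) and is free to use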

Publisher: 全棧程式員棧長. Please credit the source when reposting: https://javaforall.cn/145348.html Original link: https://javaforall.cn