天天看點

分布式工作流任務排程系統EasyScheduler自定義任務插件開發

任務插件開發

提醒:目前任務插件開發暫不支援熱部署

基于SHELL的任務

基于YARN的計算(參見MapReduceTask)

  • 需要在 cn.escheduler.server.worker.task 下的 TaskManager 類中建立自定義任務(也需在TaskType注冊對應的任務類型)
  • 需要繼承cn.escheduler.server.worker.task 下的 AbstractYarnTask
  • 構造方法排程 AbstractYarnTask 構造方法
  • 繼承 AbstractParameters 自定義任務參數實體
  • 重寫 AbstractTask 的 init 方法中解析自定義任務參數
  • 重寫 buildCommand 封裝command

基于非YARN的計算(參見ShellTask)

  • 需要在 cn.escheduler.server.worker.task 下的 TaskManager 中建立自定義任務
  • 需要繼承cn.escheduler.server.worker.task 下的 AbstractTask
  • 構造方法中執行個體化 ShellCommandExecutor
    public ShellTask(TaskProps props, Logger logger) {
      super(props, logger);
    
      this.taskDir = props.getTaskDir();
    
      this.processTask = new ShellCommandExecutor(this::logHandle,
          props.getTaskDir(), props.getTaskAppId(),
          props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(),
          props.getTaskTimeout(), logger);
      this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
    }           

傳入自定義任務的 TaskProps和自定義Logger,TaskProps 封裝了任務的資訊,Logger分裝了自定義日志資訊

  • 重寫 AbstractTask 的 init 方法中解析自定義任務參數實體
  • 重寫 handle 方法,調用 ShellCommandExecutor 的 run 方法,第一個參數傳入自己的command,第二個參數傳入 ProcessDao,設定相應的 exitStatusCode

基于非SHELL的任務(參見SqlTask)

  • 構造方法或者重寫 AbstractTask 的 init 方法中,解析自定義任務參數實體
  • 重寫 handle 方法實作業務邏輯并設定相應的exitStatusCode