任務插件開發
提醒:目前任務插件開發暫不支援熱部署
基于SHELL的任務
基于YARN的計算(參見MapReduceTask)
- 需要在 cn.escheduler.server.worker.task 下的 TaskManager 類中建立自定義任務(也需在TaskType注冊對應的任務類型)
- 需要繼承cn.escheduler.server.worker.task 下的 AbstractYarnTask
- 構造方法排程 AbstractYarnTask 構造方法
- 繼承 AbstractParameters 自定義任務參數實體
- 重寫 AbstractTask 的 init 方法中解析自定義任務參數
- 重寫 buildCommand 封裝command
基于非YARN的計算(參見ShellTask)
- 需要在 cn.escheduler.server.worker.task 下的 TaskManager 中建立自定義任務
- 需要繼承cn.escheduler.server.worker.task 下的 AbstractTask
- 構造方法中執行個體化 ShellCommandExecutor
public ShellTask(TaskProps props, Logger logger) { super(props, logger); this.taskDir = props.getTaskDir(); this.processTask = new ShellCommandExecutor(this::logHandle, props.getTaskDir(), props.getTaskAppId(), props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(), props.getTaskTimeout(), logger); this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); }
傳入自定義任務的 TaskProps和自定義Logger,TaskProps 封裝了任務的資訊,Logger分裝了自定義日志資訊
- 重寫 AbstractTask 的 init 方法中解析自定義任務參數實體
- 重寫 handle 方法,調用 ShellCommandExecutor 的 run 方法,第一個參數傳入自己的command,第二個參數傳入 ProcessDao,設定相應的 exitStatusCode
基于非SHELL的任務(參見SqlTask)
- 構造方法或者重寫 AbstractTask 的 init 方法中,解析自定義任務參數實體
- 重寫 handle 方法實作業務邏輯并設定相應的exitStatusCode