天天看點

分布式作業系統 Elastic-Job-Lite 源碼分析 —— 作業事件追蹤1. 概述2. 作業事件總線3. 作業事件4. 作業監聽器

1. 概述

本文主要分享 Elastic-Job-Lite 作業事件追蹤。

另外,Elastic-Job-Cloud 作業事件追蹤 和 Elastic-Job-Lite 基本類似,不單獨開一篇文章,記錄在該文章裡。如果你對 Elastic-Job-Cloud 暫時不感興趣,可以跳過相應部分。

Elastic-Job 提供了事件追蹤功能,可通過事件訂閱的方式處理排程過程的重要事件,用于查詢、統計和監控。Elastic-Job 目前訂閱兩種事件,基于關系型資料庫記錄事件。

涉及到主要類的類圖如下( 打開大圖 ):

分布式作業系統 Elastic-Job-Lite 源碼分析 —— 作業事件追蹤1. 概述2. 作業事件總線3. 作業事件4. 作業監聽器
  • 以上類在

    com.dangdang.ddframe.job.event

    包,不僅為 Elastic-Job-Lite,而且為 Elastic-Job-Cloud 實作了事件追蹤功能。
  • 作業事件:粉色的類。
  • 作業事件總線:黃色的類。
  • 作業事件監聽器:藍色的類。

你行好事會因為得到贊賞而愉悅

同理,開源項目貢獻者會因為 Star 而更加有動力

為 Elastic-Job 點贊!傳送門

2. 作業事件總線

JobEventBus,作業事件總線,提供了注冊監聽器、釋出事件兩個方法。

建立 JobEventBus 代碼如下:

public final class JobEventBus {

    /**
     * 作業事件配置
     */
    private final JobEventConfiguration jobEventConfig;
    /**
     * 線程池執行服務對象
     */
    private final ExecutorServiceObject executorServiceObject;
    /**
     * 事件總線
     */
    private final EventBus eventBus;
    /**
     * 是否注冊作業監聽器
     */
    private boolean isRegistered;

    public JobEventBus() {
        jobEventConfig = null;
        executorServiceObject = null;
        eventBus = null;
    }

    public JobEventBus(final JobEventConfiguration jobEventConfig) {
        this.jobEventConfig = jobEventConfig;
        executorServiceObject = new ExecutorServiceObject("job-event", Runtime.getRuntime().availableProcessors() * 2);
        // 建立 異步事件總線
        eventBus = new AsyncEventBus(executorServiceObject.createExecutorService());
        // 注冊 事件監聽器
        register();
    }
}           

複制

  • JobEventBus 基于 Google Guava EventBus,在《Sharding-JDBC 源碼分析 —— SQL 執行》「4.1 EventBus」有詳細分享。這裡要注意的是 AsyncEventBus( 異步事件總線 ),注冊在其上面的監聽器是異步監聽執行,事件釋出無需阻塞等待監聽器執行完邏輯,是以對性能不存在影響。
  • 使用 JobEventConfiguration( 作業事件配置 ) 建立事件監聽器,調用

    #register()

    方法進行注冊監聽。

    private void register() { try { eventBus.register(jobEventConfig.createJobEventListener()); isRegistered = true; } catch (final JobEventListenerConfigurationException ex) { log.error("Elastic job: create JobEventListener failure, error is: ", ex); } }

    • 該方法是私有(

      private

      )方法,隻能使用 JobEventConfiguration 建立事件監聽器注冊。當不傳遞該配置時,意味着不開啟事件追蹤功能。

釋出作業事件

釋出作業事件( JobEvent ) 代碼如下:

// JobEventBus.java
public void post(final JobEvent event) {
   if (isRegistered && !executorServiceObject.isShutdown()) {
       eventBus.post(event);
   }
}           

複制

在 Elaistc-Job-Lite 裡,LiteJobFacade 對

JobEventBus#post(...)

進行封裝,提供給作業執行器( AbstractElasticJobExecutor )調用( Elastic-Job-Cloud 實際也進行了封裝 ):

// LiteJobFacade.java
@Override
public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
   jobEventBus.post(jobExecutionEvent);
}

@Override
public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
   TaskContext taskContext = TaskContext.from(taskId);
   jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
           taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
   if (!Strings.isNullOrEmpty(message)) {
       log.trace(message);
   }
}           

複制

  • TaskContext 通過

    #from(…)

    方法,對作業任務ID(

    taskId

    ) 解析,擷取任務上下文。TaskContext 代碼注釋很完整,點選連結直接檢視。

3. 作業事件

目前有兩種作業事件( JobEvent ):

  • JobStatusTraceEvent,作業狀态追蹤事件。
  • JobExecutionEvent,作業執行追蹤事件。

本小節分享兩方面:

  • 作業事件釋出時機。
  • Elastic-Job 基于關系型資料庫記錄事件的表結構。

3.1 作業狀态追蹤事件

JobStatusTraceEvent,作業狀态追蹤事件。

代碼如下:

public final class JobStatusTraceEvent implements JobEvent {

    /**
     * 主鍵
     */
    private String id = UUID.randomUUID().toString();
    /**
     * 作業名稱
     */
    private final String jobName;
    /**
     * 原作業任務ID
     */
    @Setter
    private String originalTaskId = "";
    /**
     * 作業任務ID
     * 來自 {@link com.dangdang.ddframe.job.executor.ShardingContexts#taskId}
     */
    private final String taskId;
    /**
     * 執行作業伺服器的名字
     * Elastic-Job-Lite,作業節點的 IP 位址
     * Elastic-Job-Cloud,Mesos 執行機主鍵
     */
    private final String slaveId;
    /**
     * 任務來源
     */
    private final Source source;
    /**
     * 任務執行類型
     */
    private final ExecutionType executionType;
    /**
     * 作業分片項
     * 多個分片項以逗号分隔
     */
    private final String shardingItems;
    /**
     * 任務執行狀态
     */
    private final State state;
    /**
     * 相關資訊
     */
    private final String message;
    /**
     * 記錄建立時間
     */
    private Date creationTime = new Date();
}           

複制

  • ExecutionType,執行類型。

    public enum ExecutionType {/** * 準備執行的任務. */ READY, /** * 失效轉移的任務. */ FAILOVER }

  • Source,任務來源。

    public enum Source { /** * Elastic-Job-Cloud 排程器 */ CLOUD_SCHEDULER, /** * Elastic-Job-Cloud 執行器 */ CLOUD_EXECUTOR, /** * Elastic-Job-Lite 執行器 */ LITE_EXECUTOR }

  • State,任務執行狀态。

    public enum State { /** * 開始中 */ TASK_STAGING, /** * 運作中 */ TASK_RUNNING, /** * 完成(正常) */ TASK_FINISHED, /** * 完成(異常) */ TASK_ERROR, TASK_KILLED, TASK_LOST, TASK_FAILED, TASK_DROPPED, TASK_GONE, TASK_GONE_BY_OPERATOR, TASK_UNREACHABLE, TASK_UNKNOWN }

    • Elastic-Job-Lite 使用 TASK_STAGING、TASK_RUNNING、TASK_FINISHED、TASK_ERROR 四種執行狀态。
    • Elastic-Job-Cloud 使用所有執行狀态。

關系資料庫表

JOB_STATUS_TRACE_LOG

結構如下:

CREATE TABLE `JOB_STATUS_TRACE_LOG` (
  `id` varchar(40) COLLATE utf8_bin NOT NULL,
  `job_name` varchar(100) COLLATE utf8_bin NOT NULL,
  `original_task_id` varchar(255) COLLATE utf8_bin NOT NULL,
  `task_id` varchar(255) COLLATE utf8_bin NOT NULL,
  `slave_id` varchar(50) COLLATE utf8_bin NOT NULL,
  `source` varchar(50) COLLATE utf8_bin NOT NULL,
  `execution_type` varchar(20) COLLATE utf8_bin NOT NULL,
  `sharding_item` varchar(100) COLLATE utf8_bin NOT NULL,
  `state` varchar(20) COLLATE utf8_bin NOT NULL,
  `message` varchar(4000) COLLATE utf8_bin DEFAULT NULL,
  `creation_time` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `TASK_ID_STATE_INDEX` (`task_id`,`state`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin           

複制

  • Elastic-Job-Lite 一次作業執行記錄如下( 打開大圖 ):
分布式作業系統 Elastic-Job-Lite 源碼分析 —— 作業事件追蹤1. 概述2. 作業事件總線3. 作業事件4. 作業監聽器

JobStatusTraceEvent 在 Elastic-Job-Lite 釋出時機:

  • State.TASK_STAGING:

    // AbstractElasticJobExecutor.java public final void execute() { // ... 省略無關代碼 // 釋出作業狀态追蹤事件(State.TASK_STAGING) if (shardingContexts.isAllowSendJobEvent()) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName)); } // ... 省略無關代碼 }

  • State.TASK_RUNNING:

    // AbstractElasticJobExecutor.java private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) { // ... 省略無關代碼 // 釋出作業狀态追蹤事件(State.TASK_RUNNING) if (shardingContexts.isAllowSendJobEvent()) { jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, ""); } // ... 省略無關代碼 }

  • State.TASK_FINISHED、State.TASK_ERROR【第一種】:

    // AbstractElasticJobExecutor.java public final void execute() { // ... 省略無關代碼 // 跳過 存在運作中的被錯過作業 if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) { // 釋出作業狀态追蹤事件(State.TASK_FINISHED) if (shardingContexts.isAllowSendJobEvent()) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format( "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, shardingContexts.getShardingItemParameters().keySet())); } return; } }

  • State.TASK_FINISHED、State.TASK_ERROR【第二種】:

    // AbstractElasticJobExecutor.java private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) { // ... 省略無關代碼 try { process(shardingContexts, executionSource); } finally { // ... 省略無關代碼 // 根據是否有異常,釋出作業狀态追蹤事件(State.TASK_FINISHED / State.TASK_ERROR) if (itemErrorMessages.isEmpty()) { if (shardingContexts.isAllowSendJobEvent()) { jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, ""); } } else { if (shardingContexts.isAllowSendJobEvent()) { jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString()); } } } }

JobStatusTraceEvent 在 Elastic-Job-Cloud 釋出時機:

Elastic-Job-Cloud 除了上文 Elastic-Job-Lite 會多一個場景下記錄作業狀态追蹤事件( State.TASK_STAGING),實作代碼如下:

// TaskLaunchScheduledService.java
private JobStatusTraceEvent createJobStatusTraceEvent(final TaskContext taskContext) {
  TaskContext.MetaInfo metaInfo = taskContext.getMetaInfo();
  JobStatusTraceEvent result = new JobStatusTraceEvent(metaInfo.getJobName(), taskContext.getId(), taskContext.getSlaveId(),
          Source.CLOUD_SCHEDULER, taskContext.getType(), String.valueOf(metaInfo.getShardingItems()), JobStatusTraceEvent.State.TASK_STAGING, "");
  // 失效轉移
  if (ExecutionType.FAILOVER == taskContext.getType()) {
      Optional<String> taskContextOptional = facadeService.getFailoverTaskId(metaInfo);
      if (taskContextOptional.isPresent()) {
          result.setOriginalTaskId(taskContextOptional.get());
      }
  }
  return result;
}           

複制

  • 任務送出排程服務( TaskLaunchScheduledService )送出任務時,記錄釋出作業狀态追蹤事件(State.TASK_STAGING)。

Elastic-Job-Cloud 根據 Mesos Master 通知任務狀态變更,記錄多種作業狀态追蹤事件,實作代碼如下:

// SchedulerEngine.java
@Override
public void statusUpdate(final SchedulerDriver schedulerDriver, final Protos.TaskStatus taskStatus) {
   String taskId = taskStatus.getTaskId().getValue();
   TaskContext taskContext = TaskContext.from(taskId);
   String jobName = taskContext.getMetaInfo().getJobName();
   log.trace("call statusUpdate task state is: {}, task id is: {}", taskStatus.getState(), taskId);
   //
   jobEventBus.post(new JobStatusTraceEvent(jobName, taskContext.getId(), taskContext.getSlaveId(), Source.CLOUD_SCHEDULER,
           taskContext.getType(), String.valueOf(taskContext.getMetaInfo().getShardingItems()), State.valueOf(taskStatus.getState().name()), taskStatus.getMessage()));
   // ... 省略無關代碼
}           

複制

3.2 作業執行追蹤事件

JobExecutionEvent,作業執行追蹤事件。

代碼如下:

public final class JobExecutionEvent implements JobEvent {

    /**
     * 主鍵
     */
    private String id = UUID.randomUUID().toString();
    /**
     * 主機名稱
     */
    private String hostname = IpUtils.getHostName();
    /**
     * IP
     */
    private String ip = IpUtils.getIp();
    /**
     * 作業任務ID
     */
    private final String taskId;
    /**
     * 作業名字
     */
    private final String jobName;
    /**
     * 執行來源
     */
    private final ExecutionSource source;
    /**
     * 作業分片項
     */
    private final int shardingItem;
    /**
     * 開始時間
     */
    private Date startTime = new Date();
    /**
     * 結束時間
     */
    @Setter
    private Date completeTime;
    /**
     * 是否執行成功
     */
    @Setter
    private boolean success;
    /**
     * 執行失敗原因
     */
    @Setter
    private JobExecutionEventThrowable failureCause;
}           

複制

  • ExecutionSource,執行來源

    public enum ExecutionSource { /** * 普通觸發執行 */ NORMAL_TRIGGER, /** * 被錯過執行 */ MISFIRE, /** * 失效轉移執行 */ FAILOVER }

關系資料庫表

JOB_EXECUTION_LOG

結構如下:

CREATE TABLE `JOB_EXECUTION_LOG` (
  `id` varchar(40) COLLATE utf8_bin NOT NULL,
  `job_name` varchar(100) COLLATE utf8_bin NOT NULL,
  `task_id` varchar(255) COLLATE utf8_bin NOT NULL,
  `hostname` varchar(255) COLLATE utf8_bin NOT NULL,
  `ip` varchar(50) COLLATE utf8_bin NOT NULL,
  `sharding_item` int(11) NOT NULL,
  `execution_source` varchar(20) COLLATE utf8_bin NOT NULL,
  `failure_cause` varchar(4000) COLLATE utf8_bin DEFAULT NULL,
  `is_success` int(11) NOT NULL,
  `start_time` timestamp NULL DEFAULT NULL,
  `complete_time` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin           

複制

  • Elastic-Job-Lite 一次作業多作業分片項執行記錄如下( 打開大圖 ):
分布式作業系統 Elastic-Job-Lite 源碼分析 —— 作業事件追蹤1. 概述2. 作業事件總線3. 作業事件4. 作業監聽器

JobExecutionEvent 在 Elastic-Job-Lite 釋出時機:

```Java
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
   // 釋出執行事件(開始)
   if (shardingContexts.isAllowSendJobEvent()) {
       jobFacade.postJobExecutionEvent(startEvent);
   }
   JobExecutionEvent completeEvent;
   try {
       // 執行單個作業
       process(new ShardingContext(shardingContexts, item));
       // 釋出執行事件(成功)
       completeEvent = startEvent.executionSuccess();
       if (shardingContexts.isAllowSendJobEvent()) {
           jobFacade.postJobExecutionEvent(completeEvent);
       }
   } catch (final Throwable cause) {
       // 釋出執行事件(失敗)
       completeEvent = startEvent.executionFailure(cause);
       jobFacade.postJobExecutionEvent(completeEvent);
       // ... 省略無關代碼
   }
}
```           

複制

JobExecutionEvent 在 Elastic-Job-Cloud 釋出時機:

和 Elastic-Job-Cloud 一緻。

3.3 作業事件資料庫存儲

JobEventRdbStorage,作業事件資料庫存儲。

建立 JobEventRdbStorage 代碼如下:

JobEventRdbStorage(final DataSource dataSource) throws SQLException {
   this.dataSource = dataSource;
   initTablesAndIndexes();
}

private void initTablesAndIndexes() throws SQLException {
   try (Connection conn = dataSource.getConnection()) {
       createJobExecutionTableAndIndexIfNeeded(conn);
       createJobStatusTraceTableAndIndexIfNeeded(conn);
       databaseType = DatabaseType.valueFrom(conn.getMetaData().getDatabaseProductName());
   }
}           

複制

  • 調用

    #createJobExecutionTableAndIndexIfNeeded(…)

    建立

    JOB_EXECUTION_LOG

    表和索引。
  • 調用

    #createJobStatusTraceTableAndIndexIfNeeded(…)

    建立

    JOB_STATUS_TRACE_LOG

    表和索引。

存儲 JobStatusTraceEvent 代碼如下:

// JobEventRdbStorage.java
boolean addJobStatusTraceEvent(final JobStatusTraceEvent jobStatusTraceEvent) {
   String originalTaskId = jobStatusTraceEvent.getOriginalTaskId();
   if (State.TASK_STAGING != jobStatusTraceEvent.getState()) {
       originalTaskId = getOriginalTaskId(jobStatusTraceEvent.getTaskId());
   }
   boolean result = false;
   String sql = "INSERT INTO `" + TABLE_JOB_STATUS_TRACE_LOG + "` (`id`, `job_name`, `original_task_id`, `task_id`, `slave_id`, `source`, `execution_type`, `sharding_item`,  " 
           + "`state`, `message`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);";
   // ... 省略你懂的代碼
}

private String getOriginalTaskId(final String taskId) {
   String sql = String.format("SELECT original_task_id FROM %s WHERE task_id = '%s' and state='%s'", TABLE_JOB_STATUS_TRACE_LOG, taskId, State.TASK_STAGING);
   // ... 省略你懂的代碼
   return original_task_id;
}           

複制

  • originalTaskId

    ,原任務作業ID。
    • Elastic-Job-Lite 暫未使用到該字段,存儲空串( `""` )。
    • Elastic-Job-Cloud 在作業失效轉移場景下使用該字段,存儲失效轉移的任務作業ID。

存儲 JobExecutionEvent 代碼如下:

// JobEventRdbStorage.java
boolean addJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
   if (null == jobExecutionEvent.getCompleteTime()) { // 作業分片項執行開始
       return insertJobExecutionEvent(jobExecutionEvent);
   } else {
       if (jobExecutionEvent.isSuccess()) { // 作業分片項執行完成(正常)
           return updateJobExecutionEventWhenSuccess(jobExecutionEvent);
       } else { // 作業分片項執行完成(異常)
           return updateJobExecutionEventFailure(jobExecutionEvent);
       }
   }
}           

複制

  • 作業分片項執行完成進行的是更新操作。

3.4 作業事件資料庫查詢

JobEventRdbSearch,作業事件資料庫查詢,提供給運維平台調用查詢資料。感興趣的同學點選連結直接檢視。

4. 作業監聽器

在上文我們看到,作業監聽器通過傳遞作業事件配置( JobEventConfiguration )給作業事件總線( JobEventBus ) 進行建立監聽器,并注冊監聽器到事件總線。

我們來看下 Elastic-Job 提供的基于關系資料庫的事件配置實作。

// JobEventConfiguration.java
public interface JobEventConfiguration extends JobEventIdentity {

    /**
     * 建立作業事件監聽器.
     * 
     * @return 作業事件監聽器.
     * @throws JobEventListenerConfigurationException 作業事件監聽器配置異常
     */
    JobEventListener createJobEventListener() throws JobEventListenerConfigurationException;
}

// JobEventRdbConfiguration.java
public final class JobEventRdbConfiguration extends JobEventRdbIdentity implements JobEventConfiguration, Serializable {

    private final transient DataSource dataSource;

    @Override
    public JobEventListener createJobEventListener() throws JobEventListenerConfigurationException {
        try {
            return new JobEventRdbListener(dataSource);
        } catch (final SQLException ex) {
            throw new JobEventListenerConfigurationException(ex);
        }
    }

}           

複制

  • JobEventRdbConfiguration,作業資料庫事件配置。調用

    #createJobEventListener()

    建立作業事件資料庫監聽器( JobEventRdbListener )。

JobEventRdbListener,作業事件資料庫監聽器。實作代碼如下:

// JobEventListener.java
public interface JobEventListener extends JobEventIdentity {

    /**
     * 作業執行事件監聽執行.
     *
     * @param jobExecutionEvent 作業執行事件
     */
    @Subscribe
    @AllowConcurrentEvents
    void listen(JobExecutionEvent jobExecutionEvent);

    /**
     * 作業狀态痕迹事件監聽執行.
     *
     * @param jobStatusTraceEvent 作業狀态痕迹事件
     */
    @Subscribe
    @AllowConcurrentEvents
    void listen(JobStatusTraceEvent jobStatusTraceEvent);
}

// JobEventRdbListener.java
public final class JobEventRdbListener extends JobEventRdbIdentity implements JobEventListener {

    private final JobEventRdbStorage repository;

    public JobEventRdbListener(final DataSource dataSource) throws SQLException {
        repository = new JobEventRdbStorage(dataSource);
    }

    @Override
    public void listen(final JobExecutionEvent executionEvent) {
        repository.addJobExecutionEvent(executionEvent);
    }

    @Override
    public void listen(final JobStatusTraceEvent jobStatusTraceEvent) {
        repository.addJobStatusTraceEvent(jobStatusTraceEvent);
    }
}           

複制

  • 通過 JobEventRdbStorage 存儲作業事件到關系型資料庫。

如何自定義作業監聽器?

有些同學可能希望使用 ES 或者其他資料庫存儲作業事件,這個時候可以通過實作 JobEventConfiguration、JobEventListener 進行拓展。

Elastic-Job-Cloud JobEventConfiguration 怎麼配置?

  • Elastic-Job-Cloud-Scheduler:從

    conf/elastic-job-cloud-scheduler.properties

    配置檔案讀取如下屬性,生成 JobEventConfiguration 配置對象。
    • event_trace_rdb_driver

    • event_trace_rdb_url

    • event_trace_rdb_username

    • event_trace_rdb_password

  • Elastic-Job-Cloud-Executor:通過接收到任務執行資訊裡讀取JobEventConfiguration,實作代碼如下:

    // TaskExecutor.java @Override public void registered(final ExecutorDriver executorDriver, final Protos.ExecutorInfo executorInfo, final Protos.FrameworkInfo frameworkInfo, final Protos.SlaveInfo slaveInfo) { if (!executorInfo.getData().isEmpty()) { Map<String, String> data = SerializationUtils.deserialize(executorInfo.getData().toByteArray()); BasicDataSource dataSource = new BasicDataSource(); dataSource.setDriverClassName(data.get("event_trace_rdb_driver")); dataSource.setUrl(data.get("event_trace_rdb_url")); dataSource.setPassword(data.get("event_trace_rdb_password")); dataSource.setUsername(data.get("event_trace_rdb_username")); jobEventBus = new JobEventBus(new JobEventRdbConfiguration(dataSource)); } }