天天看點

tsf定時任務遷移到xxl-job

tsf定時任務遷移到xxl-job

  • ​​1. 介紹​​
  • ​​2. 原理​​
  • ​​2.1 設計思想​​
  • ​​2.2 系統組成​​
  • ​​2.3 架構圖​​
  • ​​3. 遷移方案​​
  • ​​3.1 現狀​​
  • ​​3.2 遷移方案​​
  • ​​3.3 xxl-job 配置​​
  • ​​3.4 排程配置​​
  • ​​3.5 執行器配置​​
  • ​​4. 後續開發方案​​
  • ​​4.1 tsf 方式​​
  • ​​4.2 xxl-job 注解​​
  • ​​5. xxl-job 接入手冊​​
  • ​​5.1 maven 依賴​​
  • ​​5.2 配置類​​
  • ​​5.3 配置yml​​
  • ​​5.4 增加常量接口​​
  • ​​5.5 增加任務注冊​​
  • ​​6. 使用​​
  • ​​7. 總結​​

xxl-job 項目gitee:https://gitee.com/xuxueli0323/xxl-job

xxl-job 項目github:https://github.com/xuxueli/xxl-job/

xxl-job 項目文檔:https://www.xuxueli.com/xxl-job/

1. 介紹

XXL-JOB是一個分布式任務排程平台,其核心設計目标是開發迅速、學習簡單、輕量級、易擴充。現已開放源代碼并接入多家公司線上産品線,開箱即用。

xxl-job 是一個國人開發的架構,于15年開源啟動,到現在也有7年曆史,從最開始的基礎功能實作,到現在支援分布式,叢集,任務重試,分片等等。基本上已經很全面了,很能打了。

2. 原理

xxl-job 分為兩部分,xxl-job-admin,xxl-job-executor。排程中心,執行器。

  • 排程中心提供管理、排程功能
  • 執行器提供任務注冊,任務執行功能

執行器啟動後,掃描spring 容器中的執行類(​

​jobHandler​

​),然後将執行類注冊到排程中心。排程中心會儲存執行類和執行器的關系,當排程觸發某個執行類執行時,排程中心就會要求執行器執行指定的執行類,完成任務排程。

在xxl-job架構中,目前主流有兩種方案:一是獨立的執行器;二是內建的執行器。

獨立的執行器:

單獨部署spring boot 服務,專門用于執行定時任務,針對執行器服務,支援橫向動态擴充,類似于資源池,用于執行任務。

獨立執行器适合于定時任務較多,且同時存在任務執行時間差距較大,需要對定時任務做單獨資源管理的場景。

因為可能部分定時任務耗費資源較多,一定程度上會争奪資源。

案例:賬務子產品中賬單的生成,結算子產品中結算單生成。這兩種場景,都是指定時間的大并發場景,如果和其他業務放在一起,那麼在指定時間,觸發大并發的時候,會影響其他業務,是以抽離出來比較合适。

實作方案:

  1. 使用xxl-job的線上任務配置,将源碼直接在線上編輯器中執行。

    這種方式使用特别少,适合與系統依賴非常低的場景。

  2. 使用 spring 遠端調用的方式,在執行器中調用其他服務。

    這種方式使用普遍,比如定時任務中用到了其他服務的功能.

    這種是在執行器中開發任務邏輯。

  3. 完全使用 spring 遠端調用。

    這種方式與2的差別在于:在執行器外面開發任務邏輯,執行器隻是一個調用的作用。

內建的執行器:

不單獨部署 spring boot 服務,定時任務與業務接口部署在同一個服務中。

2.1 設計思想

将排程行為抽象形成“排程中心”公共平台,而平台自身并不承擔業務邏輯,“排程中心”負責發起排程請求。

将任務抽象成分散的JobHandler,交由“執行器”統一管理,“執行器”負責接收排程請求并執行對應的JobHandler中業務邏輯。

是以,“排程”和“任務”兩部分可以互相解耦,提高系統整體穩定性和擴充性;

2.2 系統組成

  • 排程子產品(排程中心):

    負責管理排程資訊,按照排程配置發出排程請求,自身不承擔業務代碼。排程系統與任務解耦,提高了系統可用性和穩定性,同時排程系統性能不再受限于任務子產品;

    支援可視化、簡單且動态的管理排程資訊,包括任務建立,更新,删除,GLUE開發和任務報警等,所有上述操作都會實時生效,同時支援監控排程結果以及執行日志,支援執行器Failover。

  • 執行子產品(執行器):

    負責接收排程請求并執行任務邏輯。任務子產品專注于任務的執行等操作,開發和維護更加簡單和高效;

    接收“排程中心”的執行請求、終止請求和日志請求等。

2.3 架構圖

tsf定時任務遷移到xxl-job
原理更多資料請看:https://www.xuxueli.com/xxl-job/#%E4%BA%94%E3%80%81%E6%80%BB%E4%BD%93%E8%AE%BE%E8%AE%A1

3. 遷移方案

3.1 現狀

現在我們使用的是 tsf 的任務架構。

tsf 的任務要求我們實作 ​

​com.tencent.cloud.task.sdk.client.spi.ExecutableTask​

​ 接口。

源碼如下:

package com.tencent.cloud.task.sdk.client.spi;

import com.tencent.cloud.task.sdk.client.model.ExecutableTaskData;
import com.tencent.cloud.task.sdk.client.model.ProcessResult;

public interface ExecutableTask {
    ProcessResult execute(ExecutableTaskData var1);
}      

很簡單,就是實作一個​

​execute​

​方法。

其參數是封裝了一些任務資訊,包括任務排程,任務重試,任務逾時,任務參數等資訊。

傳回值封裝了任務執行結果,成功失敗,成功失敗資訊,以及任務中繼資料資訊。

3.2 遷移方案

因為我們切換了新的任務排程架構,結合 tsf 的任務現狀,遷移方案設計如下:

首先增加一個 spring 上下文容器屬性操作,掃描 srping 容器中 ​

​com.tencent.cloud.task.sdk.client.spi.ExecutableTask​

​ 接口的實作類,然後将實作類注冊到xxl-job。

在注冊到xxl-job的時候,建立xxl-job的執行類,做 tsf 的任務的實作代理。

具體源碼如下:

import com.tencent.cloud.task.sdk.client.model.ExecutableTaskData;
import com.tencent.cloud.task.sdk.client.model.ProcessResult;
import com.tencent.cloud.task.sdk.client.model.ProcessResultCode;
import com.tencent.cloud.task.sdk.client.spi.ExecutableTask;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * xxl-job registry
 *
 * @author 
 * @date 2022/10/17 15:27
 **/
@Slf4j
@Component
public class XxlJobRegistry implements ApplicationContextAware {

    @Value("${spring.application.name:}")
    private String appName;

    @Override
    public void setApplicationContext (ApplicationContext applicationContext) throws BeansException {
        // 1. 擷取全部的 tsf 的任務
        Map<String, ExecutableTask> beansOfType = applicationContext.getBeansOfType(ExecutableTask.class);
        beansOfType.forEach((key, value) -> {
            // 2. 注冊到 xxl-job
            // name = appName # className # execute 
            // name = appName # 任務 method 全引用路徑
            // 使用 method 的全引用路徑,防止 beanName 重複的任務
            // 使用 appName 字首,防止跨項目 method 全引用路徑重複
            // 使用 匿名實作類 做 tsf 到 xxl-job 的轉換
            XxlJobExecutor.registJobHandler(appName + "#" + value.getClass().getName() + "#" + "execute", new IJobHandler() {
                @Override
                public void execute () throws Exception {
                    // XxlJobHelper.log 是 xxl-job 的 日志列印方式
                    XxlJobHelper.log(" job : {} start , param : {} ", key, XxlJobHelper.getJobParam());
                    // log.info 是 tsf 的日志列印方式,也是我們統一日志內建方式
                    log.info(" job : {} start , param : {} ", key, XxlJobHelper.getJobParam());
                    // 構造 tsf 任務觸發請求參數
                    ExecutableTaskData executableTaskData = new ExecutableTaskData();
                    // 将 xxl-job 的任務參數 轉發 到 tsf 任務請求參數中
                    // 相容現有的 tsf 任務,同時支援後續任務開發繼續使用 tsf 模式
                    executableTaskData.setTaskArgument(XxlJobHelper.getJobParam());
                    // 調用 stf 任務
                    ProcessResult execute = value.execute(executableTaskData);
                    // 對 tsf 任務做結果轉換
                    if (ProcessResultCode.SUCCESS == execute.getResultCode()) {
                        XxlJobHelper.handleSuccess(execute.getResultMsg());
                    } else {
                        XxlJobHelper.handleFail(execute.getResultMsg());
                    }
                    XxlJobHelper.log("job : {} , executor : {} , message : {} ", key, execute.getResultCode(), execute.getResultMsg());
                    log.info("job : {} , executor : {} , message : {} ", key, execute.getResultCode(), execute.getResultMsg());
                }
            });
            log.info(" xxl-job registry {} success!", key);
        });
    }

}      

使用這種方式,可以同時支援 tsf 和 xxl-job ,一方面是 xxl-job 試用階段的雙架構支援,另一方面是 現有 任務的 xxl-job 支援。

3.3 xxl-job 配置

xxl-job的配置主要是 xxl-job 執行器的配置。

### 排程中心部署根位址 [選填]:如排程中心叢集部署存在多個位址則用逗号分隔。執行器将會使用該位址進行"執行器心跳注冊"和"任務結果回調";為空則關閉自動注冊;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 執行器通訊TOKEN [選填]:非空時啟用;
xxl.job.accessToken=
### 執行器AppName [選填]:執行器心跳注冊分組依據;為空則關閉自動注冊
xxl.job.executor.appname=xxl-job-executor-sample
### 執行器注冊 [選填]:優先使用該配置作為注冊位址,為空時使用内嵌服務 ”IP:PORT“ 作為注冊位址。進而更靈活的支援容器類型執行器動态IP和動态映射端口問題。
xxl.job.executor.address=
### 執行器IP [選填]:預設為空表示自動擷取IP,多網卡時可手動設定指定IP,該IP不會綁定Host僅作為通訊實用;位址資訊用于 "執行器注冊" 和 "排程中心請求并觸發任務";
xxl.job.executor.ip=
### 執行器端口号 [選填]:小于等于0則自動擷取;預設端口為9999,單機部署多個執行器時,注意要配置不同執行器端口;
xxl.job.executor.port=9999
### 執行器運作日志檔案存儲磁盤路徑 [選填] :需要對該路徑擁有讀寫權限;為空則使用預設路徑;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 執行器日志檔案儲存天數 [選填] : 過期日志自動清理, 限制值大于等于3時生效; 否則, 如-1, 關閉自動清理功能;
xxl.job.executor.logretentiondays=30      

對應的配置類:

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * xxl-job config
 *
 * @author jiayongqi
 * @date 2022/10/17 14:27
 **/
@Slf4j
@RefreshScope
@Configuration
public class XxlJobConfig {
    @Value("${xxl.job.admin.addresses:}")
    private String adminAddresses;
    @Value("${xxl.job.executor.appname:}")
    private String appName;
    @Value("${xxl.job.accessToken:}")
    private String accessToken;
    @Value("${xxl.job.executor.address:}")
    private String address;
    @Value("${xxl.job.executor.ip:}")
    private String ip;
    @Value("${xxl.job.executor.port:0}")
    private int port;
    @Value("${xxl.job.executor.logpath:}")
    private String logPath;
    @Value("${xxl.job.executor.logretentiondays:-1}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor () {
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setAppname(appName);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        log.info("xxl-job config init : {}", xxlJobSpringExecutor);
        return xxlJobSpringExecutor;
    }

}      

3.4 排程配置

首先登陸到xxl-job的排程平台

tsf定時任務遷移到xxl-job

現有任務:

現有任務:​

​com.boss.communal.xxl.job.task.TsfTestTask​

tsf定時任務遷移到xxl-job

根據規則​

​appName # className # methodName​

​​拼接出Task的唯一辨別​

​boss-communal#com.boss.communal.xxl.job.task.TsfTestTask#execute​

然後在任務管理中建立任務

tsf定時任務遷移到xxl-job

建立成功後,就可以執行了

tsf定時任務遷移到xxl-job

如果有參數,那麼就設定參數,和 tsf 的任務管理控制台任務參數相同,需要和任務邏輯中參數解析的格式保持一緻

tsf定時任務遷移到xxl-job

然後就可以檢視執行日志了

tsf定時任務遷移到xxl-job

如下

tsf定時任務遷移到xxl-job

3.5 執行器配置

執行器隻需要配置一次即可,與項目同步即可,與​

​applicationName​

​相同即可

tsf定時任務遷移到xxl-job

spring 服務要注冊到那個執行器,由配置項配置

tsf定時任務遷移到xxl-job

4. 後續開發方案

服務同時支援 tsf 和 xxl-job 後,後續開發定時任務,有兩種方式:tsf 方式、xxl-job 注解。

4.1 tsf 方式

繼續實作​

​com.tencent.cloud.task.sdk.client.spi.ExecutableTask​

​接口,别忘記将接口的實作類讓spring 管理。

例子:

tsf定時任務遷移到xxl-job

4.2 xxl-job 注解

對定時任務的邏輯上增加​

​@XxlJob​

​​的注解即可,​

​value​

​​就是我們在新增任務時的唯一辨別。請注意規則​

​appName # className # methodName​

為了友善統一管理已将 ​

​appName​

​ 封裝為接口常量

tsf定時任務遷移到xxl-job

後面寫定時任務的時候,實作接口即可(接口沒有需要實作的方法,隻是提供常量)

然後在​

​@XxlJob​

​注解中使用即可

tsf定時任務遷移到xxl-job

5. xxl-job 接入手冊

5.1 maven 依賴

在需要接入的項目的最外層 ​

​pom.xml​

​​ 中的​

​dependencies​

​标簽下增加依賴

<dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>${xxl-job.version}</version>
        </dependency>      

别忘記在​

​properties​

​标簽下增加版本号

<xxl-job.version>2.3.1</xxl-job.version>      

重新整理maven依賴

5.2 配置類

在配置包中,增加配置類(因為xxl-job需要指定執行器,加上xxl-job需要負責注冊和監聽任務執行,還是比較耗費資源的,是以,不建議将配置類和依賴內建到​

​common​

​中,後期如果全部接入後,可以考慮)

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * xxl-job config
 *
 * @author 
 * @date 2022/10/17 14:27
 **/
@Slf4j
@RefreshScope
@Configuration
public class XxlJobConfig {
    @Value("${xxl.job.admin.addresses:}")
    private String adminAddresses;
    @Value("${xxl.job.executor.appname:}")
    private String appName;
    @Value("${xxl.job.accessToken:}")
    private String accessToken;
    @Value("${xxl.job.executor.address:}")
    private String address;
    @Value("${xxl.job.executor.ip:}")
    private String ip;
    @Value("${xxl.job.executor.port:0}")
    private int port;
    @Value("${xxl.job.executor.logpath:}")
    private String logPath;
    @Value("${xxl.job.executor.logretentiondays:-1}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor () {
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setAppname(appName);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        log.info("xxl-job config init : {}", xxlJobSpringExecutor);
        return xxlJobSpringExecutor;
    }

}      

5.3 配置yml

在yml中增加配置

xxl:
  job:
    accessToken: test
    admin:
      addresses: http://slaver:18080/xxl-job-admin
    executor:
      appname: boss-communal
#      address: http://master
      ip: 172.16.10.251
      port: 9999 # 預設 9999
      logpath: ./xxl-job/
      logretentiondays: -1      

應用程式隻能擷取到k8s容器内的ip,在通路的時候需要注意,如果排程中心與執行器處于同一個k8s網絡中,那麼使用ip可以,否則就需要使用 address。

需要保證 執行器 和 排程器 之間能互相通路。

5.4 增加常量接口

在配置包或者工具包中增加常量接口

/**
 * @author 
 * @date 2022/10/20 10:47
 **/
public interface ApplicationName {

    String APP_NAME = "boss-communal";

}      

5.5 增加任務注冊

在應用初始化的包中增加xxl-job任務注冊類

import com.boss.communal.xxl.job.common.ApplicationName;
import com.tencent.cloud.task.sdk.client.model.ExecutableTaskData;
import com.tencent.cloud.task.sdk.client.model.ProcessResult;
import com.tencent.cloud.task.sdk.client.model.ProcessResultCode;
import com.tencent.cloud.task.sdk.client.spi.ExecutableTask;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * xxl-job registry
 *
 * @author 
 * @date 2022/10/17 15:27
 **/
@Slf4j
@Component
public class XxlJobRegistry implements ApplicationContextAware, ApplicationName {
    @Override
    public void setApplicationContext (ApplicationContext applicationContext) throws BeansException {
        // 1. 擷取全部的 tsf 的任務
        Map<String, ExecutableTask> beansOfType = applicationContext.getBeansOfType(ExecutableTask.class);
        beansOfType.forEach((key, value) -> {
            // 2. 注冊到 xxl-job
            // name = appName # className # execute
            // 使用 method 的全引用路徑,防止 beanName 重複的任務
            // 使用 appName 字首,防止跨項目 method 全引用路徑重複
            // name = appName # 任務 method 全引用路徑
            XxlJobExecutor.registJobHandler(APP_NAME + "#" + value.getClass().getName() + "#" + "execute", new IJobHandler() {
                @Override
                public void execute () throws Exception {
                    // XxlJobHelper.log 是 xxl-job 的 日志列印方式
                    XxlJobHelper.log(" job : {} start , param : {} ", key, XxlJobHelper.getJobParam());
                    // log.info 是 tsf 的日志列印方式,也是我們統一日志內建方式
                    log.info(" job : {} start , param : {} ", key, XxlJobHelper.getJobParam());
                    // 構造 tsf 任務觸發請求參數
                    ExecutableTaskData executableTaskData = new ExecutableTaskData();
                    // 将 xxl-job 的任務參數 轉發 到 tsf 任務請求參數中
                    // 相容現有的 tsf 任務,同時支援後續任務開發繼續使用 tsf 模式
                    executableTaskData.setTaskArgument(XxlJobHelper.getJobParam());
                    // 調用 stf 任務
                    ProcessResult execute = value.execute(executableTaskData);
                    // 對 tsf 任務做結果轉換
                    if (ProcessResultCode.SUCCESS == execute.getResultCode()) {
                        XxlJobHelper.handleSuccess(execute.getResultMsg());
                    } else {
                        XxlJobHelper.handleFail(execute.getResultMsg());
                    }
                    XxlJobHelper.log("job : {} , executor : {} , message : {} ", key, execute.getResultCode(), execute.getResultMsg());
                    log.info("job : {} , executor : {} , message : {} ", key, execute.getResultCode(), execute.getResultMsg());
                }
            });
            log.info(" xxl-job registry {} success!", key);
        });
    }

}      

6. 使用

要使用xxl-job,就需要先将執行器注冊到排程中心。

如果執行器還未在排程中心建立,需要先建立,然後在建立基于執行器的任務,最後配置定時任務或者手動任務。

具體使用教程見文檔:https://www.xuxueli.com/xxl-job/#%E5%9B%9B%E3%80%81%E6%93%8D%E4%BD%9C%E6%8C%87%E5%8D%97

7. 總結

繼續閱讀