tsf定時任務遷移到xxl-job
- 1. 介紹
- 2. 原理
- 2.1 設計思想
- 2.2 系統組成
- 2.3 架構圖
- 3. 遷移方案
- 3.1 現狀
- 3.2 遷移方案
- 3.3 xxl-job 配置
- 3.4 排程配置
- 3.5 執行器配置
- 4. 後續開發方案
- 4.1 tsf 方式
- 4.2 xxl-job 注解
- 5. xxl-job 接入手冊
- 5.1 maven 依賴
- 5.2 配置類
- 5.3 配置yml
- 5.4 增加常量接口
- 5.5 增加任務注冊
- 6. 使用
- 7. 總結
xxl-job 項目gitee:https://gitee.com/xuxueli0323/xxl-job
xxl-job 項目github:https://github.com/xuxueli/xxl-job/
xxl-job 項目文檔:https://www.xuxueli.com/xxl-job/
1. 介紹
XXL-JOB是一個分布式任務排程平台,其核心設計目标是開發迅速、學習簡單、輕量級、易擴充。現已開放源代碼并接入多家公司線上産品線,開箱即用。
xxl-job 是一個國人開發的架構,于15年開源啟動,到現在也有7年曆史,從最開始的基礎功能實作,到現在支援分布式,叢集,任務重試,分片等等。基本上已經很全面了,很能打了。
2. 原理
xxl-job 分為兩部分,xxl-job-admin,xxl-job-executor。排程中心,執行器。
- 排程中心提供管理、排程功能
- 執行器提供任務注冊,任務執行功能
執行器啟動後,掃描spring 容器中的執行類(
jobHandler
),然後将執行類注冊到排程中心。排程中心會儲存執行類和執行器的關系,當排程觸發某個執行類執行時,排程中心就會要求執行器執行指定的執行類,完成任務排程。
在xxl-job架構中,目前主流有兩種方案:一是獨立的執行器;二是內建的執行器。
獨立的執行器:
單獨部署spring boot 服務,專門用于執行定時任務,針對執行器服務,支援橫向動态擴充,類似于資源池,用于執行任務。
獨立執行器适合于定時任務較多,且同時存在任務執行時間差距較大,需要對定時任務做單獨資源管理的場景。
因為可能部分定時任務耗費資源較多,一定程度上會争奪資源。
案例:賬務子產品中賬單的生成,結算子產品中結算單生成。這兩種場景,都是指定時間的大并發場景,如果和其他業務放在一起,那麼在指定時間,觸發大并發的時候,會影響其他業務,是以抽離出來比較合适。
實作方案:
-
使用xxl-job的線上任務配置,将源碼直接在線上編輯器中執行。
這種方式使用特别少,适合與系統依賴非常低的場景。
-
使用 spring 遠端調用的方式,在執行器中調用其他服務。
這種方式使用普遍,比如定時任務中用到了其他服務的功能.
這種是在執行器中開發任務邏輯。
-
完全使用 spring 遠端調用。
這種方式與2的差別在于:在執行器外面開發任務邏輯,執行器隻是一個調用的作用。
內建的執行器:
不單獨部署 spring boot 服務,定時任務與業務接口部署在同一個服務中。
2.1 設計思想
将排程行為抽象形成“排程中心”公共平台,而平台自身并不承擔業務邏輯,“排程中心”負責發起排程請求。
将任務抽象成分散的JobHandler,交由“執行器”統一管理,“執行器”負責接收排程請求并執行對應的JobHandler中業務邏輯。
是以,“排程”和“任務”兩部分可以互相解耦,提高系統整體穩定性和擴充性;
2.2 系統組成
-
排程子產品(排程中心):
負責管理排程資訊,按照排程配置發出排程請求,自身不承擔業務代碼。排程系統與任務解耦,提高了系統可用性和穩定性,同時排程系統性能不再受限于任務子產品;
支援可視化、簡單且動态的管理排程資訊,包括任務建立,更新,删除,GLUE開發和任務報警等,所有上述操作都會實時生效,同時支援監控排程結果以及執行日志,支援執行器Failover。
-
執行子產品(執行器):
負責接收排程請求并執行任務邏輯。任務子產品專注于任務的執行等操作,開發和維護更加簡單和高效;
接收“排程中心”的執行請求、終止請求和日志請求等。
2.3 架構圖

原理更多資料請看:https://www.xuxueli.com/xxl-job/#%E4%BA%94%E3%80%81%E6%80%BB%E4%BD%93%E8%AE%BE%E8%AE%A1
3. 遷移方案
3.1 現狀
現在我們使用的是 tsf 的任務架構。
tsf 的任務要求我們實作
com.tencent.cloud.task.sdk.client.spi.ExecutableTask
接口。
源碼如下:
package com.tencent.cloud.task.sdk.client.spi;
import com.tencent.cloud.task.sdk.client.model.ExecutableTaskData;
import com.tencent.cloud.task.sdk.client.model.ProcessResult;
public interface ExecutableTask {
ProcessResult execute(ExecutableTaskData var1);
}
很簡單,就是實作一個
execute
方法。
其參數是封裝了一些任務資訊,包括任務排程,任務重試,任務逾時,任務參數等資訊。
傳回值封裝了任務執行結果,成功失敗,成功失敗資訊,以及任務中繼資料資訊。
3.2 遷移方案
因為我們切換了新的任務排程架構,結合 tsf 的任務現狀,遷移方案設計如下:
首先增加一個 spring 上下文容器屬性操作,掃描 srping 容器中
com.tencent.cloud.task.sdk.client.spi.ExecutableTask
接口的實作類,然後将實作類注冊到xxl-job。
在注冊到xxl-job的時候,建立xxl-job的執行類,做 tsf 的任務的實作代理。
具體源碼如下:
import com.tencent.cloud.task.sdk.client.model.ExecutableTaskData;
import com.tencent.cloud.task.sdk.client.model.ProcessResult;
import com.tencent.cloud.task.sdk.client.model.ProcessResultCode;
import com.tencent.cloud.task.sdk.client.spi.ExecutableTask;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* xxl-job registry
*
* @author
* @date 2022/10/17 15:27
**/
@Slf4j
@Component
public class XxlJobRegistry implements ApplicationContextAware {
@Value("${spring.application.name:}")
private String appName;
@Override
public void setApplicationContext (ApplicationContext applicationContext) throws BeansException {
// 1. 擷取全部的 tsf 的任務
Map<String, ExecutableTask> beansOfType = applicationContext.getBeansOfType(ExecutableTask.class);
beansOfType.forEach((key, value) -> {
// 2. 注冊到 xxl-job
// name = appName # className # execute
// name = appName # 任務 method 全引用路徑
// 使用 method 的全引用路徑,防止 beanName 重複的任務
// 使用 appName 字首,防止跨項目 method 全引用路徑重複
// 使用 匿名實作類 做 tsf 到 xxl-job 的轉換
XxlJobExecutor.registJobHandler(appName + "#" + value.getClass().getName() + "#" + "execute", new IJobHandler() {
@Override
public void execute () throws Exception {
// XxlJobHelper.log 是 xxl-job 的 日志列印方式
XxlJobHelper.log(" job : {} start , param : {} ", key, XxlJobHelper.getJobParam());
// log.info 是 tsf 的日志列印方式,也是我們統一日志內建方式
log.info(" job : {} start , param : {} ", key, XxlJobHelper.getJobParam());
// 構造 tsf 任務觸發請求參數
ExecutableTaskData executableTaskData = new ExecutableTaskData();
// 将 xxl-job 的任務參數 轉發 到 tsf 任務請求參數中
// 相容現有的 tsf 任務,同時支援後續任務開發繼續使用 tsf 模式
executableTaskData.setTaskArgument(XxlJobHelper.getJobParam());
// 調用 stf 任務
ProcessResult execute = value.execute(executableTaskData);
// 對 tsf 任務做結果轉換
if (ProcessResultCode.SUCCESS == execute.getResultCode()) {
XxlJobHelper.handleSuccess(execute.getResultMsg());
} else {
XxlJobHelper.handleFail(execute.getResultMsg());
}
XxlJobHelper.log("job : {} , executor : {} , message : {} ", key, execute.getResultCode(), execute.getResultMsg());
log.info("job : {} , executor : {} , message : {} ", key, execute.getResultCode(), execute.getResultMsg());
}
});
log.info(" xxl-job registry {} success!", key);
});
}
}
使用這種方式,可以同時支援 tsf 和 xxl-job ,一方面是 xxl-job 試用階段的雙架構支援,另一方面是 現有 任務的 xxl-job 支援。
3.3 xxl-job 配置
xxl-job的配置主要是 xxl-job 執行器的配置。
### 排程中心部署根位址 [選填]:如排程中心叢集部署存在多個位址則用逗号分隔。執行器将會使用該位址進行"執行器心跳注冊"和"任務結果回調";為空則關閉自動注冊;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 執行器通訊TOKEN [選填]:非空時啟用;
xxl.job.accessToken=
### 執行器AppName [選填]:執行器心跳注冊分組依據;為空則關閉自動注冊
xxl.job.executor.appname=xxl-job-executor-sample
### 執行器注冊 [選填]:優先使用該配置作為注冊位址,為空時使用内嵌服務 ”IP:PORT“ 作為注冊位址。進而更靈活的支援容器類型執行器動态IP和動态映射端口問題。
xxl.job.executor.address=
### 執行器IP [選填]:預設為空表示自動擷取IP,多網卡時可手動設定指定IP,該IP不會綁定Host僅作為通訊實用;位址資訊用于 "執行器注冊" 和 "排程中心請求并觸發任務";
xxl.job.executor.ip=
### 執行器端口号 [選填]:小于等于0則自動擷取;預設端口為9999,單機部署多個執行器時,注意要配置不同執行器端口;
xxl.job.executor.port=9999
### 執行器運作日志檔案存儲磁盤路徑 [選填] :需要對該路徑擁有讀寫權限;為空則使用預設路徑;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 執行器日志檔案儲存天數 [選填] : 過期日志自動清理, 限制值大于等于3時生效; 否則, 如-1, 關閉自動清理功能;
xxl.job.executor.logretentiondays=30
對應的配置類:
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* xxl-job config
*
* @author jiayongqi
* @date 2022/10/17 14:27
**/
@Slf4j
@RefreshScope
@Configuration
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses:}")
private String adminAddresses;
@Value("${xxl.job.executor.appname:}")
private String appName;
@Value("${xxl.job.accessToken:}")
private String accessToken;
@Value("${xxl.job.executor.address:}")
private String address;
@Value("${xxl.job.executor.ip:}")
private String ip;
@Value("${xxl.job.executor.port:0}")
private int port;
@Value("${xxl.job.executor.logpath:}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays:-1}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor () {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setAppname(appName);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
log.info("xxl-job config init : {}", xxlJobSpringExecutor);
return xxlJobSpringExecutor;
}
}
3.4 排程配置
首先登陸到xxl-job的排程平台
現有任務:
現有任務:
com.boss.communal.xxl.job.task.TsfTestTask
根據規則
appName # className # methodName
拼接出Task的唯一辨別
boss-communal#com.boss.communal.xxl.job.task.TsfTestTask#execute
然後在任務管理中建立任務
建立成功後,就可以執行了
如果有參數,那麼就設定參數,和 tsf 的任務管理控制台任務參數相同,需要和任務邏輯中參數解析的格式保持一緻
然後就可以檢視執行日志了
如下
3.5 執行器配置
執行器隻需要配置一次即可,與項目同步即可,與
applicationName
相同即可
spring 服務要注冊到那個執行器,由配置項配置
4. 後續開發方案
服務同時支援 tsf 和 xxl-job 後,後續開發定時任務,有兩種方式:tsf 方式、xxl-job 注解。
4.1 tsf 方式
繼續實作
com.tencent.cloud.task.sdk.client.spi.ExecutableTask
接口,别忘記将接口的實作類讓spring 管理。
例子:
4.2 xxl-job 注解
對定時任務的邏輯上增加
@XxlJob
的注解即可,
value
就是我們在新增任務時的唯一辨別。請注意規則
appName # className # methodName
為了友善統一管理已将
appName
封裝為接口常量
後面寫定時任務的時候,實作接口即可(接口沒有需要實作的方法,隻是提供常量)
然後在
@XxlJob
注解中使用即可
5. xxl-job 接入手冊
5.1 maven 依賴
在需要接入的項目的最外層
pom.xml
中的
dependencies
标簽下增加依賴
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${xxl-job.version}</version>
</dependency>
别忘記在
properties
标簽下增加版本号
<xxl-job.version>2.3.1</xxl-job.version>
重新整理maven依賴
5.2 配置類
在配置包中,增加配置類(因為xxl-job需要指定執行器,加上xxl-job需要負責注冊和監聽任務執行,還是比較耗費資源的,是以,不建議将配置類和依賴內建到
common
中,後期如果全部接入後,可以考慮)
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* xxl-job config
*
* @author
* @date 2022/10/17 14:27
**/
@Slf4j
@RefreshScope
@Configuration
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses:}")
private String adminAddresses;
@Value("${xxl.job.executor.appname:}")
private String appName;
@Value("${xxl.job.accessToken:}")
private String accessToken;
@Value("${xxl.job.executor.address:}")
private String address;
@Value("${xxl.job.executor.ip:}")
private String ip;
@Value("${xxl.job.executor.port:0}")
private int port;
@Value("${xxl.job.executor.logpath:}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays:-1}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor () {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setAppname(appName);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
log.info("xxl-job config init : {}", xxlJobSpringExecutor);
return xxlJobSpringExecutor;
}
}
5.3 配置yml
在yml中增加配置
xxl:
job:
accessToken: test
admin:
addresses: http://slaver:18080/xxl-job-admin
executor:
appname: boss-communal
# address: http://master
ip: 172.16.10.251
port: 9999 # 預設 9999
logpath: ./xxl-job/
logretentiondays: -1
應用程式隻能擷取到k8s容器内的ip,在通路的時候需要注意,如果排程中心與執行器處于同一個k8s網絡中,那麼使用ip可以,否則就需要使用 address。
需要保證 執行器 和 排程器 之間能互相通路。
5.4 增加常量接口
在配置包或者工具包中增加常量接口
/**
* @author
* @date 2022/10/20 10:47
**/
public interface ApplicationName {
String APP_NAME = "boss-communal";
}
5.5 增加任務注冊
在應用初始化的包中增加xxl-job任務注冊類
import com.boss.communal.xxl.job.common.ApplicationName;
import com.tencent.cloud.task.sdk.client.model.ExecutableTaskData;
import com.tencent.cloud.task.sdk.client.model.ProcessResult;
import com.tencent.cloud.task.sdk.client.model.ProcessResultCode;
import com.tencent.cloud.task.sdk.client.spi.ExecutableTask;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* xxl-job registry
*
* @author
* @date 2022/10/17 15:27
**/
@Slf4j
@Component
public class XxlJobRegistry implements ApplicationContextAware, ApplicationName {
@Override
public void setApplicationContext (ApplicationContext applicationContext) throws BeansException {
// 1. 擷取全部的 tsf 的任務
Map<String, ExecutableTask> beansOfType = applicationContext.getBeansOfType(ExecutableTask.class);
beansOfType.forEach((key, value) -> {
// 2. 注冊到 xxl-job
// name = appName # className # execute
// 使用 method 的全引用路徑,防止 beanName 重複的任務
// 使用 appName 字首,防止跨項目 method 全引用路徑重複
// name = appName # 任務 method 全引用路徑
XxlJobExecutor.registJobHandler(APP_NAME + "#" + value.getClass().getName() + "#" + "execute", new IJobHandler() {
@Override
public void execute () throws Exception {
// XxlJobHelper.log 是 xxl-job 的 日志列印方式
XxlJobHelper.log(" job : {} start , param : {} ", key, XxlJobHelper.getJobParam());
// log.info 是 tsf 的日志列印方式,也是我們統一日志內建方式
log.info(" job : {} start , param : {} ", key, XxlJobHelper.getJobParam());
// 構造 tsf 任務觸發請求參數
ExecutableTaskData executableTaskData = new ExecutableTaskData();
// 将 xxl-job 的任務參數 轉發 到 tsf 任務請求參數中
// 相容現有的 tsf 任務,同時支援後續任務開發繼續使用 tsf 模式
executableTaskData.setTaskArgument(XxlJobHelper.getJobParam());
// 調用 stf 任務
ProcessResult execute = value.execute(executableTaskData);
// 對 tsf 任務做結果轉換
if (ProcessResultCode.SUCCESS == execute.getResultCode()) {
XxlJobHelper.handleSuccess(execute.getResultMsg());
} else {
XxlJobHelper.handleFail(execute.getResultMsg());
}
XxlJobHelper.log("job : {} , executor : {} , message : {} ", key, execute.getResultCode(), execute.getResultMsg());
log.info("job : {} , executor : {} , message : {} ", key, execute.getResultCode(), execute.getResultMsg());
}
});
log.info(" xxl-job registry {} success!", key);
});
}
}
6. 使用
要使用xxl-job,就需要先将執行器注冊到排程中心。
如果執行器還未在排程中心建立,需要先建立,然後在建立基于執行器的任務,最後配置定時任務或者手動任務。
具體使用教程見文檔:https://www.xuxueli.com/xxl-job/#%E5%9B%9B%E3%80%81%E6%93%8D%E4%BD%9C%E6%8C%87%E5%8D%97