本文首發于 vivo網際網路技術 微信公衆号
連結:
https://mp.weixin.qq.com/s/l4vuYpNRjKxQRkRTDhyg2Q 作者:陳王榮
分布式任務排程架構幾乎是每個大型應用必備的工具,本文介紹了任務排程架構使用的需求背景和痛點,對業界普遍使用的開源分布式任務排程架構的使用進行了探究實踐,并分析了這幾種架構的優劣勢和對自身業務的思考。
一、業務背景
1.1 為什麼需要使用定時任務排程
(1)時間驅動處理場景:整點發送優惠券,每天更新收益,每天重新整理标簽資料和人群資料。
(2)批量處理資料:按月批量統計報表資料,批量更新短信狀态,實時性要求不高。
(3)異步執行解耦:活動狀态重新整理,異步執行離線查詢,與内部邏輯解耦。
1.2 使用需求和痛點
(1)任務執行監控告警能力。
(2)任務可靈活動态配置,無需重新開機。
(3)業務透明,低耦合,配置精簡,開發友善。
(4)易測試。
(5)高可用,無單點故障。
(6)任務不可重複執行,防止邏輯異常。
(7)大任務的分發并行處理能力。
二、開源架構實踐與探索
2.1 Java 原生 Timer 和ScheduledExecutorService
2.1.1 Timer使用

Timer缺陷:
- Timer底層是使用單線程來處理多個Timer任務,這意味着所有任務實際上都是串行執行,前一個任務的延遲會影響到之後的任務的執行。
- 由于單線程的緣故,一旦某個定時任務在運作時,産生未處理的異常,那麼不僅目前這個線程會停止,所有的定時任務都會停止。
- Timer任務執行是依賴于系統絕對時間,系統時間變化會導緻執行計劃的變更。
由于上述缺陷,盡量不要使用Timer, idea中也會明确提示,使用ScheduledThreadPoolExecutor替代Timer 。
2.1.2 ScheduledExecutorService使用
ScheduledExecutorService對于Timer的缺陷進行了修補,首先ScheduledExecutorService内部實作是ScheduledThreadPool線程池,可以支援多個任務并發執行。
對于某一個線程執行的任務出現異常,也會處理,不會影響其他線程任務的執行,另外ScheduledExecutorService是基于時間間隔的延遲,執行不會由于系統時間的改變發生變化。
當然,ScheduledExecutorService也有自己的局限性:隻能根據任務的延遲來進行排程,無法滿足基于絕對時間和月曆排程的需求。
2.2 Spring Task
2.2.1 Spring Task 使用
spring task 是spring自主開發的輕量級定時任務架構,不需要依賴其他額外的包,配置較為簡單。
此處使用注解配置
2.2.2 Spring Task缺陷
Spring Task 本身不支援持久化,也沒有推出官方的分布式叢集模式,隻能靠開發者在業務應用中自己手動擴充實作,無法滿足可視化,易配置的需求。
2.3 永遠經典的 Quartz
2.3.1 基本介紹
Quartz架構是Java領域最著名的開源任務排程工具,也是目前事實上的定時任務标準,幾乎全部的開源定時任務架構都是基于Quartz核心排程建構而成。
2.3.2 原了解析
核心元件和架構
關鍵概念
(1)Scheduler:任務排程器,是執行任務排程的控制器。本質上是一個計劃排程容器,注冊了全部Trigger和對應的JobDetail, 使用線程池作為任務運作的基礎元件,提高任務執行效率。
(2)Trigger:觸發器,用于定義任務排程的時間規則,告訴任務排程器什麼時候觸發任務,其中CronTrigger是基于cron表達式建構的功能強大的觸發器。
(3)Calendar:月曆特定時間點的集合。一個trigger可以包含多個Calendar,可用于排除或包含某些時間點。
(4)JobDetail:是一個可執行的工作,用來描述Job實作類及其它相關的靜态資訊,如Job的名稱、監聽器等相關資訊。
(5)Job:任務執行接口,隻有一個execute方法,用于執行真正的業務邏輯。
(6)JobStore:任務存儲方式,主要有RAMJobStore和JDBCJobStore,RAMJobStore是存儲在JVM的記憶體中,有丢失和數量受限的風險,JDBCJobStore是将任務資訊持久化到資料庫中,支援叢集。
2.3.3 實踐說明
(1)關于Quartz的基本使用
- 可參考Quartz官方文檔和網上部落格實踐教程。
(2)業務使用要滿足動态修改和重新開機不丢失, 一般需要使用資料庫進行儲存。
- Quartz本身支援JDBCJobStore,但是其配置的資料表比較多,官方推薦配置可參照官方文檔,超過10張表,業務使用比較重。
- 在使用的時候隻需要存在基本trigger配置和對應任務以及相關執行日志的表即可滿足絕大部分需求。
(3)元件化
- 将quartz動态任務配置資訊持久化到資料庫,将資料操作包裝成基本jar包,供項目之間使用,引用項目隻需要引入jar包依賴和配置對應的資料表,使用時就可以對Quartz配置透明。
(4)擴充
- 叢集模式
通過故障轉移和負載均衡實作了任務的高可用性,通過資料庫的鎖機制來確定任務執行的唯一性,但是叢集特性僅僅隻是用來HA,節點數量的增加并不會提升單個任務的執行效率,不能實作水準擴充。
- Quartz插件
可以對特定需要進行擴充,比如增加觸發器和任務執行日志,任務依賴串行處理場景,可參考:[quartz插件——實作任務之間的串行排程](https://www.cnblogs.com/surprizeFuture/articles/quartzPlugin.html)
2.3.4 缺陷和不足
(1)需要把任務資訊持久化到業務資料表,和業務有耦合。
(2)排程邏輯和執行邏輯并存于同一個項目中,在機器性能固定的情況下,業務和排程之間不可避免地會互相影響。
(3)quartz叢集模式下,是通過資料庫獨占鎖來唯一擷取任務,任務執行并沒有實作完善的負載均衡機制。
2.4 輕量級神器 XXL-JOB
2.4.1 基本介紹
XXL-JOB是一個輕量級分布式任務排程平台,主打特點是平台化,易部署,開發迅速、學習簡單、輕量級、易擴充,代碼仍在持續更新中。
“排程中心”是任務排程控制台,平台自身并不承擔業務邏輯,隻是負責任務的統一管理和排程執行,并且提供任務管理平台, “執行器” 負責接收“排程中心”的排程并執行,可直接部署執行器,也可以将執行器內建到現有業務項目中。 通過将任務的排程控制和任務的執行解耦,業務使用隻需要關注業務邏輯的開發。
主要提供了任務的動态配置管理、任務監控和統計報表以及排程日志幾大功能子產品,支援多種運作模式和路由政策,可基于對應執行器機器叢集數量進行簡單分片資料處理。
2.4.2 原了解析
2.1.0版本前核心排程子產品都是基于quartz架構,2.1.0版本開始自研排程元件,移除quartz依賴 ,使用時間輪排程。
2.4.3 實踐說明
詳細配置和介紹參考
官方文檔。
2.4.3.1 demo使用:
示例1:實作簡單任務配置,隻需要繼承IJobHandler 抽象類,并聲明注解
@JobHandler(value="offlineTaskJobHandler") ,實作業務邏輯即可。(注:此次引入了dubbo,後文介紹)。
@JobHandler(value="offlineTaskJobHandler")
@Component
public class OfflineTaskJobHandler extends IJobHandler {
@Reference(check = false,version = "cms-dev",group="cms-service")
private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
@Override
public ReturnT<String> execute(String param) throws Exception {
XxlJobLogger.log(" offlineTaskJobHandler start.");
try {
offlineTaskExecutorFacade.executeOfflineTask();
} catch (Exception e) {
XxlJobLogger.log("offlineTaskJobHandler-->exception." , e);
return FAIL;
}
XxlJobLogger.log("XXL-JOB, offlineTaskJobHandler end.");
return SUCCESS;
}
}
示例2:分片廣播任務。
@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {
@Override
public ReturnT<String> execute(String param) throws Exception {
// 分片參數
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("分片參數:目前分片序号 = {}, 總分片數 = {}", shardingVO.getIndex(), shardingVO.getTotal());
// 業務邏輯
for (int i = 0; i < shardingVO.getTotal(); i++) {
if (i == shardingVO.getIndex()) {
XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
} else {
XxlJobLogger.log("第 {} 片, 忽略", i);
}
}
return SUCCESS;
}
}
2.4.3.2 整合dubbo
(1)引入dubbo-spring-boot-starter和業務facade jar包依賴。
<dependency>
<groupId>com.alibaba.spring.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.demo.service</groupId>
<artifactId>xxx-facade</artifactId>
<version>1.9-SNAPSHOT</version>
</dependency>
(2)配置檔案加入dubbo消費端配置(可根據環境定義多個配置檔案,通過profile切換)。
## Dubbo 服務消費者配置
spring.dubbo.application.name=xxl-job
spring.dubbo.registry.address=zookeeper://zookeeper.xyz:2183
spring.dubbo.port=20880
spring.dubbo.version=demo
spring.dubbo.group=demo-service
(3)代碼中通過@Reference注入facade接口即可。
@Reference(check = false,version = "demo",group="demo-service")
private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
(4)啟動程式加入@EnableDubboConfiguration注解。
@SpringBootApplication
@EnableDubboConfiguration
public class XxlJobExecutorApplication {
public static void main(String[] args) {
SpringApplication.run(XxlJobExecutorApplication.class, args);
}
}
2.4.4 任務可視化配置
内置了平台項目,友善了開發者對任務的管理和執行日志的監控,并提供了一些便于測試的功能。
2.4.5 擴充
(1)任務監控和報表的優化。
(2)任務報警方式的擴充,比如加入告警中心,提供内部消息,短信告警。
(3)對實際業務内部執行出現異常情況下的不同監控告警和重試政策。
2.5 高可用 Elastic-Job
2.5.1 基本介紹
Elastic-Job是一個分布式排程解決方案,由兩個互相獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。
Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分布式任務的協調服務。
Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應用分發以及程序隔離等服務。
可惜的是已經兩年沒有疊代更新記錄。
2.5.2 原了解析
2.5.3 實踐說明
2.5.3.1 demo使用
(1)安裝zookeeper,配置注冊中心config,配置檔案加入注冊中心zk的配置。
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class JobRegistryCenterConfig {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
@Value("${regCenter.namespace}") final String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
}
spring.application.name=demo_elasticjob
regCenter.serverList=localhost:2181
regCenter.namespace=demo_elasticjob
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
spring.datasource.username=user
spring.datasource.password=pwd
(2)配置資料源config,并配置檔案中加入資料源配置。
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Configuration
@ConfigurationProperties(prefix = "spring.datasource")
public class DataSourceProperties {
private String url;
private String username;
private String password;
@Bean
@Primary
public DataSource getDataSource() {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl(url);
dataSource.setUsername(username);
dataSource.setPassword(password);
return dataSource;
}
}
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
spring.datasource.username=user
spring.datasource.password=pwd
(3)配置事件config。
@Configuration
public class JobEventConfig {
@Autowired
private DataSource dataSource;
@Bean
public JobEventConfiguration jobEventConfiguration() {
return new JobEventRdbConfiguration(dataSource);
}
}
(4)為了便于靈活配置不同的任務觸發事件,加入ElasticSimpleJob注解。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticSimpleJob {
@AliasFor("cron")
String value() default "";
@AliasFor("value")
String cron() default "";
String jobName() default "";
int shardingTotalCount() default 1;
String shardingItemParameters() default "";
String jobParameter() default "";
}
(5)對配置進行初始化。
@Configuration
@ConditionalOnExpression("'${elaticjob.zookeeper.server-lists}'.length() > 0")
public class ElasticJobAutoConfiguration {
@Value("${regCenter.serverList}")
private String serverList;
@Value("${regCenter.namespace}")
private String namespace;
@Autowired
private ApplicationContext applicationContext;
@Autowired
private DataSource dataSource;
@PostConstruct
public void initElasticJob() {
ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
regCenter.init();
Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class);
for (Map.Entry<String, SimpleJob> entry : map.entrySet()) {
SimpleJob simpleJob = entry.getValue();
ElasticSimpleJob elasticSimpleJobAnnotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class);
String cron = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value());
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(simpleJob.getClass().getName(), cron, elasticSimpleJobAnnotation.shardingTotalCount()).shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters()).build(), simpleJob.getClass().getCanonicalName());
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource);
SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobEventRdbConfiguration);
jobScheduler.init();
}
}
}
(6)實作 SimpleJob接口,按上文中方法整合dubbo, 完成業務邏輯。
@ElasticSimpleJob(
cron = "*/10 * * * * ?",
jobName = "OfflineTaskJob",
shardingTotalCount = 2,
jobParameter = "測試參數",
shardingItemParameters = "0=A,1=B")
@Component
public class MySimpleJob implements SimpleJob {
Logger logger = LoggerFactory.getLogger(OfflineTaskJob.class);
@Reference(check = false, version = "cms-dev", group = "cms-service")
private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
@Override
public void execute(ShardingContext shardingContext) {
offlineTaskExecutorFacade.executeOfflineTask();
logger.info(String.format("Thread ID: %s, 作業分片總數: %s, " +
"目前分片項: %s.目前參數: %s," +
"作業名稱: %s.作業自定義參數: %s"
,
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
));
}
}
2.6 其餘開源架構
(1)Saturn:Saturn是唯品會開源的一個分布式任務排程平台,在Elastic Job的基礎上進行了改造。
(2)SIA-TASK:是宜信開源的分布式任務排程平台。
三、優劣勢對比和業務場景适配思考
業務思考:
- 豐富任務監控資料和告警政策。
- 接入統一登入和權限控制。
- 進一步簡化業務接入步驟。
四、結語
對于并發場景不是特别高的系統來說,xxl-job配置部署簡單易用,不需要引入多餘的元件,同時提供了可視化的控制台,使用起來非常友好,是一個比較好的選擇。希望直接利用開源分布式架構能力的系統,建議根據自身的情況來進行合适的選型。