文章目錄
- elastic-job
-
- 分布式任務排程
-
- 概念
- 分類
- elastic-job簡介
-
- 下載下傳安裝使用
- 實戰案例
-
- 靜态任務案例
- 動态任務案例
elastic-job
分布式任務排程
概念
很多時候,我們需要定時執行一些程式完成一些預定要完成的操作,如果手動處理,一旦任務量過大,就非常麻煩,是以用定時任務去操作是個非常不錯的選項,這種操作就是分布式任務排程。
分類
現在的應用多數是分布式或者微服務,是以我們需要的是分布式任務排程,那麼現在分布式任務排程流行的主要有elastic-job、xxl-job、quartz等,我們這裡做一個對比:
feature | quartz | elastic-job | xxl-job | antares | opencron |
---|---|---|---|---|---|
依賴 | mysql | jdk1.7+, zookeeper 3.4.6+ ,maven3.0.4+ | mysql ,jdk1.7+ , maven3.0+ | jdk 1.7+ , redis , zookeeper | jdk1.7+ , Tomcat8.0+ |
HA | 多節點部署,通過競争資料庫鎖來保證隻有一個節點執行任務 | 通過zookeeper的注冊與發現,可以動态的添加伺服器。 支援水準擴容 | 叢集部署 | 叢集部署 | — |
任務分片 | — | 支援 | 支援 | 支援 | — |
文檔完善 | 完善 | 完善 | 完善 | 文檔略少 | 文檔略少 |
管理界面 | 無 | 支援 | 支援 | 支援 | 支援 |
難易程度 | 簡單 | 簡單 | 簡單 | 一般 | 一般 |
公司 | OpenSymphony | 當當網 | 個人 | 個人 | 個人 |
進階功能 | — | 彈性擴容,多種作業模式,失效轉移,運作狀态收集,多線程處理資料,幂等性,容錯處理,spring命名空間支援 | 彈性擴容,分片廣播,故障轉移,Rolling實時日志,GLUE(支援線上編輯代碼,免釋出),任務進度監控,任務依賴,資料加密,郵件報警,運作報表,國際化 | 任務分片, 失效轉移,彈性擴容 , | 時間規則支援quartz和crontab ,kill任務, 現場執行,查詢任務運作狀态 |
使用企業 | 大衆化産品,對分布式排程要求不高的公司大面積使用 | 36氪,當當網,國美,金柚網,聯想,唯品會,亞信,平安,豬八戒 | 大衆點評,運滿滿,優信二手車,拍拍貸 | — | — |
elastic-job簡介
中文官網:https://shardingsphere.apache.org/elasticjob/index_zh.html
ElasticJob 是一個分布式排程解決方案,由 2 個互相獨立的子項目 ElasticJob-Lite 和 ElasticJob-Cloud 組成。
ElasticJob-Lite 定位為輕量級無中心化解決方案,使用jar的形式提供分布式任務的協調服務;ElasticJob-Cloud 使用 Mesos 的解決方案,額外提供資源治理、應用分發以及程序隔離等服務。
ElasticJob 的各個産品使用統一的作業 API,開發者僅需要一次開發,即可随意部署。
下載下傳安裝使用
可以參考官方文檔:https://shardingsphere.apache.org/elasticjob/current/cn/overview/
實戰案例
靜态任務案例
使用elastic-job很容易,我們接下來學習下elastic-job的使用,這裡的案例我們先實作靜态任務案例,靜态任務案例也就是執行時間事先寫好。
實作步驟:
- 引入依賴包
- 配置zookeeper節點以及任務名稱命名空間
- 實作自定義任務,需要實作SimpleJob接口
1)在
seckill-goods
中引入依賴
<!-- ElasticJobAutoConfiguration自動配置類作用-->
<dependency>
<groupId>com.github.kuhn-he</groupId>
<artifactId>elastic-job-lite-spring-boot-starter</artifactId>
<version>2.1.5</version>
</dependency>
2)配置elastic-job
在
bootstrap.yml
中配置
elastic-job
,如下:
elaticjob:
zookeeper:
server-lists: zk-server:3181
namespace: updatetask
配置說明:
-
zookeeper的位址server-lists:
-
:定時任務命名空間namespace
3)任務建立
建立
com.seckill.goods.task.statictask.ElasticjobTask
,代碼如下:
@ElasticSimpleJob(
cron = "5/10 * * * * ?",
jobName = "updateTask",
shardingTotalCount = 1
)
@Component
public class ElasticjobTask implements SimpleJob {
/***
* 執行任務
* @param shardingContext
*/
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("-----------執行!");
}
}
參數說明:
- cron:定時表達式
- jobName:這裡和bootstrap.yml中的namespace保持一緻
- shardingTotalCount:分片數量
動态任務案例
項目參考位址:https://github.com/LuoLiangDSGA/spring-learning/tree/master/boot-elasticjob
動态任務案例主要是講解程式在運作時,動态添加定時任務,這種場景應用非常廣泛。使用elastic-job實作動态添加定時任務的實作有點複雜,我們接下來實際操作一次。
實作步驟:
- 配置初始化的zookeeper位址
- 配置的定時任務命名空間(不一定會使用)
- 注冊初始化資料
- 監聽器->任務執行前後監聽(可有可無)
- 動态添加定時任務實作
- 自定義任務處理過程-實作SimpleJob
1)監聽器建立
監聽器采用AOP模式,類似前置通知和後置通知,
doBeforeJobExecutedAtLastStarted
和
doAfterJobExecutedAtLastCompleted
分别會在任務執行前和執行後調用,我們建立一個監聽器實作任務排程前後攔截,
com.seckill.goods.task.dynamic.ElasticJobListener
:
public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
/****
* 構造函數
* @param startedTimeoutMilliseconds
* @param completedTimeoutMilliseconds
*/
public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
}
/***
* 任務初始化前要做的事情,類似前置通知
* @param shardingContexts
*/
@Override
public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
System.out.println("========doBeforeJobExecutedAtLastStarted========"+ TimeUtil.date2FormatHHmmss(new Date()));
}
/***
* 任務執行完成後要做的事情,類似後置通知
* @param shardingContexts
*/
@Override
public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
System.out.println("=======doAfterJobExecutedAtLastCompleted============="+ TimeUtil.date2FormatHHmmss(new Date()));
}
}
2)注冊中心配置
在bootstrap.yml中配置zk和namespace
#配置動态任務案例的zk和namespace
zk: zk-server:3181
namesp: autotask
建立配置類配置注冊中心資訊,
com.seckill.goods.task.dynamic.ElasticJobConfig
:
@Configuration
public class ElasticJobConfig {
//配置檔案中的zookeeper的ip和端口
@Value(value = "${zk}")
private String serverlists;
//指定一個命名空間
@Value("${namesp}")
private String namespace;
/***
* 配置Zookeeper和namespace
* @return
*/
@Bean
public ZookeeperConfiguration zkConfig() {
return new ZookeeperConfiguration(serverlists, namespace);
}
/***
* 向zookeeper注冊初始化資訊
* @param config
* @return
*/
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
return new ZookeeperRegistryCenter(config);
}
/****
* 建立ElasticJob的監聽器執行個體
* @return
*/
@Bean
public ElasticJobListener elasticJobListener() {
//初始化要給定逾時多少秒重連
return new ElasticJobListener(100L,100L);
}
}
3)任務建構
我們建立一個動态配置任務的類,任何邏輯代碼需要建立定時任務,可以直接調用該類的指定方法即可。建立類:
com.seckill.goods.task.dynamic.ElasticJobHandler
,代碼如下:
@Component
public class ElasticJobHandler {
@Resource
private ZookeeperRegistryCenter registryCenter;
@Resource
private ElasticJobListener elasticJobListener;
/**
* @param jobName:任務的命名空間
* @param jobClass:執行的定時任務對象
* @param shardingTotalCount:分片個數
* @param cron:定時周期表達式
* @param id:自定義參數
* @return
*/
private static LiteJobConfiguration.Builder simpleJobConfigBuilder(String jobName,
Class<? extends SimpleJob> jobClass,
int shardingTotalCount,
String cron,
String id) {
//建立任務建構對象
LiteJobConfiguration.Builder builder = LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
JobCoreConfiguration.
//任務命名空間名字、任務執行周期表達式、分片個數
newBuilder(jobName, cron, shardingTotalCount).
//自定義參數
jobParameter(id).
build(),
jobClass.getCanonicalName()));
//本地配置是否可覆寫注冊中心配置
builder.overwrite(true);
return builder;
}
/**
* 添加一個定時任務
* @param cron:周期執行表達式
* @param id:自定義參數
* @param jobName:命名空間
* @param instance:任務對象
*/
public void addPublishJob(String cron,String id,String jobName,SimpleJob instance) {
LiteJobConfiguration jobConfig = simpleJobConfigBuilder(
jobName,
instance.getClass(),
1,
cron,
id).overwrite(true).build();
//DynamicTask為具體的任務執行邏輯類
new SpringJobScheduler(instance, registryCenter, jobConfig, elasticJobListener).init();
}
/***
* Date轉cron表達式
*/
public static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";
/**
* 獲得定時
* @param date
* @return
*/
public static String getCron(final Date date) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(CRON_DATE_FORMAT);
return simpleDateFormat.format(date);
}
}
4)執行邏輯
我們接着建立一個類,用于執行自己所需要操作的邏輯,
com.seckill.goods.task.dynamic.DynamicTask
,代碼如下:
public class DynamicTask implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
//傳遞的參數
String id = shardingContext.getJobParameter();
try {
//具體任務邏輯
System.out.println("執行你的邏輯代碼!param:"+id);
} catch (Exception e) {
e.printStackTrace();
}
}
}
5)調用測試
建立
com.seckill.goods.controller.TaskController
動态調用建立任務的方法,代碼如下:
@RestController
@RequestMapping(value = "/task")
public class TaskController {
@Autowired
ElasticJobHandler elasticJobHandler;
/***
* 動态建立任務
* @param times:延遲時間,為了測試到效果,是以在目前時間往後延遲
* @param jobname:任務名字
* @param param:自定義參數
* @return
*/
@GetMapping
public Result add(Long times,String jobname,String param){
//在目前指定時間内延遲times毫秒執行任務
Date date = new Date(System.currentTimeMillis()+times);
//需要傳遞給定時任務的參數
String cron = ElasticJobHandler.getCron(date);
//執行任務
elasticJobHandler.addPublishJob(cron,param,jobname,new DynamicTask());
return new Result(true, StatusCode.OK,"添加任務成功!");
}
}
6)測試
通路:
http://localhost:18081/task?times=15000&jobname=asyncname¶m=No001
背景執行效果如下: