大多數情況下,定時任務我們一般使用quartz開源架構就能滿足應用場景。但如果考慮到健壯性等其它一些因素,就需要自己下點工夫,比如:要避免單點故障,至少得部署2個節點吧,但是部署多個節點,又有其它問題,有些資料在某一個時刻隻能處理一次,比如 i = i+1 這些無法保證幂等的操作,run多次跟run一次,完全是不同的效果。 對于上面的問題,我曾經自行設計過一個基于zk分布式鎖的解決方案: 1、每類定時job,可以配置設定一個獨立的辨別(比如:xxx_job) 2、這類job的執行個體,部署在多個節點上時,每個節點啟動前,向zk申請一個分布式鎖(在xxx_job節點下) 3、拿到鎖的執行個體,才允許啟動定時任務(通過代碼控制quartz的schedule),沒拿到鎖的,處于standby狀态,一直監聽鎖的變化 4、如果某個節點挂了,分布式鎖自動釋放,其它節點這時會搶到鎖,按上面的邏輯,就會從standby狀态,轉為激活狀态,小三正式上位,繼續執行定時job。 這個方案,基本上解決了HA和業務正确性的問題,但是美中不足的地方有2點: 1、無法充分利用機器性能,處于standby的節點,實際上隻是一個備胎,平時啥也不幹 2、性能不友善擴充,比如:某個job一次要處理上千萬的資料,僅1個激活節點,要處理很久 好了,前戲鋪墊了這麼久,該請主角登場了,elastic-job相當于quartz+zk的加強版,它允許對定時任務分片,可以叢集部署(每個job的"分片"會分散到各個節點上),如果某個節點挂了,該節點上的分片,會排程到其它節點上。官網上有比較詳細的教程,一般情況下,使用SimpleJob這種就可以了。 使用步驟: 前提:要先添加下面二個jar的依賴
compile "com.dangdang:elastic-job-lite-core:2.1.5"
compile "com.dangdang:elastic-job-lite-spring:2.1.5"
1、自己的job要繼承自SimpleJob,然後實作void execute(ShardingContext shardingContext)。
public interface SimpleJob extends ElasticJob {
/**
* 執行作業.
*
* @param shardingContext 分片上下文
*/
void execute(ShardingContext shardingContext);
}
注意這裡面有一個shardingContext參數,看下源碼:
/**
* 分片上下文.
*
* @author zhangliang
*/
@Getter
@ToString
public final class ShardingContext {
/**
* 作業名稱.
*/
private final String jobName;
/**
* 作業任務ID.
*/
private final String taskId;
/**
* 分片總數.
*/
private final int shardingTotalCount;
/**
* 作業自定義參數.
* 可以配置多個相同的作業, 但是用不同的參數作為不同的排程執行個體.
*/
private final String jobParameter;
/**
* 配置設定于本作業執行個體的分片項.
*/
private final int shardingItem;
/**
* 配置設定于本作業執行個體的分片參數.
*/
private final String shardingParameter;
public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
jobName = shardingContexts.getJobName();
taskId = shardingContexts.getTaskId();
shardingTotalCount = shardingContexts.getShardingTotalCount();
jobParameter = shardingContexts.getJobParameter();
this.shardingItem = shardingItem;
shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
}
}
這裡面有2個很重要的屬性:shardingTotalCount 分片總數(比如:2)、shardingItem 目前分片索引(比如:1),前面提到的性能擴容,就可以根據2個參數進行簡單的處理,假設在電商系統中,每天晚上有個定時任務,要統計每家店的銷量。商家id一般在表設計上是一個自增數字,如果總共2個分片(注:通常也就是部署2個節點),可以把 id為奇數的放到分片0,id為偶數的放到分片1,這樣2個機器各跑一半,相對隻有1台機器而言,就快多了。
僞代碼如下:
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
int shardIndx = shardingContext.getShardingItem();
if (shardIndx == 0) {
//處理id為奇數的商家
} else {
//處理id為偶數的商家
}
}
}
這個還可以進一步簡化,如果使用mysql查詢商家清單,mysql中有一個mod函數,直接可以對商家id進行取模運算
select * from shop where mod(shop_id,2)=0
如果把上面的2、0換成參數,mybatis中類似這樣:
select * from shop where mod(shop_id,#{shardTotal})=#{shardIndex}
這樣邏輯就轉換到sql中處理了,java代碼中把參數傳進來就行,連if都可以省掉。
2、接下來看看如何配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd">
<reg:zookeeper id="regCenter" server-lists="${zk_address}" namespace="my-xxx-job"
base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="3000" max-retries="3"/>
<job:simple id="xxxJob" class="com.cnblogs.yjmyzz.XXXJob" registry-center-ref="regCenter"
cron="${xxxJob_cornExpress}"
sharding-total-count="2" sharding-item-parameters="0=A,1=B"/>
...
</beans>
與正常的spring配置幾乎沒啥差別,幾個要點如下:
a) 因為分片排程是基于zk的,是以要先配置zk注冊中心,其中${zk_address}大家可以改成實際的zk位址清單,比如:10.x.x.1:2181,10.x.x.2:2181,10.x.x.3:2181
b) 每個job中的corn屬性,就是quartz中的cornExpress表達式,然後sharding-total-count即總分片數,而sharding-item-parameters則是指定每個分片中的具體參數
(注:剛才的電商每天晚上算銷量,這個case其實隻用到了分片索引、分片數,并不需要參數,是以這裡随便配置一個類似0=A, 1=B就可以了,如果有些業務場景,希望在知道分片索引的同時,還希望額外傳一些參數進來,就可以在這裡配置你希望的參數,然後在execute中,也能讀到相應的參數)
3、控制台
elastic-job還提供了一個不錯的UI控制台,項目源代碼git clone到本地,mvn install就能得到一個elastic-job-lite-console-${version}.tar.gz的包,解壓,然後運作裡面的bin/start.sh 就能跑起來,界面類似如下:
通過這個控制台,可以動态調整每個定時任務的觸發時間(即:cornExpress)。詳情可參考官網文檔-運維平台部分。
4、與spring-cloud/spring-boot的整合
如果是傳統的spring項目,按上面的步驟就可以無縫整合了,如果是spring-cloud/spring-boot,則稍微要複雜點。
由于spring-boot倡導零xml配置,是以大部配置設定置就用代碼替代了,先定義一個elasticJob的配置類:
@Data
@Configuration
public class ElasticJobConfig {
@Value("${rxQuartz.app.zkAddress}")
private String zkNodes;
@Value("${rxQuartz.app.namespace}")
private String namespace;
@Bean
public ZookeeperConfiguration zkConfig() {
return new ZookeeperConfiguration(zkNodes, namespace);
}
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
return new ZookeeperRegistryCenter(config);
}
}
上面這段代碼,主要是解決zk注冊中心的注入問題,然後各種xxxJob,由于要讓spring自動注入,需要打上component注解
@Component("xxxJob")
public class XXXJob extends AbstractJob {
...
}
然後在真正要用的地方,把他們組裝起來
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: yangjunming
*/
@Configuration
public class ElasticJobs {
@Autowired
@Qualifier("xxxJob")
public SimpleJob xxxJob;
@Autowired
private ZookeeperRegistryCenter regCenter;
@Bean(initMethod = "init")
public JobScheduler settlementJobScheduler(@Autowired @Qualifier("xxxJob") SimpleJob simpleJob,
@Value("${xxxJob.billCronExpress}") final String cron,
@Value("${xxxJob.shardingCount}") int shardingCount,
@Value("${xxxJob.shardingItemParameters}") String shardingItemParameters) {
return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingCount, shardingItemParameters));
}
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
}
}
job:simple命名空間屬性詳細說明
屬性名 | 類型 | 是否必填 | 預設值 | 描述 |
---|---|---|---|---|
id | String | | 作業名稱 | |
class | String | 否 | 作業實作類,需實作 接口,腳本型作業不需要配置 | |
registry-center-ref | String | | 注冊中心 的引用,需引用 的聲明 | |
cron | String | | 表達式,用于配置作業觸發時間 | |
sharding-total-count | int | | 作業分片總數 | |
sharding-item-parameters | String | 否 | 分片序列号和參數用等号分隔,多個鍵值對用逗号分隔 分片序列号從 開始,不可大于或等于作業分片總數 如: | |
job-parameter | String | 否 | 作業自定義參數 可以配置多個相同的作業,但是用不同的參數作為不同的排程執行個體 | |
monitor-execution | boolean | 否 | true | 監控作業運作時狀态 每次作業執行時間和間隔時間均非常短的情況,建議不監控作業運作時狀态以提升效率。因為是瞬時狀态,是以無必要監控。請使用者自行增加資料堆積監控。并且不能保證資料重複選取,應在作業中實作幂等性。 每次作業執行時間和間隔時間均較長的情況,建議監控作業運作時狀态,可保證資料不會重複選取。 |
monitor-port | int | 否 | -1 | 作業監控端口 建議配置作業監控端口, 友善開發者dump作業資訊。 使用方法: echo “dump” | nc 127.0.0.1 9888 |
max-time-diff-seconds | int | 否 | -1 | 最大允許的本機與注冊中心的時間誤差秒數 如果時間誤差超過配置秒數則作業啟動時将抛異常 配置為 表示不校驗時間誤差 |
failover | boolean | 否 | false | 是否開啟失效轉移 僅 開啟,失效轉移才有效 |
misfire | boolean | 否 | true | 是否開啟錯過任務重新執行 |
job-sharding-strategy-class | String | 否 | true | 作業分片政策實作類全路徑 預設使用平均配置設定政策 詳情參見:作業分片政策 |
description | String | 否 | 作業描述資訊 | |
disabled | boolean | 否 | false | 作業是否禁止啟動 可用于部署作業時,先禁止啟動,部署結束後統一啟動 |
overwrite | boolean | 否 | false | 本地配置是否可覆寫注冊中心配置 如果可覆寫,每次啟動作業都以本地配置為準 |
job:dataflow命名空間屬性詳細說明
job:dataflow命名空間擁有job:simple命名空間的全部屬性,以下僅列出特有屬性
屬性名 | 類型 | 是否必填 | 預設值 | 描述 |
---|---|---|---|---|
process-count-interval-seconds | int | 否 | 300 | 統計作業處理資料數量的間隔時間 機關:秒 |
concurrent-data-process-thread-count | int | 否 | CPU核數*2 | 同時處理資料的并發線程數 不能小于1 僅 作業有效 |
fetch-data-count | int | 否 | 1 | 每次抓取的資料量 |
streaming-process | boolean | 否 | false | 是否流式處理資料 如果流式處理資料, 則 不傳回空結果将持續執行作業 如果非流式處理資料, 則處理資料完成後作業結束 |
job:script命名空間屬性詳細說明,基本屬性參照job:simple命名空間屬性詳細說明
job:script命名空間擁有job:simple命名空間的全部屬性,以下僅列出特有屬性
屬性名 | 類型 | 是否必填 | 預設值 | 描述 |
---|---|---|---|---|
script-command-line | String | 否 | 腳本型作業執行指令行 |
job:listener命名空間屬性詳細說明
job:listener
必須配置為
job:bean
的子元素
屬性名 | 類型 | 是否必填 | 預設值 | 描述 |
---|---|---|---|---|
class | String | | 前置後置任務監聽實作類,需實作 接口 | |
started-timeout-milliseconds | long | | Long.MAX_VALUE | AbstractDistributeOnceElasticJobListener型監聽器,最後一個作業執行前的執行方法的逾時時間 機關:毫秒 |
completed-timeout-milliseconds | long | | Long.MAX_VALUE | AbstractDistributeOnceElasticJobListener型監聽器,最後一個作業執行後的執行方法的逾時時間 機關:毫秒 |
reg:bean命名空間屬性詳細說明
屬性名 | 類型 | 是否必填 | 預設值 | 描述 |
---|---|---|---|---|
id | String | | 注冊中心在 容器中的主鍵 | |
server-lists | String | | 連接配接 伺服器的清單 包括IP位址和端口号 多個位址用逗号分隔 如: host1:2181,host2:2181 | |
namespace | String | | 的命名空間 | |
base-sleep-time-milliseconds | int | 否 | 1000 | 等待重試的間隔時間的初始值 機關:毫秒 |
max-sleep-time-milliseconds | int | 否 | 3000 | 等待重試的間隔時間的最大值 機關:毫秒 |
max-retries | int | 否 | 3 | 最大重試次數 |
session-timeout-milliseconds | int | 否 | 60000 | 會話逾時時間 機關:毫秒 |
connection-timeout-milliseconds | int | 否 | 15000 | 連接配接逾時時間 機關:毫秒 |
digest | String | 否 | 無驗證 | 連接配接 的權限令牌 預設為不需要權限驗證 |