天天看點

Elastic-Job開發指南

大多數情況下,定時任務我們一般使用quartz開源架構就能滿足應用場景。但如果考慮到健壯性等其它一些因素,就需要自己下點工夫,比如:要避免單點故障,至少得部署2個節點吧,但是部署多個節點,又有其它問題,有些資料在某一個時刻隻能處理一次,比如 i = i+1 這些無法保證幂等的操作,run多次跟run一次,完全是不同的效果。   對于上面的問題,我曾經自行設計過一個基于zk分布式鎖的解決方案: 1、每類定時job,可以配置設定一個獨立的辨別(比如:xxx_job) 2、這類job的執行個體,部署在多個節點上時,每個節點啟動前,向zk申請一個分布式鎖(在xxx_job節點下) 3、拿到鎖的執行個體,才允許啟動定時任務(通過代碼控制quartz的schedule),沒拿到鎖的,處于standby狀态,一直監聽鎖的變化 4、如果某個節點挂了,分布式鎖自動釋放,其它節點這時會搶到鎖,按上面的邏輯,就會從standby狀态,轉為激活狀态,小三正式上位,繼續執行定時job。   這個方案,基本上解決了HA和業務正确性的問題,但是美中不足的地方有2點: 1、無法充分利用機器性能,處于standby的節點,實際上隻是一個備胎,平時啥也不幹 2、性能不友善擴充,比如:某個job一次要處理上千萬的資料,僅1個激活節點,要處理很久   好了,前戲鋪墊了這麼久,該請主角登場了,elastic-job相當于quartz+zk的加強版,它允許對定時任務分片,可以叢集部署(每個job的"分片"會分散到各個節點上),如果某個節點挂了,該節點上的分片,會排程到其它節點上。官網上有比較詳細的教程,一般情況下,使用SimpleJob這種就可以了。   使用步驟: 前提:要先添加下面二個jar的依賴 

compile "com.dangdang:elastic-job-lite-core:2.1.5"
compile "com.dangdang:elastic-job-lite-spring:2.1.5" 
           

1、自己的job要繼承自SimpleJob,然後實作void execute(ShardingContext shardingContext)。

public interface SimpleJob extends ElasticJob {
    
    /**
     * 執行作業.
     *
     * @param shardingContext 分片上下文
     */
    void execute(ShardingContext shardingContext);
}
           

注意這裡面有一個shardingContext參數,看下源碼:

/**
 * 分片上下文.
 * 
 * @author zhangliang
 */
@Getter
@ToString
public final class ShardingContext {
    
    /**
     * 作業名稱.
     */
    private final String jobName;
    
    /**
     * 作業任務ID.
     */
    private final String taskId;
    
    /**
     * 分片總數.
     */
    private final int shardingTotalCount;
    
    /**
     * 作業自定義參數.
     * 可以配置多個相同的作業, 但是用不同的參數作為不同的排程執行個體.
     */
    private final String jobParameter;
    
    /**
     * 配置設定于本作業執行個體的分片項.
     */
    private final int shardingItem;
    
    /**
     * 配置設定于本作業執行個體的分片參數.
     */
    private final String shardingParameter;
    
    public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
        jobName = shardingContexts.getJobName();
        taskId = shardingContexts.getTaskId();
        shardingTotalCount = shardingContexts.getShardingTotalCount();
        jobParameter = shardingContexts.getJobParameter();
        this.shardingItem = shardingItem;
        shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
    }
}
           
這裡面有2個很重要的屬性:shardingTotalCount 分片總數(比如:2)、shardingItem 目前分片索引(比如:1),前面提到的性能擴容,就可以根據2個參數進行簡單的處理,假設在電商系統中,每天晚上有個定時任務,要統計每家店的銷量。商家id一般在表設計上是一個自增數字,如果總共2個分片(注:通常也就是部署2個節點),可以把 id為奇數的放到分片0,id為偶數的放到分片1,這樣2個機器各跑一半,相對隻有1台機器而言,就快多了。      

僞代碼如下:

public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        int shardIndx = shardingContext.getShardingItem();
        if (shardIndx == 0) {
            //處理id為奇數的商家
        } else {
            //處理id為偶數的商家
        }
    }
}
           

這個還可以進一步簡化,如果使用mysql查詢商家清單,mysql中有一個mod函數,直接可以對商家id進行取模運算

select * from shop where mod(shop_id,2)=0
           

如果把上面的2、0換成參數,mybatis中類似這樣:

select * from shop where mod(shop_id,#{shardTotal})=#{shardIndex}
           

這樣邏輯就轉換到sql中處理了,java代碼中把參數傳進來就行,連if都可以省掉。

2、接下來看看如何配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.dangdang.com/schema/ddframe/reg
       http://www.dangdang.com/schema/ddframe/reg/reg.xsd
       http://www.dangdang.com/schema/ddframe/job
       http://www.dangdang.com/schema/ddframe/job/job.xsd">

    <reg:zookeeper id="regCenter" server-lists="${zk_address}" namespace="my-xxx-job"
                   base-sleep-time-milliseconds="1000"
                   max-sleep-time-milliseconds="3000" max-retries="3"/>

    <job:simple id="xxxJob" class="com.cnblogs.yjmyzz.XXXJob" registry-center-ref="regCenter"
                cron="${xxxJob_cornExpress}"
                sharding-total-count="2" sharding-item-parameters="0=A,1=B"/>

                ...
</beans>
           

與正常的spring配置幾乎沒啥差別,幾個要點如下:

a) 因為分片排程是基于zk的,是以要先配置zk注冊中心,其中${zk_address}大家可以改成實際的zk位址清單,比如:10.x.x.1:2181,10.x.x.2:2181,10.x.x.3:2181 

b) 每個job中的corn屬性,就是quartz中的cornExpress表達式,然後sharding-total-count即總分片數,而sharding-item-parameters則是指定每個分片中的具體參數

(注:剛才的電商每天晚上算銷量,這個case其實隻用到了分片索引、分片數,并不需要參數,是以這裡随便配置一個類似0=A, 1=B就可以了,如果有些業務場景,希望在知道分片索引的同時,還希望額外傳一些參數進來,就可以在這裡配置你希望的參數,然後在execute中,也能讀到相應的參數)

3、控制台

elastic-job還提供了一個不錯的UI控制台,項目源代碼git clone到本地,mvn install就能得到一個elastic-job-lite-console-${version}.tar.gz的包,解壓,然後運作裡面的bin/start.sh 就能跑起來,界面類似如下:

Elastic-Job開發指南

通過這個控制台,可以動态調整每個定時任務的觸發時間(即:cornExpress)。詳情可參考官網文檔-運維平台部分。

4、與spring-cloud/spring-boot的整合

如果是傳統的spring項目,按上面的步驟就可以無縫整合了,如果是spring-cloud/spring-boot,則稍微要複雜點。

由于spring-boot倡導零xml配置,是以大部配置設定置就用代碼替代了,先定義一個elasticJob的配置類:

@Data
@Configuration
public class ElasticJobConfig {

    @Value("${rxQuartz.app.zkAddress}")
    private String zkNodes;

    @Value("${rxQuartz.app.namespace}")
    private String namespace;

    @Bean
    public ZookeeperConfiguration zkConfig() {
        return new ZookeeperConfiguration(zkNodes, namespace);
    }

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
        return new ZookeeperRegistryCenter(config);
    }
}
           

上面這段代碼,主要是解決zk注冊中心的注入問題,然後各種xxxJob,由于要讓spring自動注入,需要打上component注解

@Component("xxxJob")
public class XXXJob extends AbstractJob {
   
    ...
}
           

然後在真正要用的地方,把他們組裝起來

import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: yangjunming
 */
@Configuration
public class ElasticJobs {

    @Autowired
    @Qualifier("xxxJob")
    public SimpleJob xxxJob;
   
    @Autowired
    private ZookeeperRegistryCenter regCenter;

    @Bean(initMethod = "init")
    public JobScheduler settlementJobScheduler(@Autowired @Qualifier("xxxJob") SimpleJob simpleJob,
                                               @Value("${xxxJob.billCronExpress}") final String cron,
                                               @Value("${xxxJob.shardingCount}") int shardingCount,
                                               @Value("${xxxJob.shardingItemParameters}") String shardingItemParameters) {
        return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingCount, shardingItemParameters));
    }

    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
                jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
    }
}
           

job:simple命名空間屬性詳細說明

屬性名 類型 是否必填 預設值 描述
id String

作業名稱
class String 作業實作類,需實作

ElasticJob

接口,腳本型作業不需要配置
registry-center-ref String

注冊中心

Bean

的引用,需引用

reg:zookeeper

的聲明
cron String

cron

表達式,用于配置作業觸發時間
sharding-total-count int

作業分片總數
sharding-item-parameters String

分片序列号和參數用等号分隔,多個鍵值對用逗号分隔

分片序列号從

開始,不可大于或等于作業分片總數

如:

0=a,1=b,2=c

job-parameter String

作業自定義參數

可以配置多個相同的作業,但是用不同的參數作為不同的排程執行個體

monitor-execution boolean true

監控作業運作時狀态

每次作業執行時間和間隔時間均非常短的情況,建議不監控作業運作時狀态以提升效率。因為是瞬時狀态,是以無必要監控。請使用者自行增加資料堆積監控。并且不能保證資料重複選取,應在作業中實作幂等性。

每次作業執行時間和間隔時間均較長的情況,建議監控作業運作時狀态,可保證資料不會重複選取。

monitor-port int -1

作業監控端口

建議配置作業監控端口, 友善開發者dump作業資訊。

使用方法: echo “dump” | nc 127.0.0.1 9888

max-time-diff-seconds int -1

最大允許的本機與注冊中心的時間誤差秒數

如果時間誤差超過配置秒數則作業啟動時将抛異常

配置為

-1

表示不校驗時間誤差
failover boolean false

是否開啟失效轉移

monitorExecution

開啟,失效轉移才有效
misfire boolean true 是否開啟錯過任務重新執行
job-sharding-strategy-class String true

作業分片政策實作類全路徑

預設使用平均配置設定政策

詳情參見:作業分片政策

description String 作業描述資訊
disabled boolean false

作業是否禁止啟動

可用于部署作業時,先禁止啟動,部署結束後統一啟動

overwrite boolean false

本地配置是否可覆寫注冊中心配置

如果可覆寫,每次啟動作業都以本地配置為準

job:dataflow命名空間屬性詳細說明

job:dataflow命名空間擁有job:simple命名空間的全部屬性,以下僅列出特有屬性

屬性名 類型 是否必填 預設值 描述
process-count-interval-seconds int 300

統計作業處理資料數量的間隔時間

機關:秒

concurrent-data-process-thread-count int CPU核數*2

同時處理資料的并發線程數

不能小于1

ThroughputDataFlow

作業有效
fetch-data-count int 1 每次抓取的資料量
streaming-process boolean false

是否流式處理資料

如果流式處理資料, 則

fetchData

不傳回空結果将持續執行作業

如果非流式處理資料, 則處理資料完成後作業結束

job:script命名空間屬性詳細說明,基本屬性參照job:simple命名空間屬性詳細說明

job:script命名空間擁有job:simple命名空間的全部屬性,以下僅列出特有屬性

屬性名 類型 是否必填 預設值 描述
script-command-line String 腳本型作業執行指令行

job:listener命名空間屬性詳細說明

job:listener

必須配置為

job:bean

的子元素

屬性名 類型 是否必填 預設值 描述
class String

前置後置任務監聽實作類,需實作

ElasticJobListener

接口
started-timeout-milliseconds long

Long.MAX_VALUE

AbstractDistributeOnceElasticJobListener型監聽器,最後一個作業執行前的執行方法的逾時時間

機關:毫秒

completed-timeout-milliseconds long

Long.MAX_VALUE

AbstractDistributeOnceElasticJobListener型監聽器,最後一個作業執行後的執行方法的逾時時間

機關:毫秒

reg:bean命名空間屬性詳細說明

屬性名 類型 是否必填 預設值 描述
id String

注冊中心在

Spring

容器中的主鍵
server-lists String

連接配接

Zookeeper

伺服器的清單

包括IP位址和端口号

多個位址用逗号分隔

如: host1:2181,host2:2181

namespace String

Zookeeper

的命名空間
base-sleep-time-milliseconds int 1000

等待重試的間隔時間的初始值

機關:毫秒

max-sleep-time-milliseconds int 3000

等待重試的間隔時間的最大值

機關:毫秒

max-retries int 3 最大重試次數
session-timeout-milliseconds int 60000

會話逾時時間

機關:毫秒

connection-timeout-milliseconds int 15000

連接配接逾時時間

機關:毫秒

digest String 無驗證 連接配接

Zookeeper

的權限令牌

預設為不需要權限驗證

繼續閱讀