天天看點

Elastic-job分布式作業排程架構

閱讀文本大概需要3分鐘。

如果我們的項目是部署到多台機器上,那麼某一時刻,我們的定時任務肯定每台機器上都會執行一遍,那這肯定不是我們想要的結果,我們隻希望有一台機器能執行。

0x01: 前言

Elastic job是當當網架構師基于Zookepper、Quartz開發并開源的一個Java分布式定時任務,解決了Quartz不支援分布式的弊端。Elastic job主要的功能有支援彈性擴容,通過Zookepper集中管理和監控job,支援失效轉移等,這些都是Quartz等其他定時任務無法比拟的。

官網說明

目前Elastic job的最新版本已經由原來的elastic-job-core分離除了兩個項目,分别為Elastic-Job-Lite和Elastic-Job-Cloud。Elastic-Job是一個分布式排程解決方案,由兩個互相獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。

Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分布式任務的協調服務。

Elastic-Job-Cloud使用Mesos +Docker(TBD)的解決方案,額外提供資源治理、應用分發以及程序隔離等服務,Elastic-Job-Lite和Elastic-Job-Cloud提供同一套API開發作業,開發者僅需一次開發,即可根據需要以Lite或Cloud的方式部署。

https://github.com/dangdangdotcom/elastic-job
           

0x02: 如何使用Elastic-Job

Elastic-job提供3種類型作業:

    1、Simple類型作業

    2、Dataflow類型作業

    3、Script類型作業

添加依賴

 <dependency>
      <groupId>com.dangdang</groupId>
      <artifactId>elastic-job-spring</artifactId>
      <version>1.1.1</version>
 </dependency>
           

添加配置檔案application.properties

################################## Elastic-Job ##################################
regCenter.namespace=my-job
regCenter.zk.address=localhost1:2181,localhost2:2181
regCenter.baseSleepTimeMilliseconds=1000
regCenter.maxSleepTimeMilliseconds=3000
regCenter.maxRetries=3
regCenter.sessionTimeoutMilliseconds=10000
#如果為true,可以在控制台檢測到作業的執行狀态
job.monitorExecution=true
#失效轉移,如果為true,當作業在執行過程中異常中斷,作業會被分發到叢集中存活的結點
job.failover=true
#如果為true,在開始部署的時候作業不會自啟動,即使到了觸發時間,需要在控制台手動觸發。
job.disabled=false
#如果為true,則會覆寫zk的配置
job.overwrite=true
job.monitorPort=9888
#隻有當分片的數量設定為1的時候,整個叢集中隻會有一個程序去執行該Job,在伺服器數量沒有波動的情況下,任務總會在固定某個程序上執行。在作業執行前程序如果挂了,那作業會被配置設定到叢集某一個存活的程序中
job.shardingTotalCount=1
job.shardingItemParameters=
           

配置zk和Job application-Context-jobs.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd">
    <reg:zookeeper id="regCenter"
                   server-lists="${regCenter.zk.address}"
                   namespace="${regCenter.namespace}-${env}"
                   base-sleep-time-milliseconds="${regCenter.baseSleepTimeMilliseconds}"
                   max-sleep-time-milliseconds="${regCenter.maxSleepTimeMilliseconds}"
                   max-retries="${regCenter.maxRetries}"
                   nested-port="-1"
                   session-timeout-milliseconds="${regCenter.sessionTimeoutMilliseconds}"/>

    <job:simple id="myJob"
                class="com.xx.job.MyJob"
                registry-center-ref="regCenter"
                sharding-total-count="${job.shardingTotalCount}"
                cron="0/5 * * * * ?"
                sharding-item-parameters="${job.shardingItemParameters}"
                monitor-execution="${job.monitorExecution}"
                monitor-port="${job.monitorPort}"
                failover="${job.failover}"
                description="我的job"
                disabled="${job.disabled}"
                overwrite="${job.overwrite}"/>
</beans>
           

封裝執行器(并行和串行)

串行任務執行器

/**
 * 串行任務執行器
 */
public abstract class AbstractSerialExecutor extends AbstractSimpleElasticJob {
    public abstract void executeJob(String jobName);

    @Override
    public void process(JobExecutionMultipleShardingContext shardingContext) {
        long t1 = System.currentTimeMillis();
        executeJob(shardingContext.getJobName());
        LogUtil.successJob(shardingContext.getJobName(), (System.currentTimeMillis() - t1));
    }
}
           

并行任務執行器

/**
 * 并行任務執行器
 */
public abstract class AbstractParallelExecutor extends AbstractSimpleElasticJob implements Runnable {
    @Override
    public void process(JobExecutionMultipleShardingContext shardingContext) {
        something....
    }
}
           

編寫Job

@Component
public class MyJob extends AbstractSerialExecutor {
    private final static Logger logger = LoggerFactory.getLogger(MyJob.class);
    @Override
    public void executeJob(String jobName) {
        System.out.println("*****************Myjob.....");
    }
}
           

0x03: Elastic-Job監控背景搭建

Elastic-job分布式作業排程架構

下載下傳位址:

https://github.com/elasticjob/elastic-job-lite/tree/dev/elastic-job-lite-console
           

具體怎麼弄,可以去搜一個教程…

Elastic-job分布式作業排程架構

zookeeper裡面儲存了我們Job的資訊:

Elastic-job分布式作業排程架構

0x04: 測試分布式定時任務

我們把我們項目打成jar包,本地啟動一個,另一台伺服器也啟動。我們發現隻有一台伺服器在運作定時任務:( 0/5 * * * * ?)每5秒跑一次…

Elastic-job分布式作業排程架構
Elastic-job分布式作業排程架構

這裡我們做一個實驗,我們假設一台伺服器當機了(現在在跑定時任務的主機),會怎麼樣?

Elastic-job分布式作業排程架構
Elastic-job分布式作業排程架構

使用Elastic-job輕松幫我們實作分布式定時任務,原理應該都一樣。我推測:zookeeper的Leader選舉,哪台機器被選為Leader,那他就跑任務。

0x05: 擴充

Elastic-Job監控平台也給我們提供了便利,我們可以輕松監控作業,配置作業…

Elastic-job分布式作業排程架構
Elastic-job分布式作業排程架構
Elastic-job分布式作業排程架構
來源:https://blog.csdn.net/m0_37499059/article/details/82888686
           

往期精彩

01 漫談發版哪些事,好課程推薦

02 Linux的常用最危險的指令

03 精講Spring&nbsp;Boot—入門+進階+執行個體

04 優秀的Java程式員必須了解的GC哪些

05 網際網路支付系統整體架構詳解

關注我

每天進步一點點

Elastic-job分布式作業排程架構

很幹!在看嗎?☟