天天看點

ElasticJob‐Lite:Simple作業

ElasticJob

以下關于​

​ElasticJob​

​​的介紹來自​​官方文檔​​:

​ElasticJob​

​​是面向網際網路生态和海量任務的分布式排程解決方案,由兩個互相獨立的子項目​

​ElasticJob‐Lite​

​​和​

​ElasticJob‐Cloud​

​​組成。它通過彈性排程、資源管控、以及作業治理的功能,打造一個适用于網際網路場景的分布式排程解決方案,并通過開放的架構設計,提供多元化的作業生态。它的各個産品使用統一的作業​

​API​

​​,開發者僅需一次開發,即可随意部署。​

​ElasticJob​

​​已于​

​2020​

​​年​

​5​

​​月​

​28​

​​日成為​

​Apache ShardingSphere​

​的子項目。

​ElasticJob​

​能夠讓開發工程師不再擔心任務的線性吞吐量提升等非功能需求,使他們能夠更加專注于面向業務編碼設計;同時,它也能夠解放運維工程師,使他們不必再擔心任務的可用性和相關管理需求,隻通過輕松的增加服務節點即可達到自動化運維的目的。

環境要求:

  • ​Java 8​

    ​及其以上版本。
  • ​Maven 3.5.0​

    ​及其以上版本。
  • ​ZooKeeper 3.6.0​

    ​及其以上版本。

這裡先介紹子項目​

​ElasticJob-Lite​

​​的使用,​

​ElasticJob‐Lite​

​的架構如下圖所示:

ElasticJob‐Lite:Simple作業

​ElasticJob‐Lite​

​​會将作業注冊到​

​ZooKeeper​

​上,在定義的命名空間下,建立作業名稱節點,用于區分不同作業,是以作業一旦建立則不能修改作業名稱,如果修改作業名稱将視為新的作業。

添加依賴(​

​3.0.1​

​​是目前最新的​

​Releases​

​版本):

<dependency>
            <groupId>org.apache.shardingsphere.elasticjob</groupId>
            <artifactId>elasticjob-lite-core</artifactId>
            <version>3.0.1</version>
        </dependency>      
ElasticJob‐Lite:Simple作業

​ElasticJob​

​​的作業分類基于​

​class​

​​和​

​type​

​​兩種類型。基于​

​class​

​​的作業需要開發者自行通過實作接口的方式織入業務邏輯;基于​

​type​

​​的作業則無需編碼,隻需要提供相應配置即可。基于​

​class​

​​的作業接口的方法參數​

​shardingContext​

​​包含作業配置、片和運作時資訊。可通過​

​getShardingTotalCount()​

​​、​

​getShardingItem()​

​等方法分别擷取分片總數和運作在本作業伺服器的分片序列号等。

​ElasticJob​

​​目前提供​

​Simple​

​​、​

​Dataflow​

​​這兩種基于​

​class​

​​的作業類型,并提供​

​Script​

​​、​

​HTTP​

​​這兩種基于​

​type​

​​的作業類型,使用者可通過實作​

​SPI​

​接口自行擴充作業類型。

這篇部落格隻會介紹​

​Simple​

​作業,其他的作業類型以及擴充作業類型以後部落客也會給大家介紹。

Simple作業

定義​

​Simple​

​​作業需要實作​

​SimpleJob​

​​接口,而​

​SimpleJob​

​​接口隻定義了一個方法,并且繼承了​

​ElasticJob​

​接口。

public interface SimpleJob extends ElasticJob {
    
    void execute(ShardingContext shardingContext);
}      

定義一個​

​Simple​

​作業,任務被分成三個可并行的子任務,而每個子任務隻是輸出目前時間和各自執行的任務分片。

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Author: ITKaven
 * @Date: 2021/11/20 17:02
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */
public class MySimpleJob implements SimpleJob {
    private static final SimpleDateFormat formatter =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Override
    public void execute(ShardingContext shardingContext) {
        switch (shardingContext.getShardingItem()) {
            case 0:
                System.out.println(formatter.format(new Date()) + " : ShardingItem[0]");
                break;
            case 1:
                System.out.println(formatter.format(new Date()) + " : ShardingItem[1]");
                break;
            case 2:
                System.out.println(formatter.format(new Date()) + " : ShardingItem[2]");
                break;
            default:
                System.out.println(formatter.format(new Date()) + " : Unknown ShardingItem");
        }
    }
}      

定義了作業,還需要對作業進行配置,比如作業的名稱、分片數、​

​cron​

​​時間表達式以及是否需要失效轉移等,主要通過​

​JobConfiguration​

​​類來完成這些配置,它提供了建構者風格的方法,比如下面的作業配置,作業名稱為​

​MySimpleJob​

​​、作業分片數為​

​3​

​​,并且在每一分鐘的第​

​30​

​​秒執行任務,調用​

​overwrite​

​​方法用來設定在作業啟動時是否将本地配置覆寫到注冊中心(預設不覆寫,是以本地修改了​

​cron​

​​時間表達式會不起作用),如果需要覆寫(方法傳入​

​true​

​​),則每次啟動時都将使用本地配置(即以本地的作業配置為主,不然本地修改作業配置不會起作用)。調用​

​failover​

​​方法用于設定是否開啟失效轉移(僅适用于開啟了 ​

​monitorExecution​

​​,預設開啟 ​

​monitorExecution​

​​,但預設不開啟失效轉移),​

​ElasticJob​

​不會在本次執行過程中進行重新分片(給作業節點配置設定作業分片),而是等待下次排程之前才開啟重新分片流程。當作業執行過程中伺服器當機,失效轉移允許将該次未完成的任務在另一作業節點上補償執行。

開啟失效轉移功能,​

​ElasticJob​

​會監控作業每一分片的執行狀态,并将其寫入注冊中心,供其他節點感覺。在一次運作耗時較長且間隔較長的作業場景,失效轉移是提升作業運作實時性的有效手段;對于間隔較短的作業,會産生大量與注冊中心的網絡通信,對叢集的性能産生影響。而且間隔較短的作業并未見得關注單次作業的實時性,可以通過下次作業執行的重分片使所有的分片正确執行,是以不建議短間隔作業開啟失效轉移。 另外需要注意的是,作業本身的幂等性,是保證失效轉移正确性的前提。
private static JobConfiguration createJobConfiguration() {
        return JobConfiguration.newBuilder("MySimpleJob", 3)
                .cron("30 * * * * ?")
                .overwrite(true)
                .failover(true)
                .build();
    }      

作業節點一:

ElasticJob‐Lite:Simple作業

作業節點二(當作業節點三當機後,會補償執行作業節點三未執行的作業分片):

ElasticJob‐Lite:Simple作業

作業節點三(模拟當機):

ElasticJob‐Lite:Simple作業

接下來還需要連接配接注冊中心(使用​

​ZooKeeper​

​​),目前預設隻支援​

​ZooKeeper​

​​作為注冊中心(​

​CoordinatorRegistryCenter​

​​接口隻有​

​ZookeeperRegistryCenter​

​一個實作類)。

ElasticJob‐Lite:Simple作業
ElasticJob‐Lite:Simple作業

​192.168.1.184:9000​

​​是​

​ZooKeeper​

​​服務端提供的用戶端連接配接套接字,而​

​my-job​

​就是作業的命名空間。

private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");
        zc.setConnectionTimeoutMilliseconds(40000);
        zc.setMaxRetries(5);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
        regCenter.init();
        return regCenter;
    }      

作業的各種配置和注冊中心已經準備好了,就可以啟動作業了,通過​

​ScheduleJobBootstrap​

​類來實作,主要就是将注冊中心、作業定義以及作業配置傳入啟動作業的執行個體中。

public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(),
                createJobConfiguration()).schedule();
    }      

彙總到​

​Application​

​類:

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

/**
 * @Author: ITKaven
 * @Date: 2021/11/20 17:05
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */
public class Application {
    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(),
                createJobConfiguration()).schedule();
    }
    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");
        zc.setConnectionTimeoutMilliseconds(40000);
        zc.setMaxRetries(5);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
        regCenter.init();
        return regCenter;
    }
    private static JobConfiguration createJobConfiguration() {
        return JobConfiguration.newBuilder("MySimpleJob", 3)
                .cron("30 * * * * ?")
                .overwrite(true)
                .failover(true)
                .build();
    }
}      

結果如下圖所示:

ElasticJob‐Lite:Simple作業
ElasticJob‐Lite:Simple作業
ElasticJob‐Lite:Simple作業
  • ​​cron時間表達式簡介​​
  • ​​Cron表達式詳解​​