ElasticJob
以下關于
ElasticJob
的介紹來自官方文檔:
是面向網際網路生态和海量任務的分布式排程解決方案,由兩個互相獨立的子項目
ElasticJob
和
ElasticJob‐Lite
組成。它通過彈性排程、資源管控、以及作業治理的功能,打造一個适用于網際網路場景的分布式排程解決方案,并通過開放的架構設計,提供多元化的作業生态。它的各個産品使用統一的作業
ElasticJob‐Cloud
,開發者僅需一次開發,即可随意部署。
API
已于
ElasticJob
年
2020
月
5
日成為
28
Apache ShardingSphere
的子項目。
能夠讓開發工程師不再擔心任務的線性吞吐量提升等非功能需求,使他們能夠更加專注于面向業務編碼設計;同時,它也能夠解放運維工程師,使他們不必再擔心任務的可用性和相關管理需求,隻通過輕松的增加服務節點即可達到自動化運維的目的。
ElasticJob
環境要求:
-
及其以上版本。Java 8
-
及其以上版本。Maven 3.5.0
-
及其以上版本。ZooKeeper 3.6.0
這裡先介紹子項目
ElasticJob-Lite
的使用,
ElasticJob‐Lite
的架構如下圖所示:
ElasticJob‐Lite
會将作業注冊到
ZooKeeper
上,在定義的命名空間下,建立作業名稱節點,用于區分不同作業,是以作業一旦建立則不能修改作業名稱,如果修改作業名稱将視為新的作業。
添加依賴(
3.0.1
是目前最新的
Releases
版本):
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-core</artifactId>
<version>3.0.1</version>
</dependency>
的作業分類基于
ElasticJob
和
class
兩種類型。基于
type
的作業需要開發者自行通過實作接口的方式織入業務邏輯;基于
class
的作業則無需編碼,隻需要提供相應配置即可。基于
type
的作業接口的方法參數
class
包含作業配置、片和運作時資訊。可通過
shardingContext
、
getShardingTotalCount()
getShardingItem()
等方法分别擷取分片總數和運作在本作業伺服器的分片序列号等。
目前提供
ElasticJob
、
Simple
這兩種基于
Dataflow
的作業類型,并提供
class
、
Script
這兩種基于
HTTP
的作業類型,使用者可通過實作
type
接口自行擴充作業類型。
SPI
這篇部落格隻會介紹
Simple
作業,其他的作業類型以及擴充作業類型以後部落客也會給大家介紹。
Simple作業
定義
Simple
作業需要實作
SimpleJob
接口,而
SimpleJob
接口隻定義了一個方法,并且繼承了
ElasticJob
接口。
public interface SimpleJob extends ElasticJob {
void execute(ShardingContext shardingContext);
}
定義一個
Simple
作業,任務被分成三個可并行的子任務,而每個子任務隻是輸出目前時間和各自執行的任務分片。
package com.kaven.job;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @Author: ITKaven
* @Date: 2021/11/20 17:02
* @Leetcode: https://leetcode-cn.com/u/kavenit
* @Notes:
*/
public class MySimpleJob implements SimpleJob {
private static final SimpleDateFormat formatter =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void execute(ShardingContext shardingContext) {
switch (shardingContext.getShardingItem()) {
case 0:
System.out.println(formatter.format(new Date()) + " : ShardingItem[0]");
break;
case 1:
System.out.println(formatter.format(new Date()) + " : ShardingItem[1]");
break;
case 2:
System.out.println(formatter.format(new Date()) + " : ShardingItem[2]");
break;
default:
System.out.println(formatter.format(new Date()) + " : Unknown ShardingItem");
}
}
}
定義了作業,還需要對作業進行配置,比如作業的名稱、分片數、
cron
時間表達式以及是否需要失效轉移等,主要通過
JobConfiguration
類來完成這些配置,它提供了建構者風格的方法,比如下面的作業配置,作業名稱為
MySimpleJob
、作業分片數為
3
,并且在每一分鐘的第
30
秒執行任務,調用
overwrite
方法用來設定在作業啟動時是否将本地配置覆寫到注冊中心(預設不覆寫,是以本地修改了
cron
時間表達式會不起作用),如果需要覆寫(方法傳入
true
),則每次啟動時都将使用本地配置(即以本地的作業配置為主,不然本地修改作業配置不會起作用)。調用
failover
方法用于設定是否開啟失效轉移(僅适用于開啟了
monitorExecution
,預設開啟
monitorExecution
,但預設不開啟失效轉移),
ElasticJob
不會在本次執行過程中進行重新分片(給作業節點配置設定作業分片),而是等待下次排程之前才開啟重新分片流程。當作業執行過程中伺服器當機,失效轉移允許将該次未完成的任務在另一作業節點上補償執行。
開啟失效轉移功能, ElasticJob
會監控作業每一分片的執行狀态,并将其寫入注冊中心,供其他節點感覺。在一次運作耗時較長且間隔較長的作業場景,失效轉移是提升作業運作實時性的有效手段;對于間隔較短的作業,會産生大量與注冊中心的網絡通信,對叢集的性能産生影響。而且間隔較短的作業并未見得關注單次作業的實時性,可以通過下次作業執行的重分片使所有的分片正确執行,是以不建議短間隔作業開啟失效轉移。 另外需要注意的是,作業本身的幂等性,是保證失效轉移正确性的前提。
private static JobConfiguration createJobConfiguration() {
return JobConfiguration.newBuilder("MySimpleJob", 3)
.cron("30 * * * * ?")
.overwrite(true)
.failover(true)
.build();
}
作業節點一:
作業節點二(當作業節點三當機後,會補償執行作業節點三未執行的作業分片):
作業節點三(模拟當機):
接下來還需要連接配接注冊中心(使用
ZooKeeper
),目前預設隻支援
ZooKeeper
作為注冊中心(
CoordinatorRegistryCenter
接口隻有
ZookeeperRegistryCenter
一個實作類)。
192.168.1.184:9000
是
ZooKeeper
服務端提供的用戶端連接配接套接字,而
my-job
就是作業的命名空間。
private static CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");
zc.setConnectionTimeoutMilliseconds(40000);
zc.setMaxRetries(5);
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
regCenter.init();
return regCenter;
}
作業的各種配置和注冊中心已經準備好了,就可以啟動作業了,通過
ScheduleJobBootstrap
類來實作,主要就是将注冊中心、作業定義以及作業配置傳入啟動作業的執行個體中。
public static void main(String[] args) {
new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(),
createJobConfiguration()).schedule();
}
彙總到
Application
類:
package com.kaven.job;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
/**
* @Author: ITKaven
* @Date: 2021/11/20 17:05
* @Leetcode: https://leetcode-cn.com/u/kavenit
* @Notes:
*/
public class Application {
public static void main(String[] args) {
new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(),
createJobConfiguration()).schedule();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");
zc.setConnectionTimeoutMilliseconds(40000);
zc.setMaxRetries(5);
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
regCenter.init();
return regCenter;
}
private static JobConfiguration createJobConfiguration() {
return JobConfiguration.newBuilder("MySimpleJob", 3)
.cron("30 * * * * ?")
.overwrite(true)
.failover(true)
.build();
}
}
結果如下圖所示:
- cron時間表達式簡介
- Cron表達式詳解