文章目錄
- 1.業務場景與任務排程
- 2.任務排程的基本實作
-
- 2.1 多線程方式實作
- 2.2 Timer方式實作
- 2.3 ScheduledExecutor方式實作
- 2.4 第三方Quartz方式實作
- 3.分布式任務排程
- 4.XXL-JOB介紹
- 5.搭建XXL-JOB —— 排程中心
-
- 5.1 下載下傳與檢視XXL-JOB
- 5.2 建立資料庫表
- 5.3 修改預設的配置資訊
- 5.4 啟動服務程式
- 5.5 通路排程中心控制台
- 6.搭建XXL-JOB —— 執行器
-
- 6.1 pom.xml核心配置
- 6.2 application.yaml核心配置
- 6.3 XxlJobConfig配置類
- 6.4 XxlJobDemoApplication啟動類
- 6.5 進入排程中心添加執行器
- 7.搭建XXL-JOB —— 執行任務
-
- 7.1 簡單任務示例(Bean模式)
-
- 7.1.1 編寫任務方法
- 7.1.2 在排程中心進行任務管理
- 7.2 排程政策
- 7.3 分片廣播
-
- 7.3.1 編寫任務方法
- 7.3.2 增加一個節點服務
- 7.3.3 排程中心-執行器管理
- 7.3.4 排程中心-新增與啟用任務
- 7.3.5 校驗任務
- 7.4 進階配置說明
-
- 7.4.1 子任務
- 7.4.2 排程過期政策
- 7.4.3 阻塞處理政策
- 7.4.4 任務逾時時間
- 7.4.5 失敗重試次數
- 8.作業分片方案
- 9.三個經典面試題
-
- 9.1 xxl-jobo是怎麼工作的?
- 9.2 如何保證任務不重複執行?
- 9.3 如何保證任務處理的幂等性?
❓ 如何去高效處理一批任務
分布式任務排程的處理方案:分布式加多線程,充分利用多台計算機,每台計算機使用多線程處理。
1.業務場景與任務排程
我們可以先思考一下下面業務場景的解決方案:
- 某電商系統需要在每天上午10點,下午3點,晚上8點發放一批優惠券。
- 某财務系統需要在每天上午10點前結算前一天的賬單資料,統計彙總。
- 某電商平台每天淩晨3點,要對訂單中的無效訂單進行清理。
- 12306網站會根據車次不同,設定幾個時間點分批次放票。
- 電商整點搶購,商品價格某天上午8點整開始優惠。
- 商品成功發貨後,需要向客戶發送短信提醒。
類似的場景還有很多,我們該如何實作?以上這些場景,就是任務排程所需要解決的問題。
📖 任務排程顧名思義,就是對任務的排程,它是指系統為了完成特定業務,基于給定時間點,給定時間間隔或者給定執行次數自動執行任務。
2.任務排程的基本實作
2.1 多線程方式實作
我們可以開啟一個線程,每sleep一段時間,就去檢查是否已到預期執行時間。
以下代碼簡單實作了任務排程的功能:
/**
* @author 狐狸半面添
* @create 2023-02-16 13:15
*/
public class ThreadTaskDemo {
public static void main(String[] args) {
// 指定任務執行間隔時間(機關:ms)
final long timeInterval = 1000;
Runnable runnable = new Runnable() {
public void run() {
while (true) {
// TODO 需要執行的任務
System.out.println("多線程方式任務排程:每隔1s執行一次任務");
try {
Thread.sleep(timeInterval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread thread = new Thread(runnable);
// 線程執行,開啟定時任務
thread.start();
}
}
上面的代碼實作了按一定的間隔時間執行任務排程的功能。
Jdk也為我們提供了相關支援,如Timer、ScheduledExecutor,如下👇
2.2 Timer方式實作
Timer
的優點在于簡單易用,每個Timer對應一個線程,是以可以同時啟動多個Timer并行執行多個任務,同一個Timer中的任務是串行執行。
import java.util.Timer;
import java.util.TimerTask;
/**
* @author 狐狸半面添
* @create 2023-02-17 15:18
*/
public class TimerTaskDemo {
public static void main(String[] args) {
Timer timer = new Timer();
// 1秒後開始任務排程,每2秒執行一次任務
timer.schedule(new TimerTask() {
@Override
public void run() {
// TODO 需要執行的任務
System.out.println("Timer方式任務排程:每隔2s執行一次任務");
}
}, 1000, 2000);
}
}
2.3 ScheduledExecutor方式實作
Java 5 推出了基于線程池設計的
ScheduledExecutor
,其設計思想是,每一個被排程的任務都會由線程池中一個線程去執行,是以任務是并發執行的,互相之間不會受到幹擾。需要注意的是,隻有當任務的執行時間到來時,ScheduedExecutor 才會真正啟動一個線程,其餘時間 ScheduledExecutor 都是在輪詢任務的狀态。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author 狐狸半面添
* @create 2023-02-17 15:22
*/
public class ScheduledExecutorTaskDemo {
/**
* 設定線程池的線程數量
*/
private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
public static void main(String[] args) {
// 第一個任務排程
executor.scheduleAtFixedRate(
// 可以使用匿名内部類方式建立一個Runnable實作類,也可以new一個類實作Runnable接口
new Runnable() {
@Override
public void run() {
// todo 需要執行的任務
System.out.println("任務一 定時排程中");
}
},
// 0秒後開始任務排程,每隔1秒執行一次任務
0, 1, TimeUnit.SECONDS
);
// 第二個任務排程
executor.scheduleAtFixedRate(
// 可以使用匿名内部類方式建立一個Runnable實作類,也可以new一個類實作Runnable接口
new Task(),
// 500毫秒後開始任務排程,每隔2000毫秒執行一次任務
500, 2000, TimeUnit.MILLISECONDS
);
}
static class Task implements Runnable {
@Override
public void run() {
// todo 需要執行的任務
System.out.println("任務二 定時排程中");
}
}
}
2.4 第三方Quartz方式實作
Timer
和
ScheduledExecutor
都僅能提供基于開始時間與重複間隔的任務排程,對于比較複雜的排程需求,比如,設定每月第一天淩晨1點執行任務、複雜排程任務的管理、任務間傳遞資料等等,實作起來比較麻煩。
Quartz
是一個功能強大的任務排程架構,它可以滿足更多更複雜的排程需求,Quartz 設計的核心類包括 Scheduler, Job 以及 Trigger。其中,Job 負責定義需要執行的任務,Trigger 負責設定排程政策,Scheduler 将二者組裝在一起,并觸發任務開始執行。Quartz支援簡單的按時間間隔排程、還支援按月曆排程方式,通過設定CronTrigger表達式(包括:秒、分、時、日、月、周、年)進行任務排程。
<!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
/**
* @author 狐狸半面添
* @create 2023-02-17 15:47
*/
public class QuartzDemo {
public static void main(String[] agrs) throws SchedulerException {
// 建立一個Scheduler
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
// 建立JobDetail
JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);
jobDetailBuilder.withIdentity("jobName", "jobGroupName");
JobDetail jobDetail = jobDetailBuilder.build();
// 建立觸發的CronTrigger 支援按月曆排程
CronTrigger trigger = (CronTrigger) TriggerBuilder.newTrigger()
.withIdentity("triggerName", "triggerGroupName")
.startNow()
// 每隔兩秒執行一次
.withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?"))
.build();
//建立觸發的SimpleTrigger 簡單的間隔排程
/*
SimpleTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName","triggerGroupName")
.startNow()
.withSchedule(SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInSeconds(2)
.repeatForever())
.build();
*/
scheduler.scheduleJob(jobDetail, (Trigger) trigger);
scheduler.start();
}
public static class MyJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) {
// todo 需要定時排程的任務
System.out.println("定時任務正在排程執行");
}
}
}
3.分布式任務排程
通常任務排程的程式是內建在應用中的,比如:優惠卷服務中包括了定時發放優惠卷的的排程程式,結算服務中包括了定期生成報表的任務排程程式,由于采用分布式架構,一個服務往往會部署多個備援執行個體來運作我們的業務,在這種分布式系統環境下運作任務排程,我們稱之為分布式任務排程,如下圖:
🚩 分布式排程要實作的目标:
不管是任務排程程式內建在應用程式中,還是單獨建構的任務排程系統,如果采用分布式排程任務的方式就相當于将任務排程程式分布式建構,這樣就可以具有分布式系統的特點,并且提高任務的排程處理能力:
-
并行任務排程
:并行任務排程實作靠多線程,如果有大量任務需要排程,此時光靠多線程就會有瓶頸了,因為一台計算機CPU的處理能力是有限的。
如果将任務排程程式分布式部署,每個結點還可以部署為叢集,這樣就可以讓多台計算機共同去完成任務排程,我們可以将任務分割為若幹個分片,由不同的執行個體并行執行,來提高任務排程的處理效率。
-
:若某一個執行個體當機,不影響其他執行個體來執行任務。高可用
-
:當叢集中增加執行個體就可以提高并執行任務的處理效率。彈性擴容
-
:對系統中存在的所有定時任務進行統一的管理及監測。讓開發人員及運維人員能夠時刻了解任務執行情況,進而做出快速的應急處理響應。任務管理與監測
-
:當任務排程以叢集方式部署,同一個任務排程可能會執行多次,比如在上面提到的電商系統中到點發優惠券的例子,就會發放多次優惠券,對公司造成很多損失,是以我們需要控制相同的任務在多個運作執行個體上隻執行一次。避免任務重複執行
4.XXL-JOB介紹
XXL-JOB
是一個輕量級分布式任務排程平台,其核心設計目标是開發迅速、學習簡單、輕量級、易擴充。現已開放源代碼并接入多家公司線上産品線,開箱即用。
🏠 官網:https://www.xuxueli.com/xxl-job/
📖 文檔:https://www.xuxueli.com/xxl-job/#%E3%80%8A%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%B9%B3%E5%8F%B0XXL-JOB%E3%80%8B
XXL-JOB主要有
排程中心
、
執行器
、
任務
:
🍀 排程中心:負責管理排程資訊,按照排程配置發出排程請求,自身不承擔業務代碼。主要職責為執行器管理、任務管理、監控運維、日志管理等。
🍀 任務執行器:負責接收排程請求并執行任務邏輯。隻要職責是注冊服務、任務執行服務(接收到任務後會放入線程池中的任務隊列)、執行結果上報、日志服務等。
🍀 任務:負責執行具體的業務處理。
🚩 排程中心與執行器之間的工作流程如下:
📍 執行流程:
- 任務執行器根據配置的排程中心的位址,自動注冊到排程中心。
- 達到任務觸發條件,排程中心下發任務
- 執行器基于線程池執行任務,并把執行結果放入記憶體隊列中、把執行日志寫入日志檔案中
- 執行器消費記憶體隊列中的執行結果,主動上報給排程中心
- 當使用者在排程中心檢視任務日志,排程中心請求任務執行器,任務執行器讀取任務日志檔案并傳回日志詳情
5.搭建XXL-JOB —— 排程中心
5.1 下載下傳與檢視XXL-JOB
🏠 下載下傳 XXL-JOB:
- GitHub:https://github.com/xuxueli/xxl-job
- 碼雲:https://gitee.com/xuxueli0323/xxl-job
我們這裡使用2.3.1版本: https://github.com/xuxueli/xxl-job/releases/tag/2.3.1
使用IDEA打開解壓後的目錄:
-
:排程中心xxl-job-admin
-
:公共依賴xxl-job-core
-
:執行器Sample示例(選擇合适的版本執行器,可直接使用)xxl-job-executor-samples
- xxl-job-executor-sample-springboot:Springboot版本,通過Springboot管理執行器,推薦這種方式;
- xxl-job-executor-sample-frameless:無架構版本;
-
:文檔資料,包含資料庫腳本doc
5.2 建立資料庫表
打開腳本,全選執行即可。
⚠️ 注意事項:
之後我們在通路排程中心時,需要登入使用者名和密碼,預設為:
- 使用者名:admin
- 密碼:123456
這個資訊在資料庫的
xxl_job_user
進行儲存和登入驗證:
5.3 修改預設的配置資訊
5.4 啟動服務程式
5.5 通路排程中心控制台
🏠 http://127.0.0.1:9401/xxl-job-admin/
6.搭建XXL-JOB —— 執行器
下邊配置執行器,執行器負責與排程中心通信接收排程中心發起的任務排程請求。
這裡為了友善示範,我們創一個新的空maven項目充當執行器進行示範:
6.1 pom.xml核心配置
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.1</version>
</dependency>
6.2 application.yaml核心配置
server:
# 指定服務端口
port: 10001
xxl:
job:
admin:
# 排程中心的部署位址。若排程中心采用叢集部署,存在多個位址,則用逗号分隔。執行器将會使用該位址進行”執行器心跳注冊”和”任務結果回調”。
addresses: http://localhost:9401/xxl-job-admin
executor:
# 執行器的應用名稱,它是執行器心跳注冊的分組依據。
appname: demo-process-service
address:
# 執行器的IP位址,用于”排程中心請求并觸發任務”和”執行器注冊”。執行器IP預設為空,表示自動擷取IP。多網卡時可手動設定指定IP,手動設定IP時将會綁定Host。
ip:
# 執行器的端口号,預設值為9999。單機部署多個執行器時,注意要配置不同的執行器端口。排程中心需要從執行器拉取日志,指定排程中心通路本執行器的端口。
port: 60000
# 執行器輸出的日志檔案的存儲路徑,需要擁有該路徑的讀寫權限。
logpath: /data/applogs/xxl-job/jobhandler
# 執行器日志檔案的定期清理功能,指定日志儲存天數,日志檔案過期自動删除。限制至少儲存3天,否則功能不生效。這裡指定為30天。
logretentiondays: 30
# 執行器的通信令牌,非空時啟用。
accessToken: default_token
注意配置中的appname這是執行器的應用名,稍後在排程中心配置執行器時要使用。
6.3 XxlJobConfig配置類
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 針對多網卡、容器内部署等情況,可借助 "spring-cloud-commons" 提供的 "InetUtils" 元件靈活定制注冊IP;
*
* 1、引入依賴:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置檔案,或者容器啟動變量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、擷取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
6.4 XxlJobDemoApplication啟動類
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author 狐狸半面添
* @create 2023-02-17 17:19
*/
@SpringBootApplication
public class XxlJobDemoApplication {
public static void main(String[] args) {
SpringApplication.run(XxlJobDemoApplication.class);
}
}
6.5 進入排程中心添加執行器
先啟動排程中心服務和執行器服務。
-
:需要從application.yaml中拿到AppName
-
:可以自定義名稱
-
:選擇 自動注冊注冊方式
我們需要再重新整理一下頁面,就可以看到線上的執行器的機器位址:
7.搭建XXL-JOB —— 執行任務
先編寫一個任務類:
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* XxlJob開發示例(Bean模式)
* 開發步驟:
* 1、任務開發:在Spring Bean執行個體中,開發Job方法;
* 2、注解配置:為Job方法添加注解 "@XxlJob(value="自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷毀方法")",注解value值對應的是排程中心建立任務的JobHandler屬性的值。
* 3、執行日志:需要通過 "XxlJobHelper.log" 列印執行日志;
* 4、任務結果:預設任務結果為 "成功" 狀态,不需要主動設定;如有訴求,比如設定任務結果為失敗,可以通過 "XxlJobHelper.handleFail/handleSuccess" 自主設定任務結果;
*
* @author 狐狸半面添
* @create 2023-02-17 17:41
*/
@Component
public class XxlJobDemo {
}
7.1 簡單任務示例(Bean模式)
7.1.1 編寫任務方法
package com.xxl.demo.component;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* XxlJob開發示例(Bean模式)
* 開發步驟:
* 1、任務開發:在Spring Bean執行個體中,開發Job方法;
* 2、注解配置:為Job方法添加注解 "@XxlJob(value="自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷毀方法")",注解value值對應的是排程中心建立任務的JobHandler屬性的值。
* 3、執行日志:需要通過 "XxlJobHelper.log" 列印執行日志;
* 4、任務結果:預設任務結果為 "成功" 狀态,不需要主動設定;如有訴求,比如設定任務結果為失敗,可以通過 "XxlJobHelper.handleFail/handleSuccess" 自主設定任務結果;
*
* @author 狐狸半面添
* @create 2023-02-17 17:41
*/
@Component
public class XxlJobDemo {
/**
* 簡單任務示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
// 列印日志
XxlJobHelper.log("簡單任務示例方法執行");
System.out.println("==================================");
for (int i = 0; i < 5; i++) {
System.out.println("第" + i + "次循環");
TimeUnit.MILLISECONDS.sleep(500);
}
// default success
}
}
7.1.2 在排程中心進行任務管理
記得先将服務重新開機。
排程類型選擇
Cron
,并配置Cron表達式設定定時政策。Cron表達式是一個字元串,通過它可以定義排程政策,格式如下:
{秒數} {分鐘} {小時} {日期} {月份} {星期} {年份(可為空)}
xxl-job提供圖形界面去配置:
📝 一些例子如下:
- 30 10 1 * * ? 每天1點10分30秒觸發
- 0/30 * * * * ? 每30秒觸發一次
- 0 0/10 * * * ? 每10分鐘觸發一次
運作模式有
BEAN
和
GLUE
,bean模式較常用就是在項目工程中編寫執行器的任務代碼,GLUE是将任務代碼編寫在排程中心。
JobHandler
任務方法名填寫
@XxlJob
注解中的名稱。
新增成功,就啟動任務:
檢視Java控制台:
檢視日志:
任務跑一段時間注意清理日志:
如果要停止任務需要在排程中心操作:
7.2 排程政策
執行器在叢集部署下排程中心有哪些排程政策呢?檢視xxl-job官方文檔,閱讀進階配置相關的内容:
路由政策:當執行器叢集部署時,提供豐富的路由政策,包括:
-
:每次排程選擇叢集中第一台執行器。FIRST(第一個)
-
:每次排程選擇叢集中最後一台執行器。LAST(最後一個)
-
:按照順序每次排程選擇一台執行器去排程。ROUND(輪詢)
-
:每次排程随機選擇一台執行器去排程。RANDOM(随機)
-
:每個任務按照Hash算法固定選擇某一台機器,且所有任務均勻散列在不同機器上。CONSISTENT_HASH(一緻性HASH)
-
:使用頻率最低的機器優先被選舉。LEAST_FREQUENTLY_USED(最不經常使用)
-
:最久未使用的機器優先被選舉。LEAST_RECENTLY_USED(最近最久未使用)
-
:按照順序依次進行心跳檢測,第一個心跳檢測成功的機器標明為目标執行器并發起排程。FAILOVER(故障轉移)
-
:按照順序依次進行空閑檢測,第一個空閑檢測成功的機器標明為目标執行器并發起排程。BUSYOVER(忙碌轉移)
-
:廣播觸發對應叢集中所有機器執行一次任務,同時系統自動傳遞分片參數;可根據分片參數開發分片任務。SHARDING_BROADCAST(分片廣播)
7.3 分片廣播
我們思考一下如何進行分布式任務處理呢?如下圖,我們會啟動多個執行器組成一個叢集,去執行任務。
分片廣播政策:分片是指是排程中心将叢集中的執行器标上序号:0,1,2,3…,廣播是指每次排程會向叢集中所有執行器發送排程請求,請求中攜帶分片參數。
每個執行器收到排程請求根據分片參數自行決定是否執行任務。
另外xxl-job還支援動态分片,當執行器數量有變更時,排程中心會動态修改分片的數量。
📍 作業分片适用哪些場景呢?
-
:10個執行器的叢集來處理10w條資料,每台機器隻需要處理1w條資料,耗時降低10倍;分片任務場景
-
:廣播執行器同時運作shell腳本、廣播叢集節點進行緩存更新等。廣播任務場景
是以,廣播分片方式不僅可以充分發揮每個執行器的能力,并且根據分片參數可以控制任務是否執行,最終靈活控制了執行器叢集分布式處理任務。
💬 “分片廣播” 和普通任務開發流程一緻,不同之處在于可以擷取分片參數進行分片業務處理。
7.3.1 編寫任務方法
/**
* 分片廣播任務
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
/*
分片參數:
- shardIndex:分片序号
- shardTotal:分片總數
*/
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
System.out.printf("分片參數:目前分片序号 = %d, 總分片數 = %d\n", shardIndex, shardTotal);
XxlJobHelper.log("分片參數:目前分片序号 = {}, 總分片數 = {}", shardIndex, shardTotal);
// todo 業務邏輯
}
7.3.2 增加一個節點服務
修改新節點的服務端口和執行器通路端口資訊:
-Dserver.port=10002 -Dxxl.job.executor.port=60001
啟動這兩個服務:
7.3.3 排程中心-執行器管理
上圖說明在排程中心已經注冊成功。
7.3.4 排程中心-新增與啟用任務
7.3.5 校驗任務
7.4 進階配置說明
7.4.1 子任務
每個任務都擁有一個唯一的任務ID(任務ID可以從任務清單擷取),當本任務執行結束并且執行成功時,将會觸發子任務ID所對應的任務的一次主動排程,通過子任務可以實作一個任務執行完成去執行另一個任務。
7.4.2 排程過期政策
-
:排程過期後,忽略過期的任務,從目前時間開始重新計算下次觸發時間。忽略
-
:排程過期後,立即執行一次,并從目前時間開始重新計算下次觸發時間。立即執行一次
7.4.3 阻塞處理政策
排程過于密集執行器來不及處理時的處理政策
-
:排程請求進入單機執行器後,排程請求進入FIFO隊列并以串行方式運作。單機串行(預設)
-
:排程請求進入單機執行器後,發現執行器存在運作的排程任務,本次請求将會被丢棄并标記為失敗。丢棄後續排程
-
:排程請求進入單機執行器後,發現執行器存在運作的排程任務,将會終止運作中的排程任務并清空隊列,然後運作本地排程任務。覆寫之前排程
7.4.4 任務逾時時間
支援自定義任務逾時時間,任務運作逾時将會主動中斷任務。
7.4.5 失敗重試次數
支援自定義任務失敗重試次數,當任務失敗時将會按照預設的失敗重試次數主動進行重試。
8.作業分片方案
❓ 當一次分片廣播到來,各執行器如何根據分片參數去分布式執行任務,保證執行器之間執行的任務不重複呢?
執行器收到排程請求後各自己查詢屬于自己的任務,這樣就保證了執行器之間不會重複執行任務。
xxl-job設計作業分片就是為了分布式執行任務,XXL-JOB并不直接提供資料處理的功能,它隻會給執行器配置設定好分片序号并向執行器傳遞
分片總數
、
分片序号
這些參數,開發者需要自行處理分片項與真實資料的對應關系。
每個執行器收到廣播任務有兩個參數:分片總數、分片序号。每個執行從資料表取任務時可以讓任務id 模上 分片總數,如果等于分片序号則執行此任務。
上邊兩個執行器執行個體那麼分片總數為2,序号為0、1,從任務1開始,如下:
- 1 % 2 = 1 執行器2執行
- 2 % 2 = 0 執行器1執行
- 3 % 2 = 1 執行器2執行
- 以此類推
9.三個經典面試題
9.1 xxl-jobo是怎麼工作的?
XXL-JOB分布式任務排程服務由調用中心和執行器組成,調用中心負責按任務排程政策向執行器下發任務,執行器負責接收任務執行任務。
- 首先部署并啟動xxl-job排程中心。(一個java工程)
- 首先在微服務添加xxl-job依賴,在微服務中配置執行器
- 啟動微服務,執行器向排程中心上報自己.
- 在微服務中寫一個任務方法并用xxl-job的注解去标記執行任務的方法名稱。
- 在排程中心配置任務排程政策,排程政策就是每隔多長時間執行還是在每天或每月的固定時間去執行,比如每天0點執行,或每隔1小時執行一次等
- 在排程中心啟動任務。
- 排程中心根據任務排程政策,到達時間就開始下發任務給執行器。、
- 執行器收到任務就開始執行任務。
9.2 如何保證任務不重複執行?
- 排程中心按分片廣播的方式去下發任務。
- 執行器收到作業分片廣播的參數:分片總數和分月序号,計算任務id除以分片總數得到一個餘數,如果餘數等于分片序号這時就去執行這個任務,這裡保證了不同的執行器執行不同的任務。
- 配置
為"排程過期政策
”,避免同一個執行器多次重複執行同一個任務。忽略
忽略:排程過期後,忽略過期的任務,從目前時間開始重新計算下次觸發時間。
- 配置
為“任務阻塞處理政策
”,注意:棄也沒事下一次排程就又可以執行了。丢棄後續排程
丢棄後續排程:排程請求進入單機執行器後,發現執行器存在運作的排程任務,本次請求将會被丢棄并标記為失敗。
- 另外還要保證任務處理的幂等性,執行過的任務可以打一個狀态标記已完成,下次再排程執行該任務判斷該任務已完成就不再執行。
9.3 如何保證任務處理的幂等性?
任務的幂等性是指:對于資料的操作不論多少次,操作的結果始終是一緻的。執行器接收排程請求去執行任務,要有辦法去判斷該任務是否處理完成,如果處理完則不再處理,即使重複排程處理相同的任務也不能重複處理已經處理過的資料。
幂等性描述了一次和多次請求某一個資源對于資源本身應該具有同樣的結果。
幂等性是為了解決重複送出問題,比如:惡意刷單,重複支付等。
📍 解決幂等性常用的方案:
- 資料庫限制,比如:唯一索引,主鍵。
- 樂觀鎖,常用于資料庫,更新資料時根據樂觀鎖狀态去更新。
- 唯一序列号,請求前生成唯一的序列号,攜帶序列号去請求,操作時先判斷與該序列号是否相等。不相等則說明已經執行過了就不再執行,否則執行并且修改序列号或删除。
例如在資料庫中我們對于操作過的記錄修改字段
的值來表示已經操作。status