spring batch step詳解
- Step 配置
- step 抽象與繼承
- step 執行攔截器
- step 攔截器定義
- step執行攔截器
- step組合攔截器
- step攔截器注解
- Tasklet 配置
- 重新開機Step
- 事務
- 事務復原
- 多線程Step
- 自定義Tasklet
- Chunk配置
- 送出間隔
- 異常跳過
- Step重試
- Chunk完成政策
- 讀、處理事務
- 攔截器
- ChunkListener
- ItemReadListener
- ItemProcessListener
- ItemWriteListener
- SkipListener
- RetryListener
github位址:
https://github.com/a18792721831/studybatch.git
文章清單:
spring batch 入門
spring batch連接配接資料庫
spring batch中繼資料
spring batch Job詳解
spring batch step詳解
spring batch ItemReader詳解
spring batch itemProcess詳解
spring batch itemWriter詳解
spring batch 作業流
spring batch 健壯性
spring batch 擴充性
Step 配置
Step表示作業中的一個完整的步驟,一個Job可以由一個或者多個Step組成。Step包含了一個實際運作的批處理任務中所有必需的資訊。
step,tasklet,chunk,read,process,write的關系如圖

Step屬性
Step的組成
step 抽象與繼承
step和job一樣,在最新的spring boot中都是bean的方式注入spring 容器中即可,是以和類的抽象,繼承相同。
public abstract class AbsStep extends TaskletStep{
protected Logger logger = LoggerFactory.getLogger(this.getClass());
public AbsStep() {
setTasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
int sum = Integer.valueOf(chunkContext.getStepContext().getJobParameters().get("time").toString());
while (sum > 0) {
System.out.println(chunkContext.getStepContext().getStepName() + " exec : " + LocalDateTime.now());
sum--;
}
return RepeatStatus.FINISHED;
}
});
}
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
beforeAbsExec(stepExecution);
logger.warn("abstract before real exec ");
super.doExecute(stepExecution);
logger.warn("abstract after real exec ");
afterAbsExec(stepExecution);
}
abstract void beforeAbsExec(StepExecution stepExecution);
abstract void afterAbsExec(StepExecution stepExecution);
}
接着建立實作類
@EnableBatchProcessing
public class PlayAStep extends AbsStep {
public PlayAStep() {
setName("study5-abs-play-a-step");
}
@Override
void beforeAbsExec(StepExecution stepExecution) {
logger.warn(" PlayAStep before exec ");
}
@Override
void afterAbsExec(StepExecution stepExecution) {
logger.warn(" PlayAStep after exec ");
}
}
建立另一個實作類
@EnableBatchProcessing
public class PlayBStep extends AbsStep {
public PlayBStep() {
setName("study5-abs-play-b-step");
}
@Override
void beforeAbsExec(StepExecution stepExecution) {
logger.warn(" PlayBStep before exec ");
}
@Override
void afterAbsExec(StepExecution stepExecution) {
logger.warn(" PlayBStep after exec ");
}
}
有一點需要注意,step和job不同的地方,step不能由spring容器負責建立和管理。
在官網的sample中 ,都是建議使用builder建立。在builder中,也是手動建立執行個體的:
建立完成後,并沒有交給spring容器管理。
如果我們建立了一個step,非要交給spring容器,那麼,在spring容器進行初始化處理的時候,會調用AbstractStep的afterPropertiesSet.在afterPropertiesSet中,要求名字和jobRepository不為空。
我們交給spring容器建立step執行個體的時候,無法保證在調用afterPropertiesSet方法之前注入jobRepository。而且,交給spring容器管理的bean一般都是單例的bean。雖然step可以複用,但是因參數等原因,還是沒有支援自動放入spring容器管理。而是寫了stepBuilder來建立。
基于這個原理,我們實作的抽象和繼承的step,也需要建立builder類用于手動建立。(這裡也可以使用xml配置,或者直接手動new執行個體,然後将name和jobRepository等屬性注入。)
@EnableBatchProcessing
@Component
public class StepBuilder {
@Autowired
private JobRepository jobRepository;
@Autowired
private PlatformTransactionManager transactionManager;
public AbsStep get(Class clazz) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
Object instance = clazz.getDeclaredConstructor().newInstance();
if (instance instanceof AbsStep) {
AbsStep step = (AbsStep) instance;
step.setJobRepository(jobRepository);
step.setTransactionManager(transactionManager);
return step;
} else{
throw new NoSuchMethodException("");
}
}
}
接着配置jobRepository
@Configuration
public class JobConfig {
private static final Logger logger = LoggerFactory.getLogger(JobConfig.class);
@Bean
@Autowired
public JobRepositoryFactoryBean jobRepositoryFactoryBean(DataSource dataSource, PlatformTransactionManager transactionManager) {
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
jobRepositoryFactoryBean.setDataSource(dataSource);
try {
jobRepositoryFactoryBean.afterPropertiesSet();
} catch (Exception e) {
logger.error("create job repository factory bean error : {}", e.getMessage());
}
return jobRepositoryFactoryBean;
}
@Bean
@Primary
@Autowired
public JobRepository jobRepository(JobRepositoryFactoryBean jobRepositoryFactoryBean) {
JobRepository jobRepository = null;
try {
jobRepository = jobRepositoryFactoryBean.getObject();
} catch (Exception e) {
logger.error("create job repository error : {}", e.getMessage());
}
return jobRepository;
}
}
配置job并啟動
@EnableBatchProcessing
@Configuration
public class JobConf {
@Bean
public String runAbsJob(JobLauncher jobLauncher, Job absJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(absJob, new JobParametersBuilder()
.addLong("time", 10L)
.addDate("date", new Date())
.toJobParameters());
return "";
}
@Bean
public Job absJob(JobBuilderFactory jobBuilderFactory, StepBuilder stepBuilder) throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
return jobBuilderFactory.get("study5-abs-job")
.start(stepBuilder.get(PlayAStep.class))
.next(stepBuilder.get(PlayBStep.class))
.validator(new DefaultJobParametersValidator(new String[]{"time"}, new String[]{}))
.build();
}
}
啟動
step 執行攔截器
step 攔截器定義
是一個标記性接口。
step執行攔截器
隻有兩個方法
實作自己的step執行攔截器
@Component
public class StepAListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println( stepExecution.getStepName() + " Step a Listener before ");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println(stepExecution.getStepName() + " Step a Listener after ");
return stepExecution.getExitStatus();
}
}
使用
@EnableBatchProcessing
@Configuration
public class StepLisJobConf {
@Bean
public String runJob(JobLauncher jobLauncher, Job lisJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(lisJob, new JobParametersBuilder().addDate("date", new Date()).toJobParameters());
return "";
}
@Bean
public Job lisJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, StepAListener stepAListener) {
return jobBuilderFactory.get("study5--job-step-listener")
.start(stepBuilderFactory.get("study5-step-step-listener")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println(chunkContext.getStepContext().getStepName() + "exec " + LocalDateTime.now());
return RepeatStatus.FINISHED;
}
}).listener(stepAListener)
.build()).build();
}
}
啟動
step組合攔截器
我們在實作一個攔截器
@Component
public class StepBListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println(stepExecution.getStepName() + " Step b Listener before ");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println(stepExecution.getStepName() + " Step b Listener after ");
return stepExecution.getExitStatus();
}
}
接着在配置step攔截器的時候,建立一個組合攔截器,并注冊到step上。
@EnableBatchProcessing
@Configuration
public class ComStepLisJobCOnf {
@Bean
public String runJob(JobLauncher jobLauncher,Job comLisJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(comLisJob, new JobParametersBuilder().addDate("date", new Date()).toJobParameters());
return "";
}
@Bean
public Job comLisJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory,
StepAListener stepAListener, StepBListener stepBListener) {
CompositeStepExecutionListener listener = new CompositeStepExecutionListener();
listener.register(stepAListener);
listener.register(stepBListener);
return jobBuilderFactory.get("study5-step-job-com-job")
.start(stepBuilderFactory.get("study5-step-step-com-lis")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println(chunkContext.getStepContext().getStepName() + " exec " + LocalDateTime.now());
return RepeatStatus.FINISHED;
}
}).listener(listener).build()).build();
}
}
啟動
step攔截器注解
得益于注解,在step的builder中重載了listener(Object)方法,是以,我們可以在一個普通bean上使用@BeforeStep和@AfterStep注解即可:
@Component
public class AnnoCListener {
@BeforeStep
public void before(StepExecution stepExecution){
System.out.println(stepExecution.getStepName() + " exec Anno Listener " + LocalDateTime.now());
}
@AfterStep
public ExitStatus after(StepExecution stepExecution) {
System.out.println(stepExecution.getStepName() + " exec Anno Listener " + LocalDateTime.now());
return stepExecution.getExitStatus();
}
}
然後使用
@EnableBatchProcessing
@Configuration
public class AnnoStepLisJobConf {
@Bean
public String runJob(JobLauncher jobLauncher, Job annoStepJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(annoStepJob, new JobParametersBuilder().addDate("date", new Date()).toJobParameters());
return "";
}
@Bean
public Job annoStepJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory,AnnoStepLisJobConf annoStepLisJobConf){
return jobBuilderFactory.get("study5-anno-listener-job")
.start(stepBuilderFactory.get("study5-anno-listener-step")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println(chunkContext.getStepContext().getStepName() + " exec " + LocalDateTime.now());
return RepeatStatus.FINISHED;
}
}).listener(annoStepLisJobConf).build()).build();
}
}
啟動
需要注意的是,組合攔截器可能不支援注解方式配置的攔截器。
Tasklet 配置
tasklet元素定義任務的具體執行邏輯,執行邏輯可以自定義實作,也可以使用spring batch的chunk操作,提供了标準的讀、處理、寫三步操作。通過tasklet元素同樣可以定義事務、處理線程、啟動控制、復原控制和攔截器等。
tasklet屬性說明
tasklet下級配置
重新開機Step
批處理作業架構需要支援任務重新啟動,批處理作業處理資料發生錯誤的時候,在資料修複後需要能夠将目前的任務執行個體執行完畢。spring batch架構支援狀态為非"COMPELETED"的job執行個體重新啟動,Job執行個體重新開機時,會從目前失敗的step重新開始執行.
重新開機次數
預設Step是可以重新開機的,重新開機次數由start-limit控制,預設是最大值
重新開機已完成
預設情況下,已經執行完成的step不需要重新啟動,但是,在一些特殊場景下,為了保證業務操作的完整 性,需要重新啟動已經完成的step。此時就需要配置allow-start-if-complete屬性。
事務
spring batch架構提供了事務能力保障Job可靠的執行,能夠将Job的read,process和write三者有效的控制在一起,保證操作的完整性;Job執行期間的中繼資料狀态的持久化同樣依賴事務的保證。在Job執行期間,可以配置事務管理器、事務的基本屬性(包括隔離級别、傳播方式、事務逾時等資訊)
事務管理器
spring batch如果需要使用事務,需要在spring容器中聲明管理器。
在spring boot 的starter中,預設有一個事務管理器。
我們在配置JobRepository時,就需要用到事務管理器,使用的是預設的事務管理器。
當然,我們也可以定義自己的事務管理器:
然後在配置Step的時候使用
也就是說,我們可以為不同的step配置不同的事務。
很明顯的,可以為不同的step配置不同的事務,那麼,順帶就可以配置不同的資料庫。
事務屬性
事務屬性一般有兩個:隔離級别和傳播方式。
事務的隔離級别
事務的傳播方式
傳播方式 | 說明 |
REQUIRED | 支援目前事務,如果目前沒有事務,就建立一個事務 |
SUPPORTS | 支援目前事務,如果目前沒有事務,就以非事務方式執行 |
MANDATORY | 支援目前事務,如果目前沒有事務,就抛出異常 |
REQUIRES_NEW | 建立事務,如果目前存在事務,則把目前事務挂起 |
NOT_SUPPORTED | 以非事務方式執行操作,如果目前存在事務,就把目前事務挂起 |
NEVER | 以非事務方式執行,如果目前存在事務,則抛出異常 |
比如我們可以建立事務的屬性的bean,然後在配置step的時候使用
需要注意一點,這裡不能直接配spring的名字,在spring batch中做了一層封裝
必須用spring batch封裝的變量
事務復原
通過事務控制可以較好的保證事務任務的執行。在業務處理過程中,包括讀、寫、處理資料,如果發生了異常會導緻事務復原,spring batch架構提供了發生特定異常不觸發事務復原的能力。
需要注意,這是chunk的特性,使用tasklet直接注入是無法使用的,而且需要調用構造器的faultTolerant來選擇進階特性的構造器,然後使用skip方法指定特定的異常,不進行復原操作,而是跳過這些異常。
多線程Step
Job執行預設情況使用單個線程完成任務的執行。spring batch架構支援為step配置多個線程,即可以使用多個線程并行執行一個step,可以提高step的處理速度。
自定義Tasklet
自定義的tasklet非常簡單,直接實作tasklet接口即可。在前面的例子中我們基本上都是使用自定義的tasklet的,以内部類的方式使用。
tasklet接口隻有一個execute方法需要實作,傳回狀态表示是否執行完畢,如果傳回的狀态不是執行完畢,那麼會重複執行,直到執行完畢,或者傳回null。
空值也表示執行完畢
當然spring batch也提供了一些tasklet接口的實作類,供我們直接或者間接使用
我們以
CallableTaskletAdapter
為例,體驗一把
@EnableBatchProcessing
@Configuration
public class CallTaskJobConf {
@Bean
public String runJob(JobLauncher jobLauncher,Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(job, new JobParameters());
return "";
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory,Step step) {
return jobBuilderFactory.get("call-tasklet-job")
.start(step)
.build();
}
@Bean
public Step step(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("callable-tasklet-step")
.tasklet(callTasklet())
.build();
}
private Tasklet callTasklet() {
CallableTaskletAdapter adapter = new CallableTaskletAdapter();
Callable<RepeatStatus> callable = () -> {
System.out.println("call");
return RepeatStatus.FINISHED;
};
adapter.setCallable(callable);
return adapter;
}
}
建立的callable裡面隻列印了一句話
call
然後我們執行,看看會不會輸出這句話
Chunk配置
chunk才是spring batch提倡使用的。前面的tasklet裡面是沒有任何規則,将全部的業務代碼塞進去,執行就行。而spring batch架構提倡的批處理操作被抽象為讀、處理、寫這樣三個基本邏輯。同時,為了支援各種場景,spring batch提供了各種各樣的讀寫元件,包括格式化檔案的讀寫,xml檔案,資料庫,jms消息讀寫等。
chunk除了讀寫處理這三個基本操作外,還有一些進階特性,比如異常處理,批處理的可靠性、穩定性、異常重入的能力。還有事務送出間隔、跳過政策、重試政策、讀事務隊列、處理完成政策等。
常見的操作包含這些
送出間隔
在spring batch的hello world程式中,使用chunk需要設定一個送出數量,也就是送出間隔。頻繁的送出會降低資料庫的性能,是以,批量送出是一個好的選擇。
在面向批處理chunk的操作中,可以通過屬性commit-interval設定read多少條記錄後進行一次送出。通過設定commit-interval的間隔值,減少送出頻次,提升資源使用率。
比如:
@EnableBatchProcessing
@Configuration
public class ChunkTimeJobConf {
@Bean
public String runJob(JobLauncher jobLauncher,Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(job, new JobParameters());
return "";
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory,Step step) {
return jobBuilderFactory.get("chunk-time-job")
.start(step)
.build();
}
@Bean
public Step step(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("chunk-time-step")
.<Long,Long>chunk(100)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
private ItemProcessor<Long,Long> processor(){
return new ItemProcessor<Long, Long>() {
@Override
public Long process(Long item) throws Exception {
return item;
}
};
}
private ItemWriter<Long> writer() {
return new ItemWriter<Long>() {
@Override
public void write(List<? extends Long> items) throws Exception {
System.out.println(items.size());
}
};
}
private ItemReader<Long> reader(){
List<Long> longs = new ArrayList<>(1000);
for (long i = 0; i < 1000; i++) {
longs.add(i);
}
return new ListItemReader<Long>(longs);
}
}
我們建立了1000個數字,然後什麼都不做,接着調用寫,在寫資料的方法中列印傳輸的數量,在定義step的時候,我們定義使用chunk,而且每100個資料寫一次,這樣的話,預期共寫10次。
執行結果如下
異常跳過
在進行批處理的時候,我們無法100%保證資料全部正确,總是會有異常資料的。如果處理1000W的資料,結果因為1個資料異常,造成整個job失敗,進而導緻重新執行整個job,這個代價就太大了。不能因為一個資料,就否定全部的工作,不能一顆老鼠屎,壞了整個米倉。
是以,對于異常的資料,支援跳過。
@EnableBatchProcessing
@Configuration
public class ChunkSkipJobConf {
@Bean
public String runJob(JobLauncher jobLauncher, Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(job, new JobParameters());
return "";
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
return jobBuilderFactory.get("chunk-skip-job")
.start(step)
.build();
}
@Bean
public Step step(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("chunk-skip-step")
.<Long,Long>chunk(100)
.reader(reader())
.processor(processor())
.writer(writer())
.faultTolerant()
.skip(LessZoreException.class)
.skipLimit(1000)
.build();
}
private ItemProcessor<Long,Long> processor(){
return new ItemProcessor<Long, Long>() {
@Override
public Long process(Long item) throws Exception {
// 如果數字是負數,抛出異常
if (item < 0) {
throw new LessZoreException(item + " < 0 ");
}
return item;
}
};
}
private ItemWriter<Long> writer() {
return new ItemWriter<Long>() {
@Override
public void write(List<? extends Long> items) throws Exception {
System.out.println(items.size());
}
};
}
private ItemReader<Long> reader(){
List<Long> longs = new ArrayList<>(1000);
for (long i = 0; i < 1000; i++) {
// 如果是9的倍數,那麼設定為負數
if (i%9 == 0) {
longs.add(i*-1);
} else {
longs.add(i);
}
}
return new ListItemReader<Long>(longs);
}
}
我們還是1000個數字,在讀取數字的時候,如果是9的倍數,那麼将數字變為負數。在處理的時候,如果數字小于0,那麼抛出我們自定義的小于0異常:
寫入還是隻列印傳輸的數量。
需要注意一點,如果我們設定了跳過的異常,但是沒有設定最多允許跳過的數量,還是會抛出異常。
現在有一個問題:1000内有多少個9的倍數?我也懶得算,直接百度
111個。
那麼1000 - 111 = 889個寫入資料,每100個寫入一次,總共應該寫入9次,最後一次寫入89個。
和我們預期的一樣嗎?
試試
哦哦,和我們的預期不一樣。其實是chunk的了解不對,chunk是以read的數量進行計數,而不是write數量進行計數的。
為什麼最後一個是88個?
别忘記了0
0%9還是0
如果異常數量超過允許的最大值,也會抛出異常的。
将上面的例子中允許異常的數量調整為100.現在已知會抛出112個異常,但是我們允許的是100.别忘記修改名字或者參數,已經執行完的job_instance不會重複執行。
抛出了異常,異常資訊提示,跳過的數量超出允許的值100.
Step重試
step執行期間read,process,write發生的任何異常都會導緻step執行失敗,進而導緻作業的失敗。批處理作業的自動化、定時觸發,有特定的執行時間視窗特性,決定了盡可能地減少Job的失敗。處理任務階段發生的異常可以讓業務失敗,也可以通過skip的設定,跳過部分異常;但是也有一些異常,并不是必現的,比如網絡異常,現在失敗了,下一次就可能成功了。是以這類異常的出現可能在下次重新操作的時候消失,資料庫鎖的異常在下次送出的時候,可能就釋放了。對于這些場景,我們并不希望作業失敗,也不希望直接跳過,而是重試幾次。盡可能讓job成功。
spring batch架構提供了任務重試功能,重試次數限制功能、自定義重試政策以及重試攔截器能力。分别通過
retryable-execption-classes,retry-limit,retry-policy,cache-capacity,retry-listeners
實作。
- retryable-exception-classes:定義可以重試的異常,可以定義一組重試的異常,如果發生了定義的異常或者子類異常都會導緻重試。
- retry-limit:任務執行重試的最大次數。
- retry-polocy:定義自定義重試政策,需要實作接口RetryPolicy
- cache-capacity:retry-policy緩存的大小,緩存用于存放重試上下文RetryContext,如果超過配置最大值,會發生異常。
- retry-listeners:配置重試監聽器,監聽器需要實作接口RetryListener.
我們建立 一個step,驗證重試的功能:
@EnableBatchProcessing
@Configuration
public class RetryJobConf {
private AtomicInteger atomicInteger = new AtomicInteger();
@Bean
public String runJob(JobLauncher jobLauncher, Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(job, new JobParametersBuilder().addLong("date", 10L).toJobParameters());
return "";
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
return jobBuilderFactory.get("chunk-skip-jobx")
.start(step)
.build();
}
@Bean
public Step step(StepBuilderFactory stepBuilderFactory, RetryLis retryLis) {
return stepBuilderFactory.get("chunk-skip-step")
.<Long, Long>chunk(3)
.reader(reader())
.processor(processor())
.writer(writer())
.faultTolerant()
.retry(LessZoreException.class)
.retryLimit(3)
.retryPolicy(new SimpleRetryPolicy(3, Map.of(LessZoreException.class, true)))
.allowStartIfComplete(true)
.build();
}
private ItemProcessor<Long, Long> processor() {
return new ItemProcessor<Long, Long>() {
@Override
public Long process(Long item) throws Exception {
System.out.println(" process <" + item + "> " + atomicInteger.incrementAndGet());
throw new LessZoreException(item + " = 0 ");
}
};
}
private ItemWriter<Long> writer() {
return new ItemWriter<Long>() {
@Override
public void write(List<? extends Long> items) throws Exception {
System.out.println("writer : " + items);
}
};
}
private ItemReader<Long> reader() {
int num = 10;
List<Long> longs = new ArrayList<>(num);
while (--num > 0) {
longs.add((long) num);
}
ListItemReader<Long> longListItemReader = new ListItemReader<>(longs);
System.out.println("reader : " + longs);
return longListItemReader;
}
}
我們首先建立了一個讀取器,用于讀取[1,9]數字,讀取的數字将會列印出來。
接着建立一個處理器,處理器中每一個資料都将處理失敗,抛出異常,同時,建立一個線程安全的計數器,用于标記處理器進入的次數。
寫入器我們列印寫入器中接收到的資料。其實寫入器是拿不到任何資料的,因為全部的資料都在處理器中失敗了。
在step配置的時候,指定chunk模式,然後設定好讀取、處理、寫入器後,開啟進階功能,設定重試的異常以及重試的次數。
重試的政策和重試的異常和重試的次數,2選一即可。其實我們從重試政策中就能知道,從政策包含了上述的方法。
最後開啟step成功重複執行的開關即可。
執行:
如果重試也失敗了,但是我還是不想讓作業失敗,這個該怎麼處理呢?
跳過,前面我們就講過異常跳過。
我們無法保證重試一定成功,如果重試,最後都失敗了 ,那麼,還是相同的問題,不能因為1個資料,就讓給整個作業失敗。
對于一個step,我們既可以配置重試政策,也可以配置跳過政策。
并且重試優先于跳過。
舉個例子,我們配置stepA重試3次,跳過5個。
當第一個資料來了之後,進行處理和寫入,如果期間發生了異常,那麼就會先進行3次重試,如果3次重試都結束了,在執行一次跳過。
當第二個資料來了之後,進行處理和寫入,也在期間發生了異常,那麼就會先進行3次重試,如果3次重試都結束了,在執行一次跳過。
就這樣,直到資料全部處理完成,或者達到最大跳過數量。
換句話說,如果我們資料足夠多,那麼配置3次重試,5次跳過。總共會執行18次process,對應6個資料,前5個不會終止,後面的3次重試失敗,無法跳過,作業終止。
比如:
執行結果
Chunk完成政策
面向chunk的操作執行期間,根據設定的送出間隔commit-interval值,當讀資料達到送出間隔後,執行一次送出操作,然後重複執行chunk的讀操作,知道再次達到間隔值。spring batch架構除了提供commit-interval能力外,該架構還提供了chunk完成政策能力,通過完成政策可以配置任務的送出時機,chunk完成政策的定義接口為CompletionPolicy。
說明:chunk-completion-policy定義批處理完成政策,不是表示任務的完成政策,chunk執行期間是按照chunk完成政策執行批量送出的,批量送出會執行一次寫操作,同時将批處理的狀态資料通過JobRepository持久化。
說明:屬性chunk-completion-policy和屬性commit-interval不能同時存在;在chunk中至少定義這兩個其中的一個。
完成政策的接口定義
系統提供的接口的實作
我們建立自己的完成政策,内部類的方式
class MyPolicy extends CompletionPolicySupport {
@Override
public boolean isComplete(RepeatContext context) {
if (context.getStartedCount() % 3 == 0) {
return true;
} else {
return false;
}
}
}
我們定義,如果有3個資料處理完成,那麼就寫入。
當然這種配置方式還有更加簡單的,如果是根據完成數量進行寫入,可以直接設定commit-interval進行設定。
執行結果和設定chunk-completion-policy相同
如果我們同時配置了commit-interval和chunk-completion-policy,spring batch就會抛出異常。
會抛出這樣的異常
有了commit-interval為什麼還要有chunk-completion-policy呢?
因為在chunk-completion-poliocy中我們可以查詢資料庫,可以定義自己的的送出時機。chunk-completion-policy比commit-interval更加的靈活。
讀、處理事務
讀事務隊列
reader-transactional-queue:是否從一個事務性的隊列讀取資料,當reader從JMS的消息隊列擷取資料的時候,這個屬性才生效。說白了,這個屬性主要處理的是當出現異常的時候,是否需要将消費的消息重新投遞,以及消息是否可以重複消費的問題。
true表示從一個事務性的隊列中讀取資料,一旦發生異常會導緻事務復原,從隊列中讀取的資料同樣會被 重新放回到隊列中;false表示從一個沒有事務的隊列擷取資料,一旦發生異常導緻事務復原,消費掉的資料不會重新放回隊列。
比如
預設是有事務的,就是說,如果失敗,會重新放回 讀取隊列,重新處理。
比如:
@EnableBatchProcessing
@Configuration
public class TransJobConf {
private Integer integer = 0;
@Bean
public String runJob(JobLauncher jobLauncher,Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(job, new JobParametersBuilder().addLong("id", 25L).toJobParameters());
return "";
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory,Step step) {
return jobBuilderFactory.get("trans-job-job")
.start(step)
.build();
}
@Bean
public Step step(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("tran-step-step")
.<Integer,Integer>chunk(3)
.reader(reader())
.processor(processor())
.writer(writer())
.faultTolerant()
.retry(LessZoreException.class)
.retryLimit(10)
.build();
}
private ItemWriter<Integer> writer() {
return new ItemWriter<Integer>() {
@Override
public void write(List<? extends Integer> items) throws Exception {
}
};
}
private ItemProcessor processor() {
return new ItemProcessor<Integer, Integer>() {
@Override
public Integer process(Integer item) throws Exception {
System.out.println("item = <" + item + "> ### ");
if (item == 3 ) {
throw new LessZoreException(" it's time !");
}
return item;
}
};
}
private ItemReader<Integer> reader() {
return new ItemReader<Integer>() {
@Override
public Integer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (integer > 10) {
return null;
}
return integer++;
}
};
}
}
這個是讀取器讀取0~10,進行中,當讀取到3的時候,異常。
如果是重新放回隊列,那麼會耗盡重試次數,重試次數總共是10次,并且最終失敗。因為3永遠都過不去。
如果使用了
readerIsTransactionalQueue
方法,則表明讀取器沒有事務,當發生了異常,不會重新處理,也就是不會復原。
這樣就能執行成功,因為讀取到3的時候,出現異常,進行重試,但是重試的時候,處理的是4,而不是3,這個有問題的就過去了,整個作業成功,而不是失敗。
執行結果如下:
為什麼是這樣的?按照我們的猜測,不是4和5都應該列印出來嗎?
整個執行過程是這樣的:首先,我們定義了chunk的處理數量是3,是以,每次讀取器會一口氣讀取3個資料(第一輪是0,1,2;第二輪是3,4,5;6,7,8;9,10;)
接着處理器一口氣處理3個資料,第一輪是OK的,第二輪第一個數字是3,失敗,因為有重試次數,而且讀取器不會復原,是以就讀取第三輪了6開始繼續處理。
預設讀取器是有事務的,失敗會復原,處理流程和上面一樣,處理3的時候異常,讀取器復原,下一次重試繼續處理3.
處理事務
process-transactional:處理資料是否在事務中,true表示再一次chunk處理期間将process處理的結果放在緩存中,當執行重試或者跳過政策時,可以看到緩存中處理的資料,在寫操作完成前可以重新執行processor;false表示在一次chunk處理期間不會将processor處理的資料放在緩存中,即processor在chunk執行期間每一條記錄僅會執行一次。
事務性processor處理過程
非事務性processor處理過程
但是經過仔細嘗試,發現這個處理事務貌似不生效。
攔截器
chunk操作中提供了豐富的攔截器機制,通過攔截器可以實作額外的控制能力,比如日志記錄,任務跟蹤,狀态報告和資料傳遞等。
這些攔截器的作用域
攔截器的執行順序
- JobExecutionListener.beforeJob()
- StepExecutionListener.beforeStep()
- ChunkListener.beforeChunk()
- ItemReaderListener.beforeRead()
- ItemReaderListener.afterRead()
- ItemProcessListener.beforeProcess()
- ItemProcessListener.afterProcess()
- ItemWriteListener.beforeWrite()
- ItemWriteListener.afterWwrite()
- ChunkListener.afterChunk()
- StepExecutionListener.afterStep()
- JobExecutionListener.afterJob()
ChunkListener
接口定義:
注解
spring batch實作
比如
public class ChunkLis implements ChunkListener {
@Override
public void beforeChunk(ChunkContext context) {
System.out.println("1. chunk lis before");
}
@Override
public void afterChunk(ChunkContext context) {
System.out.println("9. chunk lis after");
}
@Override
public void afterChunkError(ChunkContext context) {
System.out.println("0. chunk lis error");
}
}
配置
ItemReadListener
接口定義
注解
定義
public class ItemReadLis implements ItemReadListener {
@Override
public void beforeRead() {
System.out.println("4. item reader before");
}
@Override
public void afterRead(Object item) {
System.out.println("5. item reader after");
}
@Override
public void onReadError(Exception ex) {
System.out.println("00. item error");
}
}
配置
ItemProcessListener
接口定義
注解
定義
public class ItemProcessLis implements ItemProcessListener {
@Override
public void beforeProcess(Object item) {
System.out.println("6. item process before");
}
@Override
public void afterProcess(Object item, Object result) {
System.out.println("7. item process after");
}
@Override
public void onProcessError(Object item, Exception e) {
System.out.println("000. item process error");
}
}
配置
ItemWriteListener
接口定義
注解
定義
public class ItemWriteLis implements ItemWriteListener {
@Override
public void beforeWrite(List items) {
System.out.println("8. item write before");
}
@Override
public void afterWrite(List items) {
System.out.println("9. item write after");
}
@Override
public void onWriteError(Exception exception, List items) {
System.out.println("00000. item write error");
}
}
配置
SkipListener
接口定義
注解
spring batch預設實作
定義
public class SkipLis implements SkipListener {
@Override
public void onSkipInRead(Throwable t) {
System.out.println(" skip read ");
}
@Override
public void onSkipInWrite(Object item, Throwable t) {
System.out.println(" skip write ");
}
@Override
public void onSkipInProcess(Object item, Throwable t) {
System.out.println(" skip process ");
}
}
配置
RetryListener
接口定義
方法說明
spring batch預設實作
定義
public class RetryLis implements RetryListener {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
System.out.println(" retry open ");
return false;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
System.out.println(" retry close ");
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
System.out.println(" retry error ");
}
}
配置
配置任務
執行結果
為什麼chunk的攔截器沒有執行?
調試發現,隻有處理器和寫入器設定成功了
這裡存疑,不知道為什麼。
經過多次嘗試,發現與設定攔截器的順序有關:
執行結果
這裡還有一個坑:
攔截器必須傳回true才會真的重試