什麼是SpringBatch
Spring Batch 是一個輕量級的、完善的批處理架構(并不是排程架構,需要配合Quartz等架構,實作定時任務),旨在幫助企業建立健壯、高效的批處理應用。
Spring Batch 提供了大量可重用的元件,包括了日志、追蹤、事務、任務作業統計、任務重新開機、跳過、重複、資源管理。對于大資料量和高性能的批處理任務,Spring Batch 同樣提供了進階功能和特性來支援,比如分區功能、遠端功能。總之,通過 Spring Batch 能夠支援簡單的、複雜的和大資料量的批處理作業。
運作第一個SpringBatch
package com.lion.springbatch.configuration;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
@Configuration
@Order(2)
@EnableBatchProcessing
public class FirstConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;// 建立Job任務
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job getJob() {
return jobBuilderFactory.get("firstJob").start(step()).build();
}
@Bean
public Step step() {
return stepBuilderFactory.get("firstStep").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step ~~");
return RepeatStatus.FINISHED;
}
}).build();
}
}
pom檔案
這裡我引入了h2嵌入式資料庫,當然我們也可以用MYSQL等。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
運作截圖
概念
通過JobLauncher來啟動SpringBatch,每一個任務是一個Job,每個任務先執行什麼在執行什麼工作就是step幹的事情(tasklet和chunk),每一個Step對應一個ItemReader、ItemProcessor和ItemWriter。JobRepository是用來儲存任務運作過程中的相關資訊,任務出錯了,那肯定要記錄下來,不然怎麼重新執行呢?對吧。
SpringBatch涉及到的資料庫表
-
job_instance表
從字面意思來看是job運作的一個執行個體。運作n次相同的job隻産生1個JobInstance。
JobInstance = Job + JobParameters
-
job_execution表
每一次job執行,它會生成一條記錄,記錄job運作開始及結束時間,并且有成功或失敗的記錄。
-
job_execution_jobParameters表
每個JobInstance可以帶有參數,JobInstance 如果有帶參數則隻能運作一次。(需要注意),那麼我們隻有一個參數是一個固定的檔案路徑,那麼可以使用JobParametersIncrementer接口,來擷取每一個jobParameter。
-
job_execution_context表
上下文表,可以讓job之間共享一些資料。
-
step_execution表
每次step觸發後就會産生一個stepExecution,step不像job,是沒有stepinstance的。
-
step_execution_context表
讓step之間共享一些資料。
相關概念
flow與step
什麼是step?
每一個Step對象都封裝了批處理作業的一個獨立的階段。 每一個Job本質上都是由一個或多個步驟組成。
什麼flow?
Job,Flow建立及應用
flow是一個Step的集合,他規定了Step與Step之間的轉換關系;
建立Flow可以達到複用的效果,讓其在不同的Job之間進行複用;
使用FlowBuilder去建立一個Flow,他和Job類似,使用start(),next()以及end()來運作flow;
chunk與tasklet
什麼是tasklet?
takslet意味着在step中執行單個任務,job有多個step按一定順序組成,每個步驟應該執行一個具體任務。
什麼是chunk?
基于資料塊(一部分資料)執行。也就是說,其不是一次讀、處理和寫所有行,而是一次僅讀、處理、寫固定數量記錄。然後重複循環執行直到讀不到資料為止。
itemReader與itemWriter差別
itemReader是一個資料一個資料的讀。
itemWriter是一批一批的輸出。是在chunk方法中指定的數量。
多線程執行Job任務
(1)使用split方法,傳入一個線程池。SplitBuilder隻有一個add方法,加入flow對象。
舉例說明:
(1)首先定義兩個Flow,在每個Flow中定義一些Step,每一個Step将自身的名字以及目前運作的線程列印出來;
(2)建立一個Job使用Spilt異步的啟動兩個Flow;
(3)運作Job,檢視結果(job裡的兩個flow,分别在不同線程中執行);
@Bean
public Job splitDemoJob() {
return jobBuilderFactory.get("splitDemoJob").start(splitDempFlow1())
.split(new SimpleAsyncTaskExecutor())
.add(splitDempFlow2()).end().build();
}
@Bean
public Step splitDemoStep1() {
return stepBuilderFactory.get("splitDemoStep1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("splitDemoStep1 ~~");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step splitDemoStep2() {
return stepBuilderFactory.get("splitDemoStep2").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("splitDemoStep2 ~~");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step splitDemoStep3() {
return stepBuilderFactory.get("splitDemoStep3").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("splitDemoStep3 ~~");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Flow splitDempFlow1() {
return new FlowBuilder<Flow>("splitDempFlow1").start(splitDemoStep1()).build();
}
@Bean
public Flow splitDempFlow2() {
return new FlowBuilder<Flow>("splitDempFlow2").start(splitDemoStep2())
.next(splitDemoStep3()).build();
}
監聽器
1.Listener:控制Job執行的一種方式
2.可以通過接口或者注解實作監聽器
3.在spring-batch中提供各個級别的監聽器接口,從job級别到item級别都有
(1)JobExecutionListener(before…,after…);
(2)StepExecutionListener(before…,after…);
(3)ChunkListener(before…,after…);
(4)ItemReaderListener;ItemWriterListener;ItemProcessListener(before…,after…,error…);
注解方式
注解方式
public class MyChunkListener {
@BeforeChunk
public void beforeChunk(ChunkContext context) {
System.out.println(context.getStepContext().getStepName()+"chunk before running.....");
}
@AfterChunk
public void afterChunk(ChunkContext context) {
System.out.println(context.getStepContext().getStepName()+"chunk after running.....");
}
}
接口方式
public class MyJobListener implements JobExecutionListener{
@Override
public void beforeJob(JobExecution jobExecution) {
System.out.println(jobExecution.getJobInstance().getJobName()+"before running......");
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println(jobExecution.getJobInstance().getJobName()+"before running......");
}
}
具體使用:Job listener
ItemSteamReader
該接口可以用來處理任務異常,并重新執行。
ItemSteamReader繼承了ItemStream接口和ItemReader接口。
ItemReader接口隻有一個read方法,而ItemStream有open、update、close方法。
open方法是在step任務開始前執行的,我們可以再次之前從ExecutionContext中取出我們上一次執行的任務位置,繼續往下執行。
update方法是在每次完成一個Chunk任務後進行調用的方法,可以記錄我們完成了多少任務,目前進度在哪裡。
close方法是在任務結束後調用的。
ItemReader的異常處理與重新開機
如果在ItemReader調用read方法過程中出現異常,那麼在重新開機後ItemReader隻能重新從第一條資料開始讀取,這主要是因為我們并不知道第幾行的時候出現異常,是以我們可以實作update方法和open方法。open方法可以在開始是判斷ExecutionContext是否存在某個參數,如果不存在那麼從頭開始讀,如果存在從指定位置開始讀。而update方法需要每次任務處理完記錄目前完成的位置,儲存進ExecutionContext,如果chunk設定的太大,也不會讀取過多的處理過的資料。
ItemProcessor
ItemProcessor可以用于處理業務邏輯、驗證、過濾等功能。沒啥說的。
錯誤處理
可以通過execute方法的ChunkContext來達到儲存狀态。
//傳回的是一個map對象
chunkContext.getStepContext().getStepExecutionContext();
錯誤重試
當發生異常時我們可以進行retry重試,并設定重試的次數。這樣就不會立即停止任務了。
錯誤跳過
當出現錯誤我們也可以進行跳過,如果可以的話。
skip函數指定跳過哪個異常。
錯誤跳過監聽器
我們可以通過listener方法加入錯誤跳過監聽器。該監聽器需要自己實作SkipListener接口。