天天看點

SpringBatch學習

什麼是SpringBatch

Spring Batch 是一個輕量級的、完善的批處理架構(并不是排程架構,需要配合Quartz等架構,實作定時任務),旨在幫助企業建立健壯、高效的批處理應用。

Spring Batch 提供了大量可重用的元件,包括了日志、追蹤、事務、任務作業統計、任務重新開機、跳過、重複、資源管理。對于大資料量和高性能的批處理任務,Spring Batch 同樣提供了進階功能和特性來支援,比如分區功能、遠端功能。總之,通過 Spring Batch 能夠支援簡單的、複雜的和大資料量的批處理作業。

運作第一個SpringBatch

package com.lion.springbatch.configuration;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;

@Configuration
@Order(2)
@EnableBatchProcessing
public class FirstConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;// 建立Job任務

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job getJob() {
        return jobBuilderFactory.get("firstJob").start(step()).build();
    }

    @Bean
    public Step step() {
        return stepBuilderFactory.get("firstStep").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("step ~~");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

}
           

pom檔案

這裡我引入了h2嵌入式資料庫,當然我們也可以用MYSQL等。

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.h2database</groupId>
			<artifactId>h2</artifactId>
			<scope>runtime</scope>
		</dependency>
           

運作截圖

SpringBatch學習

概念

SpringBatch學習

通過JobLauncher來啟動SpringBatch,每一個任務是一個Job,每個任務先執行什麼在執行什麼工作就是step幹的事情(tasklet和chunk),每一個Step對應一個ItemReader、ItemProcessor和ItemWriter。JobRepository是用來儲存任務運作過程中的相關資訊,任務出錯了,那肯定要記錄下來,不然怎麼重新執行呢?對吧。

SpringBatch涉及到的資料庫表

  1. job_instance表

    從字面意思來看是job運作的一個執行個體。運作n次相同的job隻産生1個JobInstance。

    JobInstance = Job + JobParameters

  2. job_execution表

    每一次job執行,它會生成一條記錄,記錄job運作開始及結束時間,并且有成功或失敗的記錄。

  3. job_execution_jobParameters表

    每個JobInstance可以帶有參數,JobInstance 如果有帶參數則隻能運作一次。(需要注意),那麼我們隻有一個參數是一個固定的檔案路徑,那麼可以使用JobParametersIncrementer接口,來擷取每一個jobParameter。

  4. job_execution_context表

    上下文表,可以讓job之間共享一些資料。

  5. step_execution表

    每次step觸發後就會産生一個stepExecution,step不像job,是沒有stepinstance的。

  6. step_execution_context表

    讓step之間共享一些資料。

相關概念

flow與step

什麼是step?

每一個Step對象都封裝了批處理作業的一個獨立的階段。 每一個Job本質上都是由一個或多個步驟組成。

什麼flow?

Job,Flow建立及應用

flow是一個Step的集合,他規定了Step與Step之間的轉換關系;

建立Flow可以達到複用的效果,讓其在不同的Job之間進行複用;

使用FlowBuilder去建立一個Flow,他和Job類似,使用start(),next()以及end()來運作flow;

chunk與tasklet

什麼是tasklet?

takslet意味着在step中執行單個任務,job有多個step按一定順序組成,每個步驟應該執行一個具體任務。

什麼是chunk?

基于資料塊(一部分資料)執行。也就是說,其不是一次讀、處理和寫所有行,而是一次僅讀、處理、寫固定數量記錄。然後重複循環執行直到讀不到資料為止。

itemReader與itemWriter差別

itemReader是一個資料一個資料的讀。

itemWriter是一批一批的輸出。是在chunk方法中指定的數量。

多線程執行Job任務

(1)使用split方法,傳入一個線程池。SplitBuilder隻有一個add方法,加入flow對象。

舉例說明:

(1)首先定義兩個Flow,在每個Flow中定義一些Step,每一個Step将自身的名字以及目前運作的線程列印出來;

(2)建立一個Job使用Spilt異步的啟動兩個Flow;

(3)運作Job,檢視結果(job裡的兩個flow,分别在不同線程中執行);

@Bean
    public Job splitDemoJob() {
        return jobBuilderFactory.get("splitDemoJob").start(splitDempFlow1())
                .split(new SimpleAsyncTaskExecutor())
                .add(splitDempFlow2()).end().build();
    }

    @Bean
    public Step splitDemoStep1() {
        return stepBuilderFactory.get("splitDemoStep1").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("splitDemoStep1 ~~");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }
    @Bean
    public Step splitDemoStep2() {
        return stepBuilderFactory.get("splitDemoStep2").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("splitDemoStep2 ~~");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }
    @Bean
    public Step splitDemoStep3() {
        return stepBuilderFactory.get("splitDemoStep3").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("splitDemoStep3 ~~");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Flow splitDempFlow1() {
       return new FlowBuilder<Flow>("splitDempFlow1").start(splitDemoStep1()).build();
    }

    @Bean
    public Flow splitDempFlow2() {
        return new FlowBuilder<Flow>("splitDempFlow2").start(splitDemoStep2())
                .next(splitDemoStep3()).build();
    }
           

監聽器

1.Listener:控制Job執行的一種方式

2.可以通過接口或者注解實作監聽器

3.在spring-batch中提供各個級别的監聽器接口,從job級别到item級别都有

(1)JobExecutionListener(before…,after…);

(2)StepExecutionListener(before…,after…);

(3)ChunkListener(before…,after…);

(4)ItemReaderListener;ItemWriterListener;ItemProcessListener(before…,after…,error…);

注解方式

SpringBatch學習

注解方式

public class MyChunkListener {
    @BeforeChunk
    public void beforeChunk(ChunkContext context) {
        System.out.println(context.getStepContext().getStepName()+"chunk before running.....");
    }
    @AfterChunk
    public void afterChunk(ChunkContext context) {
        System.out.println(context.getStepContext().getStepName()+"chunk after running.....");
    }
}
           

接口方式

SpringBatch學習
SpringBatch學習
SpringBatch學習
public class MyJobListener implements JobExecutionListener{
    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobInstance().getJobName()+"before running......");
    }
    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobInstance().getJobName()+"before running......");
    }
}
           

具體使用:Job listener

ItemSteamReader

該接口可以用來處理任務異常,并重新執行。

SpringBatch學習

ItemSteamReader繼承了ItemStream接口和ItemReader接口。

ItemReader接口隻有一個read方法,而ItemStream有open、update、close方法。

SpringBatch學習

open方法是在step任務開始前執行的,我們可以再次之前從ExecutionContext中取出我們上一次執行的任務位置,繼續往下執行。

update方法是在每次完成一個Chunk任務後進行調用的方法,可以記錄我們完成了多少任務,目前進度在哪裡。

close方法是在任務結束後調用的。

ItemReader的異常處理與重新開機

如果在ItemReader調用read方法過程中出現異常,那麼在重新開機後ItemReader隻能重新從第一條資料開始讀取,這主要是因為我們并不知道第幾行的時候出現異常,是以我們可以實作update方法和open方法。open方法可以在開始是判斷ExecutionContext是否存在某個參數,如果不存在那麼從頭開始讀,如果存在從指定位置開始讀。而update方法需要每次任務處理完記錄目前完成的位置,儲存進ExecutionContext,如果chunk設定的太大,也不會讀取過多的處理過的資料。

ItemProcessor

SpringBatch學習

ItemProcessor可以用于處理業務邏輯、驗證、過濾等功能。沒啥說的。

錯誤處理

SpringBatch學習

可以通過execute方法的ChunkContext來達到儲存狀态。

//傳回的是一個map對象
chunkContext.getStepContext().getStepExecutionContext();
           

錯誤重試

當發生異常時我們可以進行retry重試,并設定重試的次數。這樣就不會立即停止任務了。

SpringBatch學習

錯誤跳過

當出現錯誤我們也可以進行跳過,如果可以的話。

skip函數指定跳過哪個異常。

SpringBatch學習

錯誤跳過監聽器

我們可以通過listener方法加入錯誤跳過監聽器。該監聽器需要自己實作SkipListener接口。

SpringBatch學習