天天看點

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

前言

概念詞就不多說了,我簡單地介紹下 , spring batch 是一個 友善使用的 較健全的 批處理 架構。

為什麼說是友善使用的,因為這是 基于spring的一個架構,接入簡單、易了解、流程分明。

為什麼說是較健全的, 因為它提供了往常我們在對大批量資料進行處理時需要考慮到的 日志跟蹤、事務粒度調配、可控執行、失敗機制、重試機制、資料讀寫等。

正文

那麼回到文章,我們該篇文章将會帶來給大家的是什麼?(結合執行個體講解那是當然的)

從實作的業務場景來說,有以下兩個:

1. 從  csv檔案 讀取資料,進行業務處理再存儲

2. 從 資料庫 讀取資料,進行業務處理再存儲

也就是平時經常遇到的資料清理或者資料過濾,又或者是資料遷移備份等等。 大批量的資料,自己實作分批處理需要考慮的東西太多了,又不放心,那麼使用 Spring Batch 架構 是一個很好的選擇。

首先,在進入執行個體教程前,我們看看這次的執行個體裡,我們使用springboot 整合spring batch 架構,要編碼的東西有什麼?

通過一張簡單的圖來了解:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

可能大家看到這個圖,是不是多多少少想起來定時任務架構? 确實有那麼點像,但是我必須在這告訴大家,這是一個批處理架構,不是一個schuedling 架構。 但是前面提到它提供了可執行控制,也就是說,啥時候執行是可控的,那麼顯然就是自己可以進行擴充結合定時任務架構,實作你心中所想。

ok,回到主題,相信大家能從圖中簡單明了地看到我們這次執行個體,需要實作的東西有什麼了。是以我就不在對各個小元件進行大批量文字的描述了。

那麼我們事不宜遲,開始我們的執行個體教程。

首先準備一個資料庫,裡面建一張簡單的表,用于執行個體資料的寫入存儲或者說是讀取等等。

bloginfo表

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

相關建表sql語句:

CREATE TABLE `bloginfo`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `blogAuthor` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '部落格作者辨別',
  `blogUrl` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '部落格連結',
  `blogTitle` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '部落格标題',
  `blogItem` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '部落格欄目',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 89031 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;      

pom檔案裡的核心依賴:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--  spring batch -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>


        <!-- hibernate validator -->
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-validator</artifactId>
            <version>6.0.7.Final</version>
        </dependency>
        <!--  mybatis -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>
        <!--  mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>


        <!-- druid資料源驅動 1.1.10解決springboot從1.0——2.0版本問題-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.18</version>
        </dependency>      

yml檔案:

spring:
  batch:
    job:
#設定為 false -需要jobLaucher.run執行
      enabled: false
    initialize-schema: always
#    table-prefix: my-batch

  datasource:
    druid:
      username: root
      password: root
      url: jdbc:mysql://localhost:3306/hellodemo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
      driver-class-name: com.mysql.cj.jdbc.Driver
      initialSize: 5
      minIdle: 5
      maxActive: 20
      maxWait: 60000
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxPoolPreparedStatementPerConnectionSize: 20
      useGlobalDataSourceStat: true
      connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
server:
  port: 8665      
Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

ps:這裡我們用到了druid資料庫連接配接池,其實有個小坑,後面文章會講到。

因為我們這次的執行個體最終資料處理完之後,是寫入資料庫存儲(當然你也可以輸出到檔案等等)。

是以我們前面也建了一張表,pom檔案裡面我們也整合的mybatis,那麼我們在整合spring batch 主要編碼前,我們先把這些關于資料庫打通用到的簡單過一下。

pojo 層

BlogInfo.java :

/**
 * @Author : JCccc
 * @CreateTime : 2020/3/17
 * @Description :
 **/
public class BlogInfo {

    private Integer id;
    private String blogAuthor;
    private String blogUrl;
    private String blogTitle;
    private String blogItem;

    @Override
    public String toString() {
        return "BlogInfo{" +
                "id=" + id +
                ", blogAuthor='" + blogAuthor + '\'' +
                ", blogUrl='" + blogUrl + '\'' +
                ", blogTitle='" + blogTitle + '\'' +
                ", blogItem='" + blogItem + '\'' +
                '}';
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getBlogAuthor() {
        return blogAuthor;
    }

    public void setBlogAuthor(String blogAuthor) {
        this.blogAuthor = blogAuthor;
    }

    public String getBlogUrl() {
        return blogUrl;
    }

    public void setBlogUrl(String blogUrl) {
        this.blogUrl = blogUrl;
    }

    public String getBlogTitle() {
        return blogTitle;
    }

    public void setBlogTitle(String blogTitle) {
        this.blogTitle = blogTitle;
    }

    public String getBlogItem() {
        return blogItem;
    }

    public void setBlogItem(String blogItem) {
        this.blogItem = blogItem;
    }
}      

mapper層

BlogMapper.java :

ps: 可以看到這個執行個體我用的是注解的方式,哈哈為了省事,而且我還不寫servcie層和impl層,也是為了省事,因為該篇文章重點不在這些,是以這些不好的大家不要學。

import com.example.batchdemo.pojo.BlogInfo;
import org.apache.ibatis.annotations.*;
import java.util.List;
import java.util.Map;

/**
 * @Author : JCccc
 * @CreateTime : 2020/3/17
 * @Description :
 **/
@Mapper
public interface BlogMapper {
    @Insert("INSERT INTO bloginfo ( blogAuthor, blogUrl, blogTitle, blogItem )   VALUES ( #{blogAuthor}, #{blogUrl},#{blogTitle},#{blogItem}) ")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    int insert(BlogInfo bloginfo);


    @Select("select blogAuthor, blogUrl, blogTitle, blogItem from bloginfo where blogAuthor < #{authorId}")
     List<BlogInfo> queryInfoById(Map<String , Integer> map);

}      

接下來 ,重頭戲,我們開始對前邊那張圖裡涉及到的各個小元件進行編碼。

首先建立一個 配置類, MyBatchConfig.java:

從我起名來看,可以知道這基本就是咱們整合spring batch 涉及到的一些配置元件都會寫在這裡了。

首先我們按照咱們上面的圖來看,

裡面包含内容有:

1)JobRepository    job的注冊/存儲器

 2)JobLauncher      job的執行器 

3)Job                        job任務,包含一個或多個Step

 4)Step                    包含(ItemReader、ItemProcessor和ItemWriter) 

 5)ItemReader         資料讀取器 

 6)ItemProcessor     資料處理器

7)ItemWriter              資料輸出器

首先,在MyBatchConfig類前加入注解:

@Configuration  用于告訴spring,咱們這個類是一個自定義配置類,裡面很多bean都需要加載到spring容器裡面

@EnableBatchProcessing 開啟批處理支援

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

然後開始往MyBatchConfig類裡,編寫各個小元件。

JobRepository

寫在MyBatchConfig類裡

/**
     * JobRepository定義:Job的注冊容器以及和資料庫打交道(事務管理等)
     * @param dataSource
     * @param transactionManager
     * @return
     * @throws Exception
     */
    @Bean
    public JobRepository myJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDatabaseType("mysql");
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDataSource(dataSource);
        return jobRepositoryFactoryBean.getObject();
    }      

JobLauncher

寫在MyBatchConfig類裡 

/**
     * jobLauncher定義: job的啟動器,綁定相關的jobRepository
     * @param dataSource
     * @param transactionManager
     * @return
     * @throws Exception
     */
    @Bean
    public SimpleJobLauncher myJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        // 設定jobRepository
        jobLauncher.setJobRepository(myJobRepository(dataSource, transactionManager));
        return jobLauncher;
    }      

Job

寫在MyBatchConfig類裡 

/**
     * 定義job
     * @param jobs
     * @param myStep
     * @return
     */
    @Bean
    public Job myJob(JobBuilderFactory jobs, Step myStep){
        return jobs.get("myJob")
                .incrementer(new RunIdIncrementer())
                .flow(myStep)
                .end()
                .listener(myJobListener())
                .build();
    }      

對于Job的運作,是可以配置監聽器的

JobListener

寫在MyBatchConfig類裡 

/**
     * 注冊job監聽器
     * @return
     */
    @Bean
    public MyJobListener myJobListener(){
        return new MyJobListener();
    }      

這是一個我們自己自定義的監聽器,是以是單獨建立的,MyJobListener.java:

/**
 * @Author : JCccc
 * @CreateTime : 2020/3/17
 * @Description :監聽Job執行情況,實作JobExecutorListener,且在batch配置類裡,Job的Bean上綁定該監聽器
 **/

public class MyJobListener implements JobExecutionListener {

    private Logger logger = LoggerFactory.getLogger(MyJobListener.class);

    @Override
    public void beforeJob(JobExecution jobExecution) {
        logger.info("job 開始, id={}",jobExecution.getJobId());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        logger.info("job 結束, id={}",jobExecution.getJobId());
    }
}      

Step(ItemReader   ItemProcessor   ItemWriter)

step裡面包含資料讀取器,資料處理器,資料輸出器三個小元件的的實作。

我們也是一個個拆解來進行編寫。

文章前邊說到,該篇實作的場景包含兩種,一種是從csv檔案讀入大量資料進行處理,另一種是從資料庫表讀入大量資料進行處理。

從CSV檔案讀取資料

ItemReader   

寫在MyBatchConfig類裡

/**
     * ItemReader定義:讀取檔案資料+entirty實體類映射
     * @return
     */
    @Bean
    public ItemReader<BlogInfo> reader(){
        // 使用FlatFileItemReader去讀cvs檔案,一行即一條資料
        FlatFileItemReader<BlogInfo> reader = new FlatFileItemReader<>();
        // 設定檔案處在路徑
        reader.setResource(new ClassPathResource("static/bloginfo.csv"));
        // entity與csv資料做映射
        reader.setLineMapper(new DefaultLineMapper<BlogInfo>() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(new String[]{"blogAuthor","blogUrl","blogTitle","blogItem"});
                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<BlogInfo>() {
                    {
                        setTargetType(BlogInfo.class);
                    }
                });
            }
        });
        return reader;
    }      

簡單代碼解析:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

對于資料讀取器 ItemReader ,我們給它安排了一個讀取監聽器,建立 MyReadListener.java :

/**
 * @Author : JCccc
 * @CreateTime : 2020/3/17
 * @Description :
 **/

public class MyReadListener implements ItemReadListener<BlogInfo> {

    private Logger logger = LoggerFactory.getLogger(MyReadListener.class);



    @Override
    public void beforeRead() {
    }

    @Override
    public void afterRead(BlogInfo item) {
    }

    @Override
    public void onReadError(Exception ex) {
        try {
            logger.info(format("%s%n", ex.getMessage()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}      

ItemProcessor

寫在MyBatchConfig類裡

/**
     * 注冊ItemProcessor: 處理資料+校驗資料
     * @return
     */
    @Bean
    public ItemProcessor<BlogInfo, BlogInfo> processor(){
        MyItemProcessor myItemProcessor = new MyItemProcessor();
        // 設定校驗器
        myItemProcessor.setValidator(myBeanValidator());
        return myItemProcessor;
    }      

資料處理器,是我們自定義的,裡面主要是包含我們對資料處理的業務邏輯,并且我們設定了一些資料校驗器,我們這裡使用

JSR-303的Validator來作為校驗器。

校驗器

寫在MyBatchConfig類裡

/**
     * 注冊校驗器
     * @return
     */
    @Bean
    public MyBeanValidator myBeanValidator(){
        return new MyBeanValidator<BlogInfo>();
    }      

建立MyItemProcessor.java :

ps: 裡面我的資料處理邏輯是,擷取出讀取資料裡面的每條資料的blogItem字段,如果是springboot,那就對title字段值進行替換。

其實也就是模拟一個簡單地資料處理場景。

import com.example.batchdemo.pojo.BlogInfo;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;

/**
 * @Author : JCccc
 * @CreateTime : 2020/3/17
 * @Description :
 **/
public class MyItemProcessor extends ValidatingItemProcessor<BlogInfo> {
    @Override
    public BlogInfo process(BlogInfo item) throws ValidationException {
        /**
         * 需要執行super.process(item)才會調用自定義校驗器
         */
        super.process(item);
        /**
         * 對資料進行簡單的處理
         */
        if (item.getBlogItem().equals("springboot")) {
            item.setBlogTitle("springboot 系列還請看看我Jc");
        } else {
            item.setBlogTitle("未知系列");
        }
        return item;
    }
}      

建立MyBeanValidator.java:

import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import java.util.Set;

/**
 * @Author : JCccc
 * @CreateTime : 2020/3/2017
 * @Description :
 **/
public class MyBeanValidator<T> implements Validator<T>, InitializingBean {

    private javax.validation.Validator validator;

    @Override
    public void validate(T value) throws ValidationException {
        /**
         * 使用Validator的validate方法校驗資料
         */
        Set<ConstraintViolation<T>> constraintViolations =
                validator.validate(value);
        if (constraintViolations.size() > 0) {
            StringBuilder message = new StringBuilder();
            for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                message.append(constraintViolation.getMessage() + "\n");
            }
            throw new ValidationException(message.toString());
        }
    }

    /**
     * 使用JSR-303的Validator來校驗我們的資料,在此進行JSR-303的Validator的初始化
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ValidatorFactory validatorFactory =
                Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.usingContext().getValidator();
    }

}      

ps:其實該篇文章沒有使用這個資料校驗器,大家想使用的話,可以在實體類上添加一些校驗器的注解@NotNull @Max @Email等等。我偏向于直接在處理器裡面進行處理,想把關于資料處理的代碼都寫在一塊。

ItemWriter

寫在MyBatchConfig類裡

/**
     * ItemWriter定義:指定datasource,設定批量插入sql語句,寫入資料庫
     * @param dataSource
     * @return
     */
    @Bean
    public ItemWriter<BlogInfo> writer(DataSource dataSource){
        // 使用jdbcBcatchItemWrite寫資料到資料庫中
        JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
        // 設定有參數的sql語句
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
        String sql = "insert into bloginfo "+" (blogAuthor,blogUrl,blogTitle,blogItem) "
                +" values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
        writer.setSql(sql);
        writer.setDataSource(dataSource);
        return writer;
    }      

簡單代碼解析:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

同樣 對于資料讀取器 ItemWriter ,我們給它也安排了一個輸出監聽器,建立 MyWriteListener.java:

import com.example.batchdemo.pojo.BlogInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemWriteListener;
import java.util.List;
import static java.lang.String.format;

/**
 * @Author : JCccc
 * @CreateTime : 2020/3/17
 * @Description :
 **/
public class MyWriteListener implements ItemWriteListener<BlogInfo> {
    private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);

    @Override
    public void beforeWrite(List<? extends BlogInfo> items) {
    }

    @Override
    public void afterWrite(List<? extends BlogInfo> items) {
    }

    @Override
    public void onWriteError(Exception exception, List<? extends BlogInfo> items) {
        try {
            logger.info(format("%s%n", exception.getMessage()));
            for (BlogInfo message : items) {
                logger.info(format("Failed writing BlogInfo : %s", message.toString()));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}      

ItemReader   ItemProcessor   ItemWriter

這三個小元件到這裡,我們都實作了,那麼接下來就是把這三個小元件跟我們的step去綁定起來。

寫在MyBatchConfig類裡

/**
     * step定義:
     * 包括
     * ItemReader 讀取
     * ItemProcessor  處理
     * ItemWriter 輸出
     * @param stepBuilderFactory
     * @param reader
     * @param writer
     * @param processor
     * @return
     */

    @Bean
    public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<BlogInfo> reader,
                     ItemWriter<BlogInfo> writer, ItemProcessor<BlogInfo, BlogInfo> processor){
        return stepBuilderFactory
                .get("myStep")
                .<BlogInfo, BlogInfo>chunk(65000) // Chunk的機制(即每次讀取一條資料,再處理一條資料,累積到一定數量後再一次性交給writer進行寫入操作)
                .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)
                .listener(new MyReadListener())
                .processor(processor)
                .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)
                .listener(new MyWriteListener())
                .build();
    }      

這個Step,稍作講解。

前邊提到了,spring batch架構,提供了事務的控制,重新開機,檢測跳過等等機制。

那麼,這些東西的實作,很多都在于這個step環節的設定。

首先看到我們代碼出現的第一個設定,

chunk( 6500 ) 

Chunk的機制(即每次讀取一條資料,再處理一條資料,累積到一定數量後再一次性交給writer進行寫入操作。

沒錯,對于整個step環節,就是資料的讀取,處理最後到輸出。

這個chunk機制裡,我們傳入的 6500,也就是是告訴它,讀取處理資料,累計達到 6500條進行一次批次處理,去執行寫入操作。

這個傳值,是根據具體業務而定,可以是500條一次,1000條一次,也可以是20條一次,50條一次。

通過一張簡單的小圖來幫助了解:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

在我們大量資料處理,不管是讀取或者說是寫入,都肯定會涉及到一些未知或者已知因素導緻某條資料失敗了。

那麼如果說咱們啥也不設定,失敗一條資料,那麼我們就當作整個失敗了?。顯然這個太不人性,是以spring batch 提供了 retry 和 skip 兩個設定(其實還有restart) ,通過這兩個設定來人性化地解決一些資料操作失敗場景。

retryLimit(3).retry(Exception.class)  

沒錯,這個就是設定重試,當出現異常的時候,重試多少次。我們設定為3,也就是說當一條資料操作失敗,那我們會對這條資料進行重試3次,還是失敗就是 當做失敗了, 那麼我們如果有配置skip(推薦配置使用),那麼這個資料失敗記錄就會留到給 skip 來處理。

skip(Exception.class).skipLimit(2)  

skip,跳過,也就是說我們如果設定3, 那麼就是可以容忍 3條資料的失敗。隻有達到失敗資料達到3次,我們才中斷這個step。

對于失敗的資料,我們做了相關的監聽器以及異常資訊記錄,供與後續手動補救。

那麼記下來我們開始去調用這個批處理job,我們通過接口去觸發這個批處理事件,建立一個Controller,TestController.java:

/**
 * @Author : JCccc
 * @CreateTime : 2020/3/17
 * @Description :
 **/
@RestController
public class TestController {
    @Autowired
    SimpleJobLauncher jobLauncher;

    @Autowired
    Job myJob;

    @GetMapping("testJob")
    public  void testJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
     //    後置參數:使用JobParameters中綁定參數 addLong  addString 等方法
        JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
        jobLauncher.run(myJob, jobParameters);

    }



}      

對了,我準備了一個csv檔案 bloginfo.csv,裡面大概8萬多條資料,用來進行批處理測試: 

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

這個檔案的路徑跟我們的資料讀取器裡面讀取的路徑要一直,

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解
Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

目前我們資料庫是這個樣子,

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

接下來我們把我們的項目啟動起來,

再看一眼資料庫,生成了一些batch用來跟蹤記錄job的一些資料表:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

我們來調用一下testJob接口,

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

然後看下資料庫,可以看的資料全部都進行了相關的邏輯處理并插入到了資料庫:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

到這裡,我們對Springboot 整合 spring batch 其實已經操作完畢了,也實作了從csv檔案讀取資料處理存儲的業務場景。

從資料庫讀取資料

ps:前排提示使用druid有坑。後面會講到。

那麼接下來實作場景,從資料庫表内讀取資料進行處理輸出到新的表裡面。

那麼基于我們上邊的整合,我們已經實作了

1)JobRepository    job的注冊/存儲器

 2)JobLauncher      job的執行器 

3)Job                        job任務,包含一個或多個Step

 4)Step                    包含(ItemReader、ItemProcessor和ItemWriter) 

 5)ItemReader         資料讀取器 

 6)ItemProcessor     資料處理器

7)ItemWriter              資料輸出器

外加

8) job監聽器

9)reader監聽器

10)writer監聽器

11)process資料校驗器

那麼對于我們新寫一個job完成 一個新的場景,我們需要全部重寫麼?

顯然沒必要,當然完全新寫一套也是可以的。

那麼該篇,對于一個新的也出場景,從csv檔案讀取資料轉換到資料庫表讀取資料,我們重新建立的有:

1. 資料讀取器  原先使用的是 FlatFileItemReader ,我們現在改為使用 MyBatisCursorItemReader 。

2.資料處理器  新的場景,業務為了好擴充,是以我們處理器最好也建立一個

3.資料輸出器    新的場景,業務為了好擴充,是以我們資料輸出器最好也建立一個

4.step的綁定設定, 新的場景,業務為了好擴充,是以我們step最好也建立一個

5.Job  當然是要重新寫一個了

其他我們照用原先的就行, JobRepository    ,JobLauncher    , 以及各種監聽器啥的,暫且不重建立了。

建立MyItemProcessorNew.java:

import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;

/**
 * @Author : JCccc
 * @CreateTime : 2020/3/17
 * @Description :
 **/
public class MyItemProcessorNew extends ValidatingItemProcessor<BlogInfo> {
    @Override
    public BlogInfo process(BlogInfo item) throws ValidationException {
        /**
         * 需要執行super.process(item)才會調用自定義校驗器
         */
        super.process(item);
        /**
         * 對資料進行簡單的處理
         */
        Integer authorId= Integer.valueOf(item.getBlogAuthor());
        if (authorId<20000) {
            item.setBlogTitle("這是都是小于20000的資料");
        } else if (authorId>20000 && authorId<30000){
            item.setBlogTitle("這是都是小于30000但是大于20000的資料");
        }else {
            item.setBlogTitle("舊書不厭百回讀");
        }
        return item;
    }
}      

 然後其他重新定義的小元件,寫在MyBatchConfig類裡:

/**
     * 定義job
     * @param jobs
     * @param stepNew
     * @return
     */
    @Bean
    public Job myJobNew(JobBuilderFactory jobs, Step stepNew){
        return jobs.get("myJobNew")
                .incrementer(new RunIdIncrementer())
                .flow(stepNew)
                .end()
                .listener(myJobListener())
                .build();

    }


    @Bean
    public Step stepNew(StepBuilderFactory stepBuilderFactory, MyBatisCursorItemReader<BlogInfo> itemReaderNew,
                        ItemWriter<BlogInfo> writerNew, ItemProcessor<BlogInfo, BlogInfo> processorNew){
        return stepBuilderFactory
                .get("stepNew")
                .<BlogInfo, BlogInfo>chunk(65000) // Chunk的機制(即每次讀取一條資料,再處理一條資料,累積到一定數量後再一次性交給writer進行寫入操作)
                .reader(itemReaderNew).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(10)
                .listener(new MyReadListener())
                .processor(processorNew)
                .writer(writerNew).faultTolerant().skip(Exception.class).skipLimit(2)
                .listener(new MyWriteListener())
                .build();

    }
    
    @Bean
    public ItemProcessor<BlogInfo, BlogInfo> processorNew(){
        MyItemProcessorNew csvItemProcessor = new MyItemProcessorNew();
        // 設定校驗器
        csvItemProcessor.setValidator(myBeanValidator());
        return csvItemProcessor;
    }


    
    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    @Bean
    @StepScope
    //Spring Batch提供了一個特殊的bean scope類(StepScope:作為一個自定義的Spring bean scope)。這個step scope的作用是連接配接batches的各個steps。這個機制允許配置在Spring的beans當steps開始時才執行個體化并且允許你為這個step指定配置和參數。
    public MyBatisCursorItemReader<BlogInfo> itemReaderNew(@Value("#{jobParameters[authorId]}") String authorId) {

            System.out.println("開始查詢資料庫");

            MyBatisCursorItemReader<BlogInfo> reader = new MyBatisCursorItemReader<>();

            reader.setQueryId("com.example.batchdemo.mapper.BlogMapper.queryInfoById");

            reader.setSqlSessionFactory(sqlSessionFactory);
             Map<String , Object> map = new HashMap<>();

              map.put("authorId" , Integer.valueOf(authorId));
             reader.setParameterValues(map);
            return reader;
    }

    /**
     * ItemWriter定義:指定datasource,設定批量插入sql語句,寫入資料庫
     * @param dataSource
     * @return
     */
    @Bean
    public ItemWriter<BlogInfo> writerNew(DataSource dataSource){
        // 使用jdbcBcatchItemWrite寫資料到資料庫中
        JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
        // 設定有參數的sql語句
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
        String sql = "insert into bloginfonew "+" (blogAuthor,blogUrl,blogTitle,blogItem) "
                +" values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
        writer.setSql(sql);
        writer.setDataSource(dataSource);
        return writer;
    }      

代碼需要注意的點

資料讀取器 MyBatisCursorItemReader 

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

對應的mapper方法:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

資料處理器 MyItemProcessorNew:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

資料輸出器,新插入到别的資料庫表去,特意這樣為了測試:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

當然我們的資料庫為了測試這個場景,也是建立了一張表,bloginfonew 表。

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

接下來,我們新寫一個接口來執行新的這個job:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解
@Autowired
    SimpleJobLauncher jobLauncher;

    @Autowired
    Job myJobNew;

    @GetMapping("testJobNew")
    public  void testJobNew(@RequestParam("authorId") String authorId) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {

        JobParameters jobParametersNew = new JobParametersBuilder().addLong("timeNew", System.currentTimeMillis())
                .addString("authorId",authorId)
                .toJobParameters();
        jobLauncher.run(myJobNew,jobParametersNew);

    }      

ok,我們來調用一些這個接口:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

看下控制台:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

沒錯,這就是失敗的,原因是因為跟druid有關,報了一個資料庫功能不支援。 這是在資料讀取的時候報的錯。

我初步測試認為是MyBatisCursorItemReader ,druid 資料庫連接配接池不支援。

那麼,我們隻需要:

1.注釋掉druid連接配接池 jar依賴

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

2.yml裡替換連接配接池配置

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

其實我們不配置其他連接配接池,springboot 2.X 版本已經為我們整合了預設的連接配接池 ​​HikariCP​​ 。

在Springboot2.X版本,資料庫的連接配接池官方推薦使用​​HikariCP​​​

如果不是為了druid的那些背景監控資料,sql分析等等,完全是優先使用​​HikariCP​​的。

  官方的原話:

We preferHikariCPfor its performance and concurrency. If HikariCP is available, we always choose it.      

翻譯:

我們更喜歡hikaricpf的性能和并發性。如果有HikariCP,我們總是選擇它。

是以我們就啥連接配接池也不配了,使用預設的​​HikariCP​​ 連接配接池。

當然你想配,也是可以的:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

是以我們剔除掉druid連結池後,我們再來調用一下新接口:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

可以看到,從資料庫擷取資料并進行批次處理寫入job是成功的:

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

新的表裡面插入的資料都進行了自己寫的邏輯處理: 

Springboot 整合 spring batch 實作批處理 ,小白文執行個體講解

好了,springboot 整合 spring batch 批處理架構, 就到此吧。