天天看點

spring-batch 批處理框架介紹及 demo

最近在專案中有批處理的相關需求,最終選定 spring-batch 作為批處理框架。在此專欄中,為大家分享 spring-batch 的基礎知識,以及專案中遇到的一些實際問題的解決方案。

1. 框架介紹

spring-batch 主要分為 job 和 step 兩個層級。step 中包含 reader、processor、writer 等主要元件,並可設定為串行、並行或多線程執行。整個框架提供了大量監聽器,可監聽每一個細分步驟的執行,並內建批處理過程中常用的重試機制。

spring-batch 擁有很強大的讀取器,平面檔案、資料庫、XML 等資料來源都能讀取,並且支援自定義讀取器。寫入器同樣支援將資料寫入平面檔案、資料庫等各種目標。

2.适用場景

spring-batch适用于資料同步、資料整合統計、資料遷移等場景

主要開發步驟為:資料擷取(讀)->資料處理->資料寫入(寫)

3. 一個簡單的 spring-batch demo,採用 spring-boot + spring-batch + mysql。

依賴引入可以參考官網教程

下面主要展示yml配置和基本代碼

yml配置spring-batch及mysql資料庫

spring:
  batch:
    job:
      # Do not launch every Job automatically at startup (Spring Boot's default is true);
      # with false, jobs are started explicitly via jobLauncher.run(...).
      # NOTE: this key must be nested under `job:` -- at the `batch:` level it is ignored.
      enabled: false
    # Run Spring Batch's metadata schema script at startup; without "always" the job
    # repository reports that its tables do not exist on a fresh database.
    # NOTE(review): on Spring Boot 2.5+ the key is spring.batch.jdbc.initialize-schema.
    initialize-schema: always
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/job_batch?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=UTC
    # Demo-only defaults; override with the DB_USERNAME / DB_PASSWORD environment variables.
    username: ${DB_USERNAME:root}
    password: ${DB_PASSWORD:123456}
    hikari:
      connection-test-query: select 1
           

準備需要讀取的平面檔案(放在 classpath 下的 text/user.txt,第一行為表頭):

userId,userName
201907190921,周一
201907190922,趙二
201907190923,張三
201907190924,李四
201907190925,王五
201907190926,闫六
201907190927,劉七
201907190928,胡八
           

編寫reader

package com.flight.neon.batch.demo.job.reder;

import com.flight.neon.batch.demo.job.entity.User;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

/**
 * Reader beans for the demo job.
 *
 * @author 這個碼農不太萌
 */
@Component
public class DemoReader {

    /**
     * Reads {@code text/user.txt} from the classpath as comma-delimited
     * {@code userId,userName} records, skipping the header line.
     *
     * @return a reader that maps each data line to a {@link User}
     */
    @Bean("userReader")
    public FlatFileItemReader<User> userReader(){
        ClassPathResource classPathResource = new ClassPathResource("text/user.txt");
        FlatFileItemReader<User> flatFileItemReader = new FlatFileItemReader<>();
        // Explicit, unique name: it prefixes this reader's state keys in the step's
        // ExecutionContext instead of the shared class-name default.
        flatFileItemReader.setName("userReader");
        flatFileItemReader.setResource(classPathResource);
        // The file contains non-ASCII (Chinese) names. Without an explicit encoding the reader
        // uses the platform default charset, which garbles them on e.g. GBK Windows hosts.
        // Assumes the file is saved as UTF-8 -- confirm against the resource.
        flatFileItemReader.setEncoding("UTF-8");
        // Skip the "userId,userName" header line.
        flatFileItemReader.setLinesToSkip(1);
        // Split each line on the default comma delimiter into the named fields.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"userId","userName"});
        DefaultLineMapper<User> defaultLineMapper = new DefaultLineMapper<>();
        defaultLineMapper.setLineTokenizer(tokenizer);
        // Map the tokenized fields onto a User.
        defaultLineMapper.setFieldSetMapper(fieldSet -> {
            User user = new User();
            user.setUserId(fieldSet.readString("userId"));
            user.setUserName(fieldSet.readString("userName"));
            return user;
        });
        defaultLineMapper.afterPropertiesSet();
        flatFileItemReader.setLineMapper(defaultLineMapper);
        return flatFileItemReader;
    }

}
           

編寫processor

package com.flight.neon.batch.demo.job.processor;

import com.flight.neon.batch.demo.job.entity.User;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Processor beans for the demo job.
 *
 * @author 這個碼農不太萌
 */
@Component
public class DemoProcess {

    /**
     * Pass-through processor registered under the bean name {@code orgInfoProcess}:
     * every {@link User} is forwarded unchanged. Real transformation logic goes here.
     *
     * @return an identity {@link ItemProcessor} for users
     */
    @Bean("orgInfoProcess")
    public ItemProcessor<User, User> userProcess() {
        // Data processing placeholder: hand the item to the writer as-is.
        return user -> user;
    }

}
           

編寫writer

package com.flight.neon.batch.demo.job.writer;

import com.flight.neon.batch.demo.job.entity.User;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

/**
 * Writer beans for the demo job.
 *
 * @author 這個碼農不太萌
 */
@Configuration
public class DemoWriter {

    /** Named-parameter insert; :userId and :userName are resolved from User's bean properties. */
    private static final String INSERT_USER_SQL =
            "insert into user (user_id,user_name) values (:userId,:userName)";

    @Autowired
    DataSource dataSource;

    /**
     * JDBC batch writer that inserts the {@link User} items it receives into the {@code user} table.
     *
     * @return the configured writer
     */
    @Bean("userWriter")
    public JdbcBatchItemWriter<User> userWriter() {
        JdbcBatchItemWriter<User> jdbcWriter = new JdbcBatchItemWriter<>();
        // Bind SQL parameters by matching their names to User getters.
        jdbcWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        jdbcWriter.setSql(INSERT_USER_SQL);
        jdbcWriter.setDataSource(dataSource);
        return jdbcWriter;
    }

}
           

編寫step及job

package com.flight.neon.batch.demo.job;

import com.flight.neon.batch.demo.job.entity.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Defines the demo {@code userJob}: a single chunk-oriented step that reads users,
 * runs them through the processor and writes them out.
 *
 * @author 這個碼農不太萌
 */
@Component
public class DemoJob {

    /** Commit interval: number of items read and processed before each write. */
    private static final int CHUNK_SIZE = 100;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("userReader")
    FlatFileItemReader<User> userReader;

    @Autowired
    @Qualifier("orgInfoProcess")
    ItemProcessor<User, User> orgInfoProcess;

    @Autowired
    @Qualifier("userWriter")
    JdbcBatchItemWriter<User> userWriter;

    /**
     * The job bean: consists solely of {@link #userStep()}.
     *
     * @return the {@code userJob} job
     */
    @Bean
    public Job userJob() {
        Step onlyStep = userStep();
        return jobBuilderFactory.get("userJob").start(onlyStep).build();
    }

    /**
     * Builds the chunk step wiring the injected reader, processor and writer.
     *
     * @return the {@code userStep} step
     */
    public Step userStep() {
        return stepBuilderFactory.get("userStep")
                .<User, User>chunk(CHUNK_SIZE)
                .reader(userReader)
                .processor(orgInfoProcess)
                .writer(userWriter)
                .build();
    }

}
           

編寫單元測試

package com.flight.neon.batch.demo.test;

import static org.junit.Assert.assertEquals;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Runs {@code userJob} end to end against the configured database and checks that it completes.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoJobTest {

    // Inject by interface. With @EnableBatchProcessing's default configuration the launcher bean
    // may be exposed as a proxy implementing only JobLauncher, so requesting the concrete
    // SimpleJobLauncher type can fail to resolve.
    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job userJob;

    @Test
    public void demoTest() throws Exception {
        // A fresh "time" value yields a new JobInstance on every run, so an earlier
        // completed instance does not block re-execution.
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        JobExecution execution = jobLauncher.run(userJob, jobParameters);
        // Failures inside the job are recorded on the execution rather than thrown,
        // so the outcome must be asserted explicitly.
        assertEquals(BatchStatus.COMPLETED, execution.getStatus());
    }

}
           

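執行單元測試後,可在資料庫的 user 表中看到寫入的資料。注意:initialize-schema 只會建立 spring batch 自身的中繼資料表,user 表(含 user_id、user_name 欄位)需要事先自行建立。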