最近在項目中遇到了批處理的相關需求,最終確定的技術架構為 spring-batch 批處理架構。在此專欄中,將為大家分享 spring-batch 的基礎知識,以及在項目中遇到的一些實際問題的解決方案。
1.架構介紹
spring-batch 主要分為 job 和 step 兩個層次。每個 step 包含 reader、processor、writer 等主要組件,並且可以設定為並行、串行或多線程執行。整個架構提供了大量常用的監聽器,用於監聽每一個細分步驟的執行;此外還提供了批處理過程中常用的重試機制。
spring-batch 擁有很強大的讀取器,平面檔案、資料庫、XML 等資料來源都能進行讀取,並且支援自定義讀取器。同時,寫入器也支援平面檔案及資料庫等各種類型的資料寫入。
2.適用場景
spring-batch 適用於資料同步、資料整合統計、資料遷移等場景。
主要開發步驟為:資料擷取(讀)->資料處理->資料寫入(寫)
3.一個簡單的 spring-batch demo,採用 spring-boot + spring-batch + mysql。
依賴引入可以參考官網教程
下面主要展示yml配置和基本代碼
yml配置spring-batch及mysql資料庫
spring:
  batch:
    job:
      # When true (the default), Spring Boot runs every defined Job at startup.
      # Set to false so jobs only run when started explicitly via JobLauncher.run.
      enabled: false
    # Let Spring Batch create its own metadata tables in this database. The default value
    # only initializes embedded databases, so without "always" MySQL reports the tables as missing.
    # NOTE: Spring Boot 2.5+ renamed this property to spring.batch.jdbc.initialize-schema.
    initialize-schema: always
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/job_batch?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=UTC
    username: root
    # Demo credentials only; externalize them (e.g. environment variables) outside local development.
    password: "123456"
    hikari:
      connection-test-query: select 1
準備需要讀取的平面檔案(放在 classpath 下的 text/user.txt,例如 src/main/resources/text/user.txt)。第一行為表頭,讀取時會被跳過:
userId,userName
201907190921,周一
201907190922,趙二
201907190923,張三
201907190924,李四
201907190925,王五
201907190926,闫六
201907190927,劉七
201907190928,胡八
編寫reader
package com.flight.neon.batch.demo.job.reder;
import com.flight.neon.batch.demo.job.entity.User;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
/**
 * Declares the {@link FlatFileItemReader} that reads {@link User} records from the
 * comma-delimited classpath file {@code text/user.txt}.
 *
 * @author 這個碼農不太萌
 */
@Component
public class DemoReader {

    /**
     * Builds the reader registered as bean {@code userReader}.
     *
     * <p>The first line of the file is a header and is skipped; every remaining line is split
     * on commas into the fields {@code userId} and {@code userName} and mapped to a {@link User}.
     *
     * @return a reader over {@code text/user.txt}
     */
    @Bean("userReader")
    public FlatFileItemReader<User> userReader() {
        ClassPathResource classPathResource = new ClassPathResource("text/user.txt");
        FlatFileItemReader<User> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setResource(classPathResource);
        // The data contains non-ASCII (Chinese) names. Without an explicit encoding, Spring Batch 4.x
        // falls back to the JVM default charset, which garbles them on e.g. GBK/Big5 platforms.
        // NOTE(review): assumes text/user.txt is saved as UTF-8 — confirm.
        flatFileItemReader.setEncoding("UTF-8");
        // Skip the header line ("userId,userName").
        flatFileItemReader.setLinesToSkip(1);

        // Split each line on the default delimiter (comma) and name the resulting fields.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"userId", "userName"});

        // Map the named fields onto a new User instance.
        DefaultLineMapper<User> defaultLineMapper = new DefaultLineMapper<>();
        defaultLineMapper.setLineTokenizer(tokenizer);
        defaultLineMapper.setFieldSetMapper(fieldSet -> {
            User user = new User();
            user.setUserId(fieldSet.readString("userId"));
            user.setUserName(fieldSet.readString("userName"));
            return user;
        });
        // The mapper is not a Spring bean, so validate its configuration manually.
        defaultLineMapper.afterPropertiesSet();
        flatFileItemReader.setLineMapper(defaultLineMapper);
        return flatFileItemReader;
    }
}
編寫processor
package com.flight.neon.batch.demo.job.processor;
import com.flight.neon.batch.demo.job.entity.User;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Declares the item processor used between reading and writing users.
 *
 * @author 這個碼農不太萌
 */
@Component
public class DemoProcess {

    /**
     * Builds the processor registered as bean {@code orgInfoProcess}.
     *
     * <p>Currently a pass-through: each {@link User} is returned unchanged, so every item read is
     * handed on to the writer. Per-item transformation logic belongs here.
     *
     * @return an identity processor for {@link User}
     */
    @Bean("orgInfoProcess")
    public ItemProcessor<User, User> userProcess() {
        // Placeholder for data processing: no transformation is applied yet.
        return user -> user;
    }
}
編寫writer
package com.flight.neon.batch.demo.job.writer;
import com.flight.neon.batch.demo.job.entity.User;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
/**
 * Declares the JDBC writer that inserts {@link User} items into the {@code user} table.
 *
 * @author 這個碼農不太萌
 */
@Configuration
public class DemoWriter {

    /**
     * Named-parameter insert; {@code :userId} and {@code :userName} are resolved from the
     * {@link User} bean properties of the same name.
     */
    private static final String INSERT_USER_SQL =
            "insert into user (user_id,user_name) values (:userId,:userName)";

    @Autowired
    DataSource dataSource;

    /**
     * Builds the writer registered as bean {@code userWriter}.
     *
     * @return a batch writer executing {@link #INSERT_USER_SQL} against {@link #dataSource}
     */
    @Bean("userWriter")
    public JdbcBatchItemWriter<User> userWriter() {
        JdbcBatchItemWriter<User> userWriter = new JdbcBatchItemWriter<>();
        // Bind SQL named parameters to User bean properties.
        userWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        userWriter.setSql(INSERT_USER_SQL);
        userWriter.setDataSource(dataSource);
        return userWriter;
    }
}
編寫step及job
package com.flight.neon.batch.demo.job;
import com.flight.neon.batch.demo.job.entity.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Wires the user import job: a single chunk-oriented step that reads users from the flat file,
 * passes them through the processor and writes them with the JDBC writer.
 *
 * @author 這個碼農不太萌
 */
@Component
public class DemoJob {

    /** Commit interval: number of items read and processed before each write. */
    private static final int CHUNK_SIZE = 100;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("userReader")
    FlatFileItemReader<User> userReader;

    @Autowired
    @Qualifier("orgInfoProcess")
    ItemProcessor<User, User> orgInfoProcess;

    @Autowired
    @Qualifier("userWriter")
    JdbcBatchItemWriter<User> userWriter;

    /**
     * Defines {@code userJob}, which consists of exactly one step.
     *
     * @return the job bean
     */
    @Bean
    public Job userJob() {
        Step onlyStep = userStep();
        return jobBuilderFactory.get("userJob").start(onlyStep).build();
    }

    /**
     * Builds {@code userStep}: reader -> processor -> writer, committed in chunks of
     * {@link #CHUNK_SIZE}. Not registered as a bean; it is called directly from {@link #userJob()}.
     *
     * @return the step
     */
    public Step userStep() {
        return stepBuilderFactory
                .get("userStep")
                .<User, User>chunk(CHUNK_SIZE)
                .reader(userReader)
                .processor(orgInfoProcess)
                .writer(userWriter)
                .build();
    }
}
編寫單元測試
package com.flight.neon.batch.demo.test;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Launches {@code userJob} end to end against the configured datasource and verifies it completes.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoJobTest {

    /**
     * Injected by interface: Spring Batch's default configuration may expose the launcher as a
     * {@link JobLauncher} proxy, so requiring the concrete SimpleJobLauncher type can fail injection.
     */
    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job userJob;

    @Test
    public void demoTest() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        // A unique timestamp parameter makes every run a new JobInstance; re-running an already
        // completed instance would otherwise be rejected.
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        JobExecution jobExecution = jobLauncher.run(userJob, jobParameters);
        // Job/step failures are recorded on the execution rather than thrown, so check the outcome.
        Assert.assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
    }
}
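運行單元測試後,即可在資料庫的 user 表中看到寫入的資料。注意:initialize-schema 只會建立 spring-batch 自身的中繼資料表,user 表(包含 user_id、user_name 欄位)需要事先自行建立。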