Introduction to Spring Batch:
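Spring Batch is a framework for batch processing of large data volumes. It is typically used for offline data migration and data processing. It supports transactions, concurrency, flow control, monitoring, and both vertical (scale-up) and horizontal (scale-out) scaling, and it provides unified interfaces and job management. Spring Batch was developed by SpringSource and Accenture to establish a common standard for batch processing across the industry and to make batch development easier for developers.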
Official repository: github.com/spring-projects/spring-batch
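- Spring Batch provides retry, exception handling, skipping, restart, job-processing statistics, and resource management out of the box; these features are the main reason developers choose it.
- Spring Batch is a lightweight batch-processing framework.
- Spring Batch is organized in layers that separate business logic from processing strategy and infrastructure.
- The state of each job instance, its execution data, and its parameters are persisted to a database.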
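The retry and skip features listed above can be illustrated with a small plain-Java model. This is not the Spring Batch API: in Spring Batch they are configured on a step through the fault-tolerant step builder (`faultTolerant()`, `retry(...)`, `retryLimit(...)`, `skip(...)`, `skipLimit(...)`), and the real behavior also involves rolling back and re-processing chunks. The class, method, and exception names below are hypothetical and only show the policy: a failing item is retried up to a limit, an item that still fails is skipped, and the run aborts once too many items have been skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of a retry + skip policy (not the Spring Batch API).
class RetrySkipModel {

    // Thrown when more items are skipped than skipLimit allows.
    static class TooManySkipsException extends RuntimeException {
        TooManySkipsException(String message) { super(message); }
    }

    // Each item is attempted up to retryLimit times; an item that still fails is
    // recorded in `skipped`; the run aborts once skipped.size() exceeds skipLimit.
    static <I, O> List<O> process(List<I> items, Function<I, O> processor,
                                  int retryLimit, int skipLimit, List<I> skipped) {
        if (retryLimit < 1) {
            throw new IllegalArgumentException("retryLimit must be at least 1");
        }
        List<O> output = new ArrayList<>();
        for (I item : items) {
            RuntimeException lastError = null;
            O result = null;
            boolean succeeded = false;
            for (int attempt = 1; attempt <= retryLimit && !succeeded; attempt++) {
                try {
                    result = processor.apply(item);
                    succeeded = true;
                } catch (RuntimeException e) {
                    lastError = e; // remember the failure and retry
                }
            }
            if (succeeded) {
                output.add(result);
            } else {
                skipped.add(item); // retries exhausted: skip this item
                if (skipped.size() > skipLimit) {
                    throw new TooManySkipsException(
                            "skip limit " + skipLimit + " exceeded: " + lastError.getMessage());
                }
            }
        }
        return output;
    }
}
```

With `retryLimit = 3` and `skipLimit = 1`, an item that fails only on its first attempt is retried and succeeds, while an item that always fails is attempted three times and then skipped.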
Quick start (reference: https://blog.didispace.com/spring-batch-1/#%E5%BF%AB%E9%80%9F%E5%85%A5%E9%97%A8)
- Add the Spring Boot batch starter to pom.xml:
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
```
- Create a BatchConfig class (any class name will do):
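```java
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

// Spring Batch 4.x style: JobBuilderFactory/StepBuilderFactory are deprecated in 5.x.
// Person, PersonItemProcessor (an ItemProcessor<Person, Person> registered as a bean)
// and JobCompletionNotificationListener are application classes defined elsewhere.
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    // Reads "firstName,lastName" lines from sample-data.csv on the classpath.
    @Bean
    public FlatFileItemReader<Person> flatFileItemReader() {
        // Split each line on commas (the default delimiter) and name the two columns.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"firstName", "lastName"});

        // Map the named columns onto Person's setters.
        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);

        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);

        FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("sample-data.csv"));
        reader.setLineMapper(lineMapper);
        return reader;
    }

    // Reads people whose last_name is 'DOE' from the database, one page at a time.
    @Bean
    public JdbcPagingItemReader<Person> jdbcPagingItemReader(DataSource dataSource) {
        // A unique sort key lets each page query continue where the previous page ended.
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("person_id", Order.ASCENDING);

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("SELECT person_id, first_name, last_name");
        queryProvider.setFromClause("FROM people");
        queryProvider.setWhereClause("last_name = :lastName");
        queryProvider.setSortKeys(sortKeys);

        Map<String, Object> parameters = new HashMap<>();
        parameters.put("lastName", "DOE");

        JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        reader.setPageSize(100);  // rows per page query (the default is 10)
        reader.setFetchSize(100); // JDBC fetch-size hint for the driver
        reader.setQueryProvider(queryProvider);
        reader.setParameterValues(parameters);
        reader.setRowMapper(new BeanPropertyRowMapper<>(Person.class)); // first_name -> firstName
        return reader;
    }

    // Inserts each Person; :firstName and :lastName are resolved from the bean's getters.
    @Bean
    public JdbcBatchItemWriter<Person> jdbcBatchItemWriter(DataSource dataSource) {
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
        writer.setDataSource(dataSource);
        return writer;
    }

    // Alternative file writer (disabled). A FlatFileItemWriter needs an output resource and a
    // line aggregator rather than a DataSource; the output path is only an example, and the
    // extra classes need their own imports.
    /*@Bean
    public FlatFileItemWriter<Person> flatFileItemWriter() {
        BeanWrapperFieldExtractor<Person> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[]{"firstName", "lastName"});
        DelimitedLineAggregator<Person> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setFieldExtractor(fieldExtractor);
        FlatFileItemWriter<Person> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource("output/people.csv"));
        writer.setLineAggregator(lineAggregator);
        writer.setAppendAllowed(true);
        writer.setEncoding("UTF-8");
        return writer;
    }*/

    @Bean
    public Job importUserJob(JobBuilderFactory jobBuilderFactory,
                             JobCompletionNotificationListener listener, Step step) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer()) // new run.id per launch -> new JobInstance
                .listener(listener)
                .start(step)
                .build();
    }

    // Typed parameters avoid ambiguity: there are two ItemReader beans in this class.
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory,
                      PersonItemProcessor processor,
                      FlatFileItemReader<Person> flatFileItemReader,
                      JdbcBatchItemWriter<Person> jdbcBatchItemWriter) {
        // To chain processors, wrap them in a CompositeItemProcessor
        // (org.springframework.batch.item.support), e.g.:
        // CompositeItemProcessor<Person, Person> composite = new CompositeItemProcessor<>();
        // composite.setDelegates(Arrays.asList(processor, processor));
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10) // commit every 10 items
                .reader(flatFileItemReader)
                .processor(processor)
                .writer(jdbcBatchItemWriter)
                .build();
    }
}
```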
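In the step above, `.<Person, Person>chunk(10)` makes the step chunk-oriented: items are read and processed one at a time, collected into a list, and handed to the writer 10 at a time, with each chunk committed in its own transaction. The plain-Java model below (hypothetical names, no transactions, not the Spring Batch API) only illustrates that read/process/write loop. As with a Spring Batch `ItemProcessor`, a processor that returns `null` filters the item out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical plain-Java model of a chunk-oriented step (no transactions, no Spring).
class ChunkStepModel {

    // Returns the size of each chunk that was passed to the writer.
    static <I, O> List<Integer> run(Iterator<I> reader, Function<I, O> processor,
                                    Consumer<List<O>> writer, int chunkSize) {
        List<Integer> writtenChunkSizes = new ArrayList<>();
        while (reader.hasNext()) {
            List<O> chunk = new ArrayList<>(chunkSize);
            // Read and process up to chunkSize items for this chunk.
            for (int read = 0; read < chunkSize && reader.hasNext(); read++) {
                O out = processor.apply(reader.next());
                if (out != null) { // null means "filter this item out"
                    chunk.add(out);
                }
            }
            if (!chunk.isEmpty()) {
                writer.accept(chunk); // one write (one commit in Spring Batch) per chunk
                writtenChunkSizes.add(chunk.size());
            }
        }
        return writtenChunkSizes;
    }
}
```

With 25 input items and a chunk size of 10, the writer is called three times with 10, 10, and 5 items. A larger chunk size means fewer commits, but more work to redo if a chunk fails.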
The layered architecture of Spring Batch:
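- Infrastructure (strategy management): retrying failed items, exception handling, transactions, skipping, and data input/output (text files, databases, messaging).
- Core: the heart of Spring Batch, including JobLauncher, Job, Step, and related classes.
- Application: the business logic; it defines the jobs and decides how they are launched (on a schedule, triggered manually, and so on).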
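The three layers can be sketched in plain Java to show which way the dependencies point: the core orchestrates through generic reader/writer interfaces supplied by the infrastructure, and only the application layer knows the business rule and decides to launch the job. All names here are hypothetical (they loosely mirror `ItemReader`, `ItemWriter`, `Step`, and `Job`), and the infrastructure part covers only the I/O strategies, not retry or transactions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Infrastructure layer: reusable input/output strategies.
interface SimpleReader<T> { T read(); } // null signals end of input
interface SimpleWriter<T> { void write(List<T> items); }

class ListReader<T> implements SimpleReader<T> {
    private final Iterator<T> it;
    ListReader(List<T> items) { this.it = items.iterator(); }
    public T read() { return it.hasNext() ? it.next() : null; }
}

class ListWriter<T> implements SimpleWriter<T> {
    final List<T> written = new ArrayList<>();
    int writes = 0; // number of write calls (one per chunk)
    public void write(List<T> items) { written.addAll(items); writes++; }
}

// Core layer: generic orchestration that knows nothing about the business data.
interface Step { void execute(); }

class ChunkStep<I, O> implements Step {
    private final SimpleReader<I> reader;
    private final Function<I, O> processor;
    private final SimpleWriter<O> writer;
    private final int chunkSize;

    ChunkStep(SimpleReader<I> reader, Function<I, O> processor, SimpleWriter<O> writer, int chunkSize) {
        this.reader = reader;
        this.processor = processor;
        this.writer = writer;
        this.chunkSize = chunkSize;
    }

    public void execute() {
        List<O> chunk = new ArrayList<>();
        for (I item = reader.read(); item != null; item = reader.read()) {
            chunk.add(processor.apply(item));
            if (chunk.size() == chunkSize) {
                writer.write(chunk);
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            writer.write(chunk); // flush the last partial chunk
        }
    }
}

class SimpleJob {
    private final List<Step> steps;
    SimpleJob(Step... steps) { this.steps = Arrays.asList(steps); }

    // Runs the steps in order; a failing step stops the job.
    String run() {
        try {
            for (Step step : steps) step.execute();
            return "COMPLETED";
        } catch (RuntimeException e) {
            return "FAILED";
        }
    }
}

// Application layer: the business rule and the decision to launch the job.
class ImportApp {
    static List<String> importNames(List<String> names) {
        ListWriter<String> writer = new ListWriter<>();
        Step step = new ChunkStep<String, String>(new ListReader<>(names), String::toUpperCase, writer, 2);
        String status = new SimpleJob(step).run();
        if (!status.equals("COMPLETED")) throw new IllegalStateException(status);
        return writer.written;
    }
}
```

Replacing `ListReader`/`ListWriter` with file- or database-backed implementations would change only the infrastructure layer; `ChunkStep`, `SimpleJob`, and the business rule stay untouched.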