Use Spring Batch partitioning to process a range of datasets with multiple threads in a Spring Boot application.
1. Parallel Processing and Step Partitioning
1.1. Parallel Processing
Most batch processing problems can be solved with a single thread, but in some complex scenarios, such as a task that takes too long to complete when processed single-threaded, parallel processing is needed, and it can be implemented with a multi-threaded model.
At a very high level, there are two modes of parallel processing:
- Single process, multiple threads
- Multiple processes
These break down further into the following categories:
- Multi-threaded Step (single process)
- Parallel Steps (single process)
- Remote chunking of a Step (multi-process)
- Partitioning a Step (single or multi-process)
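For contrast with partitioning (the focus of this article), the first category only needs a TaskExecutor set on a chunk-oriented step. A minimal sketch, assuming reader and writer beans for the Customer type exist elsewhere; the bean name and thread-name prefix are illustrative:

```java
// Sketch of a multi-threaded Step (single process, first category above).
// Assumes itemReader/itemWriter beans are defined elsewhere.
@Bean
public Step multiThreadedStep(StepBuilderFactory stepBuilderFactory,
                              ItemReader<Customer> itemReader,
                              ItemWriter<Customer> itemWriter) {
    return stepBuilderFactory.get("multiThreadedStep")
            .<Customer, Customer>chunk(1000)
            .reader(itemReader)
            .writer(itemWriter)
            // each chunk is read, processed, and written on a pool thread,
            // so the reader must be thread-safe in this model
            .taskExecutor(new SimpleAsyncTaskExecutor("mt-step-"))
            .throttleLimit(4) // the throttle limit defaults to 4
            .build();
}
```

Note that in this model the same reader instance is shared by all threads, which is why partitioning (each worker gets its own range) is often the safer choice.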
1.2. Spring Batch Partitioning
By default, Spring Batch is single-threaded. For parallel processing, we need to partition the steps of the batch job.
In this model, the manager is a single Step that is partitioned into multiple worker steps, which are themselves instances of Step. A worker can be a remote service, a locally executing thread, or any other independent task.
Spring Batch allows input data to be passed from the manager to the worker steps so that each worker knows exactly what to do. The JobRepository ensures that every worker is executed only once within a single execution of the job.
Partitioning uses multiple threads to process a range of datasets. The range of each dataset can be defined programmatically and depends on how many threads we want to create for the partitions; the number of threads is driven entirely by the requirement.
Partitioning is very useful when we have millions of records to read from a source system and cannot rely on a single thread to process them all, which can be very time-consuming. Instead, we want multiple threads to read and process the data so that system resources are used efficiently.
2. Spring Batch Partitioning Example
In this tutorial, we will read some data from one database table and write it into another table. You can create millions of records in the database to experience how long the process would take with a single-threaded batch job; a small program is used here to illustrate how the concept works.
2.1. Maven
We use the latest version of Spring Boot, and adding the spring-boot-starter-batch dependency transitively pulls in the latest versions of the required dependencies.
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
  </dependency>
  <dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.2</version>
    <optional>true</optional>
  </dependency>
</dependencies>
2.2. Partitioning the Step
Partitioner is the central strategy interface for creating input parameters for a partitioned step, in the form of ExecutionContext instances. The usual aim is to create a set of distinct input values, e.g., a set of non-overlapping primary key ranges or a set of unique file names.
In this example, we query the table for the MAX and MIN id values (assuming they are sequential) and create partitions over all the records based on them.
For partitioning, we used gridSize = number of threads. Use your own custom value based on your requirements.
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;

public class ColumnRangePartitioner implements Partitioner {

    private JdbcOperations jdbcTemplate;
    private String table;
    private String column;

    public void setTable(String table) {
        this.table = table;
    }

    public void setColumn(String column) {
        this.column = column;
    }

    public void setDataSource(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") FROM " + table, Integer.class);
        int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") FROM " + table, Integer.class);
        int targetSize = (max - min) / gridSize + 1;

        Map<String, ExecutionContext> result = new HashMap<>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;

        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max;
            }
            value.putInt("minValue", start);
            value.putInt("maxValue", end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }
}
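To see how the loop above slices an id range, the same arithmetic can be run standalone, without Spring or a database. The class name here is illustrative:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone replica of the range arithmetic in ColumnRangePartitioner.partition().
public class RangeDemo {

    // Returns inclusive [start, end] pairs for the given id range and gridSize.
    public static List<int[]> ranges(int min, int max, int gridSize) {
        int targetSize = (max - min) / gridSize + 1;
        List<int[]> result = new ArrayList<>();
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            // clamp the last partition to the real maximum
            result.add(new int[] { start, Math.min(end, max) });
            start += targetSize;
            end += targetSize;
        }
        return result;
    }

    public static void main(String[] args) {
        // With ids 1..20 and gridSize 4 we get four partitions of five ids each:
        // 1 to 5, 6 to 10, 11 to 15, 16 to 20
        for (int[] r : ranges(1, 20, 4)) {
            System.out.println(r[0] + " to " + r[1]);
        }
    }
}
```

This matches the "reading 1 to 5" lines in the demo log at the end of this article.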
2.3. Job Configuration
This is the job configuration class where we create the beans needed to execute the job. In this example, we use SimpleAsyncTaskExecutor, the simplest multi-threaded implementation of the TaskExecutor interface.
In the manager Step, we use the partitioner to build a partitioned step over the remote (or local) worker steps. The "read, process, write" of each chunk of data happens entirely on different threads, so the processed records may not be in the same order as the input.
Things to look out for:
- When the task executor is backed by a thread pool, it imposes a throttle limit on the step. This limit defaults to 4 but can be configured differently.
- Concurrency limits may also come from resources used in the Step, such as the DataSource.
- ColumnRangePartitioner – the central strategy implementation that creates input parameters for the partitioned step in the form of ExecutionContext instances.
- JdbcPagingItemReader – this bean reads data with paging and accepts minValue and maxValue so that it fetches only the data in that range. Here we set the fetchSize to 1000, but you can use any value and make it configurable from a properties file.
- JdbcBatchItemWriter – this bean writes the data into another table.
- Step – the worker step configured in the batch job. It reads data from one table and writes it into another.
- Job – the batch domain object representing a job.
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import com.example.domain.Customer;
import com.example.mapper.CustomerRowMapper;

@Configuration
public class JobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public ColumnRangePartitioner partitioner() {
        ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
        columnRangePartitioner.setColumn("id");
        columnRangePartitioner.setDataSource(dataSource);
        columnRangePartitioner.setTable("customer");
        return columnRangePartitioner;
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader(
            @Value("#{stepExecutionContext['minValue']}") Long minValue,
            @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.out.println("reading " + minValue + " to " + maxValue);
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);
        // H2 accepts MySQL-style LIMIT paging, so MySqlPagingQueryProvider works here
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        // maxValue from the partitioner is inclusive, hence <=
        queryProvider.setWhereClause("where id >= " + minValue + " and id <= " + maxValue);
        queryProvider.setSortKeys(sortKeys);
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(1000);
        reader.setRowMapper(new CustomerRowMapper());
        reader.setQueryProvider(queryProvider);
        return reader;
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter<Customer> customerItemWriter() {
        JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
        itemWriter.setDataSource(dataSource);
        itemWriter.setSql("INSERT INTO NEW_CUSTOMER VALUES (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        itemWriter.afterPropertiesSet();
        return itemWriter;
    }

    // manager step
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .partitioner(workerStep().getName(), partitioner())
                .step(workerStep())
                .gridSize(4)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    // worker step
    @Bean
    public Step workerStep() {
        return stepBuilderFactory.get("workerStep")
                .<Customer, Customer>chunk(1000)
                .reader(pagingItemReader(null, null))
                .writer(customerItemWriter())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}
2.4. Entity and Mapper
Customer is the domain object used by the reader and writer.
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer {
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}
The CustomerRowMapper class maps the ResultSet to the Customer domain object.
import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import com.example.domain.Customer;

public class CustomerRowMapper implements RowMapper<Customer> {

    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        return Customer.builder()
                .id(rs.getLong("id"))
                .firstName(rs.getString("firstName"))
                .lastName(rs.getString("lastName"))
                .birthdate(rs.getString("birthdate"))
                .build();
    }
}
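Outside of the batch step, the same mapper can be reused for ad-hoc queries. A hedged sketch, assuming a configured JdbcTemplate is available:

```java
// Reusing CustomerRowMapper with a plain JdbcTemplate query (illustrative).
List<Customer> firstTen = jdbcTemplate.query(
        "SELECT id, firstName, lastName, birthdate FROM customer WHERE id <= ?",
        new CustomerRowMapper(), 10);
```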
2.5. application.properties
Configuration for creating the connection to the in-memory H2 database.
spring.datasource.url=jdbc:h2:mem:test
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
# Disable automatic batch job execution on startup
spring.batch.job.enabled=false
2.6. Table Creation and Initial Data
schema.sql
CREATE TABLE customer (
id INT PRIMARY KEY,
firstName VARCHAR(255) NULL,
lastName VARCHAR(255) NULL,
birthdate VARCHAR(255) NULL
);
CREATE TABLE new_customer (
id INT PRIMARY KEY,
firstName VARCHAR(255) NULL,
lastName VARCHAR(255) NULL,
birthdate VARCHAR(255) NULL
);
data.sql
INSERT INTO customer VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO customer VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO customer VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO customer VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO customer VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO customer VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO customer VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO customer VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO customer VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO customer VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO customer VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO customer VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO customer VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO customer VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO customer VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO customer VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO customer VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO customer VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO customer VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO customer VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');
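The 20 rows above are enough for a demo, but to feel the benefit of partitioning you would want far more. A hedged sketch for generating bulk INSERT statements, which could be written to data.sql or passed to JdbcTemplate.batchUpdate; the class name, row count, and field values are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

// Generates simple INSERT statements for bulk-loading the customer table.
public class CustomerDataGenerator {

    public static List<String> inserts(int count) {
        List<String> statements = new ArrayList<>(count);
        for (int i = 1; i <= count; i++) {
            statements.add(String.format(
                "INSERT INTO customer VALUES (%d, 'First%d', 'Last%d', '01-01-1980 00:00:00');",
                i, i, i));
        }
        return statements;
    }

    public static void main(String[] args) {
        // print a small sample; raise the count to stress-test the job
        inserts(5).forEach(System.out::println);
    }
}
```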
3. Demo
Run the application as a Spring Boot application.
import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class LocalPartitioningApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(LocalPartitioningApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("JobId", String.valueOf(System.currentTimeMillis()))
                .addDate("date", new Date())
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        JobExecution execution = jobLauncher.run(job, jobParameters);
        System.out.println("STATUS :: " + execution.getStatus());
    }
}
The application reads data from one database table using the partitions we created and writes it into another table.
2023-07-01 11:03:42.408 --- c.example.LocalPartitioningApplication : Started LocalPartitioningApplication
in 3.504 seconds (JVM running for 4.877)
2023-07-01 11:03:42.523 --- o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]]
launched with the following parameters: [{JobId=1688180622410, date=1688180622410, time=1688180622410}]
2023-07-01 11:03:42.603 --- o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
reading 1 to 5
reading 11 to 15
reading 16 to 20
reading 6 to 10
2023-07-01 11:03:42.890 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep : Step: [workerStep:partition0] executed in 173ms
2023-07-01 11:03:42.895 --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep : Step: [workerStep:partition3] executed in 178ms
2023-07-01 11:03:42.895 --- [cTaskExecutor-3] o.s.batch.core.step.AbstractStep : Step: [workerStep:partition1] executed in 177ms
2023-07-01 11:03:42.901 --- [cTaskExecutor-4] o.s.batch.core.step.AbstractStep : Step: [workerStep:partition2] executed in 182ms
2023-07-01 11:03:42.917 --- o.s.batch.core.step.AbstractStep : Step: [step1] executed in 314ms
2023-07-01 11:03:42.942 --- o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed
with the following parameters: [{JobId=1688180622410, date=1688180622410, time=1688180622410}]
and the following status: [COMPLETED] in 374ms
STATUS :: COMPLETED
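To double-check the copy, compare the row counts of the source and target tables. A sketch, assuming a JdbcTemplate wired to the same DataSource:

```java
// After a COMPLETED run, both counts should match (illustrative check).
int source = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM customer", Integer.class);
int target = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM new_customer", Integer.class);
System.out.println("customer=" + source + ", new_customer=" + target);
```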