目錄
引言
概念
分區器
分區處理器
案例
轉視訊版
引言
接着上篇:Spring Batch 進階篇-并行步驟了解Spring Batch并行步驟後,接下來一起學習一下Spring Batch 進階功能-分區步驟
概念
分區:有劃分,區分意思,在SpringBatch 分區步驟講的是給執行步驟區分上下級。
上級: 主步驟(Master Step)
下級: 從步驟--工作步驟(Work Step)
主步驟是上司,不用幹活,負責管理從步驟,從步驟是下屬,必須幹活。
一個主步驟下轄管理多個從步驟。
注意: 從步驟,不管多小,它也一個完整的Spring Batch 步驟,負責各自的讀入、處理、寫入等。
分區步驟結構圖
分區步驟一般用于海量資料的處理上,其采用是分治思想。主步驟将大的資料劃分多個小的資料集,然後開啟多個從步驟,要求每個從步驟負責一個資料集。當所有從步驟處理結束,整作業流程才算結束。
分區器
主步驟核心元件,負責資料分區,将完整的資料拆解成多個資料集,然後指派給從步驟,讓其執行。
拆分規則由Partitioner分區器接口定制,預設的實作類:MultiResourcePartitioner
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
Partitioner 接口隻有唯一的方法:partition 參數gridSize表示要分區的大小,可以了解為要開啟多個worker步驟,傳回值是一個Map, 其中key:worker步驟名稱, value:worker步驟啟動需要參數值,一般包含分區中繼資料,比如起始位置,資料量等。
分區處理器
主步驟核心元件,統一管理work 步驟, 并給work步驟指派任務。
管理規則由PartitionHandler 接口定義,預設的實作類:TaskExecutorPartitionHandler
案例
需求:下面幾個檔案将資料讀入記憶體
步驟1:準備資料
user1-10.txt
1#dafei#18
2#dafei#18
3#dafei#18
4#dafei#18
5#dafei#18
6#dafei#18
7#dafei#18
8#dafei#18
9#dafei#18
10#dafei#18
user11-20.txt
11#dafei#18
12#dafei#18
13#dafei#18
14#dafei#18
15#dafei#18
16#dafei#18
17#dafei#18
18#dafei#18
19#dafei#18
20#dafei#18
user21-30.txt
21#dafei#18
22#dafei#18
23#dafei#18
24#dafei#18
25#dafei#18
26#dafei#18
27#dafei#18
28#dafei#18
29#dafei#18
30#dafei#18
user31-40.txt
31#dafei#18
32#dafei#18
33#dafei#18
34#dafei#18
35#dafei#18
36#dafei#18
37#dafei#18
38#dafei#18
39#dafei#18
40#dafei#18
user41-50.txt
41#dafei#18
42#dafei#18
43#dafei#18
44#dafei#18
45#dafei#18
46#dafei#18
47#dafei#18
48#dafei#18
49#dafei#18
50#dafei#18
步驟2:準備實體類
@Getter
@Setter
@ToString
public class User {
private Long id;
private String name;
private int age;
}
步驟3:配置分區邏輯
public class UserPartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> result = new HashMap<>(gridSize);
int range = 10; //檔案間隔
int start = 1; //開始位置
int end = 10; //結束位置
String text = "user%s-%s.txt";
for (int i = 0; i < gridSize; i++) {
ExecutionContext value = new ExecutionContext();
Resource resource = new ClassPathResource(String.format(text, start, end));
try {
value.putString("file", resource.getURL().toExternalForm());
} catch (IOException e) {
e.printStackTrace();
}
start += range;
end += range;
result.put("user_partition_" + i, value);
}
return result;
}
}
步驟4:全部代碼
package com.langfeiyes.batch._37_step_part;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import java.util.List;
@SpringBootApplication
@EnableBatchProcessing
public class PartStepJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
//每個分區檔案讀取
@Bean
@StepScope
public FlatFileItemReader<User> flatItemReader(@Value("#{stepExecutionContext['file']}") Resource resource){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(resource)
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
//檔案分區器-設定分區規則
@Bean
public UserPartitioner userPartitioner(){
return new UserPartitioner();
}
//檔案分區處理器-處理分區
@Bean
public PartitionHandler userPartitionHandler() {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setGridSize(5);
handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
handler.setStep(workStep());
try {
handler.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return handler;
}
//每個從分區操作步驟
@Bean
public Step workStep() {
return stepBuilderFactory.get("workStep")
.<User, User>chunk(10)
.reader(flatItemReader(null))
.writer(itemWriter())
.build();
}
//主分區操作步驟
@Bean
public Step masterStep() {
return stepBuilderFactory.get("masterStep")
.partitioner(workStep().getName(),userPartitioner())
.partitionHandler(userPartitionHandler())
.build();
}
@Bean
public Job partJob(){
return jobBuilderFactory.get("part-step-job")
.start(masterStep())
.build();
}
public static void main(String[] args) {
SpringApplication.run(PartStepJob.class, args);
}
}
結果:
User(id=31, name=dafei, age=18)
User(id=32, name=dafei, age=18)
User(id=33, name=dafei, age=18)
User(id=34, name=dafei, age=18)
User(id=35, name=dafei, age=18)
User(id=36, name=dafei, age=18)
User(id=37, name=dafei, age=18)
User(id=38, name=dafei, age=18)
User(id=39, name=dafei, age=18)
User(id=40, name=dafei, age=18)
User(id=41, name=dafei, age=18)
User(id=42, name=dafei, age=18)
User(id=43, name=dafei, age=18)
User(id=44, name=dafei, age=18)
User(id=45, name=dafei, age=18)
User(id=46, name=dafei, age=18)
User(id=47, name=dafei, age=18)
User(id=48, name=dafei, age=18)
User(id=49, name=dafei, age=18)
User(id=50, name=dafei, age=18)
User(id=21, name=dafei, age=18)
User(id=22, name=dafei, age=18)
User(id=23, name=dafei, age=18)
User(id=24, name=dafei, age=18)
User(id=25, name=dafei, age=18)
User(id=26, name=dafei, age=18)
User(id=27, name=dafei, age=18)
User(id=28, name=dafei, age=18)
User(id=29, name=dafei, age=18)
User(id=30, name=dafei, age=18)
User(id=1, name=dafei, age=18)
User(id=2, name=dafei, age=18)
User(id=3, name=dafei, age=18)
User(id=4, name=dafei, age=18)
User(id=5, name=dafei, age=18)
User(id=6, name=dafei, age=18)
User(id=7, name=dafei, age=18)
User(id=8, name=dafei, age=18)
User(id=9, name=dafei, age=18)
User(id=10, name=dafei, age=18)
User(id=11, name=dafei, age=18)
User(id=12, name=dafei, age=18)
User(id=13, name=dafei, age=18)
User(id=14, name=dafei, age=18)
User(id=15, name=dafei, age=18)
User(id=16, name=dafei, age=18)
User(id=17, name=dafei, age=18)
User(id=18, name=dafei, age=18)
User(id=19, name=dafei, age=18)
User(id=20, name=dafei, age=18)
解析:核心點
1>檔案分區器:userPartitioner(), 分别加載5個檔案進入到程式
2>檔案分區處理器:userPartitionHandler() ,指定要分幾個區,由誰來處理
3>分區從步驟:workStep() 指定讀邏輯與寫邏輯
4>分區檔案讀取:flatItemReader(),需要傳入Resource對象,這個對象在userPartitioner()已經标記為file
5>分區主步驟:masterStep() ,指定分區名稱與分區器,指定分區處理器
到這,本篇就結束了,欲知後事如何,請聽下回分解~
轉視訊版
看文字不過瘾可以切換視訊版:Spring Batch高效批處理架構實戰