天天看點

Spring Batch 進階篇-分區步驟

目錄

引言

概念

分區器

分區處理器

案例

轉視訊版

引言

接着上篇:Spring Batch 進階篇-并行步驟了解Spring Batch并行步驟後,接下來一起學習一下Spring Batch 進階功能-分區步驟

概念

分區:有劃分,區分意思,在SpringBatch 分區步驟講的是給執行步驟區分上下級。

上級: 主步驟(Master Step)

下級: 從步驟--工作步驟(Work Step)

主步驟是上司,不用幹活,負責管理從步驟,從步驟是下屬,必須幹活。

一個主步驟下轄管理多個從步驟。

注意: 從步驟,不管多小,它也一個完整的Spring Batch 步驟,負責各自的讀入、處理、寫入等。

分區步驟結構圖

Spring Batch 進階篇-分區步驟

 分區步驟一般用于海量資料的處理上,其采用是分治思想。主步驟将大的資料劃分多個小的資料集,然後開啟多個從步驟,要求每個從步驟負責一個資料集。當所有從步驟處理結束,整作業流程才算結束。

分區器

主步驟核心元件,負責資料分區,将完整的資料拆解成多個資料集,然後指派給從步驟,讓其執行。

拆分規則由Partitioner分區器接口定制,預設的實作類:MultiResourcePartitioner

public interface Partitioner {
	Map<String, ExecutionContext> partition(int gridSize);
}
           

Partitioner 接口隻有唯一的方法:partition 參數gridSize表示要分區的大小,可以了解為要開啟多個worker步驟,傳回值是一個Map, 其中key:worker步驟名稱, value:worker步驟啟動需要參數值,一般包含分區中繼資料,比如起始位置,資料量等。

分區處理器

主步驟核心元件,統一管理work 步驟, 并給work步驟指派任務。

管理規則由PartitionHandler 接口定義,預設的實作類:TaskExecutorPartitionHandler

案例

需求:下面幾個檔案将資料讀入記憶體

Spring Batch 進階篇-分區步驟

步驟1:準備資料

user1-10.txt

1#dafei#18
2#dafei#18
3#dafei#18
4#dafei#18
5#dafei#18
6#dafei#18
7#dafei#18
8#dafei#18
9#dafei#18
10#dafei#18
           

user11-20.txt

11#dafei#18
12#dafei#18
13#dafei#18
14#dafei#18
15#dafei#18
16#dafei#18
17#dafei#18
18#dafei#18
19#dafei#18
20#dafei#18
           

user21-30.txt

21#dafei#18
22#dafei#18
23#dafei#18
24#dafei#18
25#dafei#18
26#dafei#18
27#dafei#18
28#dafei#18
29#dafei#18
30#dafei#18
           

user31-40.txt

31#dafei#18
32#dafei#18
33#dafei#18
34#dafei#18
35#dafei#18
36#dafei#18
37#dafei#18
38#dafei#18
39#dafei#18
40#dafei#18
           

user41-50.txt

41#dafei#18
42#dafei#18
43#dafei#18
44#dafei#18
45#dafei#18
46#dafei#18
47#dafei#18
48#dafei#18
49#dafei#18
50#dafei#18
           

步驟2:準備實體類

@Getter
@Setter
@ToString
public class User {
    private Long id;
    private String name;
    private int age;
}
           

步驟3:配置分區邏輯

public class UserPartitioner  implements Partitioner {
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<>(gridSize);

        int range = 10; //檔案間隔
        int start = 1; //開始位置
        int end = 10;  //結束位置
        String text = "user%s-%s.txt";

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext value = new ExecutionContext();
            Resource resource = new ClassPathResource(String.format(text, start, end));
            try {
                value.putString("file", resource.getURL().toExternalForm());
            } catch (IOException e) {
                e.printStackTrace();
            }
            start += range;
            end += range;

            result.put("user_partition_" + i, value);
        }
        return result;
    }
}
           

步驟4:全部代碼

package com.langfeiyes.batch._37_step_part;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import java.util.List;

@SpringBootApplication
@EnableBatchProcessing
public class PartStepJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;



    //每個分區檔案讀取
    @Bean
    @StepScope
    public FlatFileItemReader<User> flatItemReader(@Value("#{stepExecutionContext['file']}") Resource resource){
        return new FlatFileItemReaderBuilder<User>()
                .name("userItemReader")
                .resource(resource)
                .delimited().delimiter("#")
                .names("id", "name", "age")
                .targetType(User.class)
                .build();
    }

    @Bean
    public ItemWriter<User> itemWriter(){
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                items.forEach(System.err::println);
            }
        };
    }


    //檔案分區器-設定分區規則
    @Bean
    public UserPartitioner userPartitioner(){
        return new UserPartitioner();
    }

    //檔案分區處理器-處理分區
    @Bean
    public PartitionHandler userPartitionHandler() {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setGridSize(5);
        handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
        handler.setStep(workStep());
        try {
            handler.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return handler;
    }
    
    //每個從分區操作步驟
    @Bean
    public Step workStep() {
        return stepBuilderFactory.get("workStep")
                .<User, User>chunk(10)
                .reader(flatItemReader(null))
                .writer(itemWriter())
                .build();
    }

    //主分區操作步驟
    @Bean
    public Step masterStep() {
        return stepBuilderFactory.get("masterStep")
                .partitioner(workStep().getName(),userPartitioner())
                .partitionHandler(userPartitionHandler())
                .build();
    }


    @Bean
    public Job partJob(){
        return jobBuilderFactory.get("part-step-job")
                .start(masterStep())
                .build();
    }
    public static void main(String[] args) {
        SpringApplication.run(PartStepJob.class, args);
    }
}
           

結果:

User(id=31, name=dafei, age=18)
User(id=32, name=dafei, age=18)
User(id=33, name=dafei, age=18)
User(id=34, name=dafei, age=18)
User(id=35, name=dafei, age=18)
User(id=36, name=dafei, age=18)
User(id=37, name=dafei, age=18)
User(id=38, name=dafei, age=18)
User(id=39, name=dafei, age=18)
User(id=40, name=dafei, age=18)
User(id=41, name=dafei, age=18)
User(id=42, name=dafei, age=18)
User(id=43, name=dafei, age=18)
User(id=44, name=dafei, age=18)
User(id=45, name=dafei, age=18)
User(id=46, name=dafei, age=18)
User(id=47, name=dafei, age=18)
User(id=48, name=dafei, age=18)
User(id=49, name=dafei, age=18)
User(id=50, name=dafei, age=18)
User(id=21, name=dafei, age=18)
User(id=22, name=dafei, age=18)
User(id=23, name=dafei, age=18)
User(id=24, name=dafei, age=18)
User(id=25, name=dafei, age=18)
User(id=26, name=dafei, age=18)
User(id=27, name=dafei, age=18)
User(id=28, name=dafei, age=18)
User(id=29, name=dafei, age=18)
User(id=30, name=dafei, age=18)
User(id=1, name=dafei, age=18)
User(id=2, name=dafei, age=18)
User(id=3, name=dafei, age=18)
User(id=4, name=dafei, age=18)
User(id=5, name=dafei, age=18)
User(id=6, name=dafei, age=18)
User(id=7, name=dafei, age=18)
User(id=8, name=dafei, age=18)
User(id=9, name=dafei, age=18)
User(id=10, name=dafei, age=18)
User(id=11, name=dafei, age=18)
User(id=12, name=dafei, age=18)
User(id=13, name=dafei, age=18)
User(id=14, name=dafei, age=18)
User(id=15, name=dafei, age=18)
User(id=16, name=dafei, age=18)
User(id=17, name=dafei, age=18)
User(id=18, name=dafei, age=18)
User(id=19, name=dafei, age=18)
User(id=20, name=dafei, age=18)
           

解析:核心點

1>檔案分區器:userPartitioner(), 分别加載5個檔案進入到程式

2>檔案分區處理器:userPartitionHandler() ,指定要分幾個區,由誰來處理

3>分區從步驟:workStep() 指定讀邏輯與寫邏輯

4>分區檔案讀取:flatItemReader(),需要傳入Resource對象,這個對象在userPartitioner()已經标記為file

5>分區主步驟:masterStep() ,指定分區名稱與分區器,指定分區處理器

到這,本篇就結束了,欲知後事如何,請聽下回分解~

轉視訊版

看文字不過瘾可以切換視訊版:Spring Batch高效批處理架構實戰