天天看點

Spring Batch 進階篇-多線程步驟

目錄

引言

概念

 案例

轉視訊版

引言

接着上篇:Spring Batch ItemWriter元件,了解Spring Batch ItemWriter處理元件後,接下來一起學習一下Spring Batch 進階功能-多線程步驟

概念

預設的情況下,步驟基本上在單線程中執行,那能不能在多線程環境執行呢?答案肯定是yes,但是也要注意,多線程環境步驟執行一定要慎重。原因:多線程環境下,步驟是要設定不可重新開機。

Spring Batch 的多線程步驟是使用Spring 的 TaskExecutor(任務執行器)實作的。約定每一個塊開啟一個線程獨立執行。

Spring Batch 進階篇-多線程步驟

 案例

需求:分5個塊處理user-thread.txt檔案

1>編寫user-thread.txt檔案

1#dafei#18
2#xiaofei#16
3#laofei#20
4#zhongfei#19
5#feifei#15
6#zhangsan#14
7#lisi#13
8#wangwu#12
9#zhaoliu#11
10#qianqi#10
           

2>定義實體對象

@Getter
@Setter
@ToString
public class User {
    private Long id;
    private String name;
    private int age;
}
           

3>完整代碼

package com.langfeiyes.batch._35_step_thread;


import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import java.util.List;

@SpringBootApplication
@EnableBatchProcessing
public class ThreadStepJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public FlatFileItemReader<User> userItemReader(){

        System.out.println(Thread.currentThread());

        FlatFileItemReader<User> reader = new FlatFileItemReaderBuilder<User>()
                .name("userItemReader")
                .saveState(false) //防止狀态被覆寫
                .resource(new ClassPathResource("user-thread.txt"))
                .delimited().delimiter("#")
                .names("id", "name", "age")
                .targetType(User.class)
                .build();

        return reader;
    }

    @Bean
    public ItemWriter<User> itemWriter(){
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                items.forEach(System.err::println);
            }
        };
    }

    @Bean
    public Step step(){
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(2)
                .reader(userItemReader())
                .writer(itemWriter())
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();

    }

    @Bean
    public Job job(){
        return jobBuilderFactory.get("thread-step-job")
                .start(step())
                .build();
    }
    public static void main(String[] args) {
        SpringApplication.run(ThreadStepJob.class, args);
    }
}
           

4>結果

User(id=2, name=xiaofei, age=16)
User(id=5, name=feifei, age=15)
User(id=4, name=zhongfei, age=19)
User(id=7, name=lisi, age=13)
User(id=1, name=dafei, age=18)
User(id=6, name=zhangsan, age=14)
User(id=3, name=laofei, age=20)
User(id=8, name=wangwu, age=12)
User(id=9, name=zhaoliu, age=11)
User(id=10, name=qianqi, age=10)
           

解析

1:userItemReader() 加上saveState(false) Spring Batch 提供大部分的ItemReader是有狀态的,作業重新開機基本通過狀态來确定作業停止位置,而在多線程環境中,如果對象維護狀态被多個線程通路,可能存線上程間狀态互相覆寫問題。是以設定為false表示關閉狀态,但這也意味着作業不能重新開機了。

2:step() 方法加上.taskExecutor(new SimpleAsyncTaskExecutor()) 為作業步驟添加了多線程處理能力,以塊為機關,一個塊一個線程,觀察上面的結果,很明顯能看出輸出的順序是亂序的。改變 job 的名字再執行,會發現輸出資料每次都不一樣。

到這,本篇就結束了,欲知後事如何,請聽下回分解~

轉視訊版

看文字不過瘾可以切換視訊版:Spring Batch高效批處理架構實戰