天天看點

快速使用元件-spring batch(3)讀檔案資料到資料庫快速使用元件-spring batch(3)讀檔案資料到資料庫1.引言2.開發環境3.Spring Batch提供的讀-處理-寫元件一覽4.開發流程5.總結參考資源

快速使用元件-spring batch(3)讀檔案資料到資料庫

tags: springbatch

文章目錄

  • 快速使用元件-spring batch(3)讀檔案資料到資料庫
  • 1.引言
  • 2.開發環境
  • 3.Spring Batch提供的讀-處理-寫元件一覽
    • 3.1 ItemReader
    • 3.2 ItemWriter
    • 3.3 ItemProcessor
  • 4.開發流程
    • 4.1 建立spring batch資料庫
      • 4.1.1 建立資料庫并執行sql腳本
      • 4.1.2 資料庫表說明
      • 4.1.3 建立示例目标資料庫
    • 4.2 配置多資料源
      • 4.2.1 添加mysql資料庫依賴
      • 4.2.2 配置多資料源通路
    • 4.3 添加User實體
    • 4.4 添加檔案讀取元件ItemReader
    • 4.5 添加處理元件ItemProcessor
    • 4.6 添加資料庫寫入元件ItemWriter
    • 4.7 組裝完整任務
    • 4.8 編寫測試
  • 5.總結
  • 參考資源

1.引言

上一篇文章《快速了解元件-spring batch(2)之helloworld》對

Spring Batch

進行了入門級的開發,也對基本的元件有了一定的了解。但實際開發過程中,更多的是涉及檔案及資料庫的操作,以定時背景運作的方式,實作批處理操作。典型操作是從文本資料(

csv/txt

等檔案)中讀取資料,然後寫入到資料庫存儲。如下圖所示:

快速使用元件-spring batch(3)讀檔案資料到資料庫快速使用元件-spring batch(3)讀檔案資料到資料庫1.引言2.開發環境3.Spring Batch提供的讀-處理-寫元件一覽4.開發流程5.總結參考資源

若需要開發此過程,可以按照上一篇文章所寫的,自定義

ItemReader

ItemWriter

來實作,但是

Spring Batch

其實已經提供現成的檔案讀取和資料庫寫入的元件,開發人員可以直接使用,提高開發效率。本文将會對檔案讀取和資料庫寫入進行實戰介紹。

2.開發環境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 開發IDE: IDEA
  • 建構工具Maven: 3.3.9
  • 日志元件logback:1.2.3
  • lombok:1.18.6

3.Spring Batch提供的讀-處理-寫元件一覽

在使用

Spring Batch

内置的讀寫元件時,首先我們先弄清楚有哪些元件可以用,按讀、寫、處理,見下面說明。

Spring Batch

已提供了比較全面的支援。

3.1 ItemReader

ItemReader 說明
ListItemReader 讀取List類型資料,隻能讀一次
ItemReaderAdapter ItemReader擴充卡,可以複用現有的讀操作
FlatFileItemReader 讀Flat類型檔案
StaxEventItemReader 讀XML類型檔案
JdbcCursorItemReader 基于JDBC遊标方式讀資料庫
HibernateCursorItemReader 基于Hibernate遊标方式讀資料庫
StoredProcedureItemReader 基于存儲過程讀資料庫
JpaPagingItemReader 基于Jpa方式分頁讀資料庫
JdbcPagingItemReader 基于JDBC方式分頁讀資料庫
HibernatePagingItemReader 基于Hibernate方式分頁讀取資料庫
JmsItemReader 讀取JMS隊列
IteratorItemReader 疊代方式的讀元件
MultiResourceItemReader 多檔案讀元件
MongoItemReader 基于分布式檔案存儲的資料庫 MongoDB讀元件
Neo4jItemReader 面向網絡的資料庫Neo4j的讀元件
ResourcesItemReader 基于批量資源的讀元件,每次讀取傳回資源對象 AmqpItemReader讀取AMQP隊列元件
RepositoryItemReader 基于 Spring Data的讀元件

3.2 ItemWriter

ItemWriter 說明
FlatFileItemWriter 寫Flat類型檔案
MultiResourceItemWriter 多檔案寫元件
StaxEventItemWriter 寫XML類型檔案
AmqpItemWriter 寫AMQP類型消息
ClassifierCompositeItemWriter 根據 Classifier路由不同的Item到特定的ItemWriter處理
HiberateItemWriter 基于Hibernate方式寫資料庫
ItemWriterAdapter ItemWriter擴充卡,可以複用現有的寫服務
JdbcBatchItemWriter 基于JDBC方式寫資料庫
JmsItemWriter 寫JMS隊列 JpaItemWriter基于Jpa方式寫資料庫
GemfireItemWriter 基于分布式資料庫Gemfire的寫元件
SpELMappingGemfireItemWriter 基于Spring表達式語言寫分布式資料庫Gemfire的寫元件
MimeMessageItemWriter 發送郵件的寫元件
MongoItemWriter 基于分布式檔案存儲的資料庫MongoDB寫元件
Neo4jItemWriter 面向網絡的資料庫Neo4j的讀元件
PropertyExtractingDelegatingItemWriter 屬性抽取代理寫元件:通過調用給定的 Spring Bean方法執行寫入,參數由Item中指定的屬性字段擷取作為參數
RepositoryItemWriter基于 Spring Data的寫元件
SimpleMailMessageItemWriter 發送郵件的寫元件
CompositeItemWriter 條目寫的組合模式,支援組裝多個ItemWriter

3.3 ItemProcessor

ItemProcessor 說明
CompositeItemProcessor 組合處理器,可以封裝多個業務處理服務
ItemProcessorAdapter ItemProcessor擴充卡,可以複用現有的業務處理服務
PassThroughItemProcessor 不做任何業務處理,直接傳回讀到的資料
ValidatingItemProcessor 資料校驗處理器,支援對資料的校驗,如果校驗不通過可以進行過濾掉或者通過skip的方式跳過對記錄的處理

4.開發流程

根據目前示例,從

csv

檔案中讀資料,寫入到

mysql

資料庫,隻需要使用

FlatFileItemReader

JdbcBatchItemWriter

即可。下面對開發流程作簡要說明。示例工程可以在這裡擷取,裡面有檔案

resources/user-data.csv

及相應的目标資料庫腳本

mytest.sql

4.1 建立spring batch資料庫

4.1.1 建立資料庫并執行sql腳本

Spring Batch

的運作需要資料庫的支援,以儲存任務的運作狀态及結果。是以需要先建立資料庫。在

mysql

中建立名為

my_spring_batch

的資料庫。并在此資料庫中執行

Spring Batch

的資料庫腳本,腳本位置在

spring-batch-core-4.1.2.RELEASE.jar

的jar包中的

\org\springframework\batch\core\schema-mysql.sql

(也可以在示例工程的

sql

檔案夾中擷取)。執行完成後,資料庫表如下圖所示:

快速使用元件-spring batch(3)讀檔案資料到資料庫快速使用元件-spring batch(3)讀檔案資料到資料庫1.引言2.開發環境3.Spring Batch提供的讀-處理-寫元件一覽4.開發流程5.總結參考資源

4.1.2 資料庫表說明

資料庫共9張表,以

seq

結尾的是用于生成主鍵的。其它6張表,以

batch_job

開頭的是存儲任務的相關資訊,

batch_step

開頭的存儲步驟相關資訊。

  • job

    job instance

    job execution

    關系

    任務

    job

    是我們說的邏輯概念,即完整的一個批處理工作,它的執行個體就是

    job instance

    ,此任務資訊是存儲在

    batch_job_instance

    表中。有點類似java中的類和類執行個體的概念,是一對多的關系。對于每一個

    job instance

    ,執行的時候會生成記錄存儲在

    batch_job_execution

    中,表示每一個

    job

    執行的實際情況。注意,這裡

    job instance

    job execution

    也是一對多的關系,即同一個執行個體有可能會執行多次(如上一次執行失敗了,後面重新再執行)。
  • batch_job_execution_context

    batch_job_execution_params

    存儲任務執行時需要用到的上下文(以

    json

    格式存儲)及運作時使用的參數。
  • batch_step_execution

    batch_step_execution_context

    存儲任務執行過程中的作業步驟及運作時上下文。

4.1.3 建立示例目标資料庫

本示例隻涉及一個

test_user

表。建立

mytest

資料庫庫,執行

mytest.sql

腳本即可。

4.2 配置多資料源

一般來說,我們會把

Spring Batch

的資料存儲在獨立的資料庫中,而實際的應用使用的則是目标資料庫,是以需要配置多資料源通路。基于上一篇文章的工程進行開發。

4.2.1 添加mysql資料庫依賴

<!-- 資料庫相關依賴-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<scope>runtime</scope>
</dependency>
           

4.2.2 配置多資料源通路

Spring Boot

對多資料源的支援比較友好,配置也很簡單,先在配置檔案中添加資料庫配置,然後在java配置檔案中添加相應的注解即可。如下:

  • application.properties

    配置内容
# spring batch db
spring.datasource.jdbc-url=jdbc:mysql://localhost:3310/my_spring_batch?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.username=root
spring.datasource.password=111111
# target db
spring.target-datasource.jdbc-url=jdbc:mysql://localhost:3310/mytest?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.target-datasource.username=root
spring.target-datasource.password=111111
           
  • DataSourceConfig

    配置内容

    建立

    DataSourceConfig.java

    檔案,配置多資料源,如下:
@Configuration
public class DataSourceConfig {
    @Bean("datasource")
    @ConfigurationProperties(prefix="spring.datasource")
    @Primary
    public DataSource batchDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean("targetDatasource")
    @ConfigurationProperties(prefix="spring.target-datasource")
    public DataSource targetDatasource() {
        return DataSourceBuilder.create().build();
    }
}
           

這樣,後面就可以直接使用

datasource

targetDatasource

兩個Bean進行資料庫通路。

4.3 添加User實體

本執行個體中,讀取

csv

檔案,轉為

User

實體,然後存儲到資料庫,是以需要先把

User

這個實體作一個定義。使用了

lombok

jpa

的注解,如下:

@Entity
@Data
@Table(name="test_user")
public class User{
    @Id
    @GeneratedValue
    /**
     * id
     */
    private Long id;

    /**
     * 姓名
     */
    private String name;

    /**
     * 手機号
     */
    private String phone;
    ...略
           

4.4 添加檔案讀取元件ItemReader

使用内置的

FlatFileItemReader

即可。如下:

@Bean
    public ItemReader file2DbItemReader(){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return new FlatFileItemReaderBuilder<User>()
                .name(funcName)
                .resource(new ClassPathResource("user-data.csv"))
//                .linesToSkip(1)
                .delimited()
                .names(new String[]{"id","name","phone","title","email","gender","date_of_birth","sys_create_time","sys_create_user","sys_update_time","sys_update_user"})
                .fieldSetMapper(new UserFieldSetMapper())
                .build();
    }
           

說明:

  • FlatFileItemReaderBuilder

    用于建立

    FlatFileItemReader

    ,設定相應的行為,包括使用它來設定讀取檔案的位置(

    resource

    ),檔案分隔符(預設是

    ','

    ),是否跳過前面幾行(

    linesToSkip

    ),辨別每一列對應的列名稱(可與資料庫的字段名一緻)。設定檔案字段與資料庫實體字段的對應關系。
  • 設定檔案字段與資料庫實體字段的對應關系,使用

    FieldSetMapper

    來實作,其中

    FieldSet

    代表每一行文本資料,傳回值即為實體對象。如下所示:
public class UserFieldSetMapper implements FieldSetMapper<User> {
    @Override
    public User mapFieldSet(FieldSet fieldSet) throws BindException {
        String patternYmd = "yyyy-MM-dd";
        String patternYmdHms = "yyyy-MM-dd HH:mm:ss";
        User user = new User();
        user.setId(fieldSet.readLong("id"));
        user.setName(fieldSet.readString("name"));
        user.setPhone(fieldSet.readString("phone"));
        user.setTitle(fieldSet.readString("title"));
        user.setEmail(fieldSet.readString("email"));
        user.setGender(fieldSet.readString("gender"));
        //此字段有可能為null
        String dataOfBirthStr = fieldSet.readString("date_of_birth");
        if(SyncConstants.STR_CSV_NULL.equals(dataOfBirthStr)){
            user.setDateOfBirth(null);
        }else{
            DateTime dateTime = DateUtil.parse(dataOfBirthStr, patternYmd);
            user.setDateOfBirth(dateTime.toJdkDate());
        }
        user.setSysCreateTime(fieldSet.readDate("sys_create_time",patternYmdHms));
        user.setSysCreateUser(fieldSet.readString("sys_create_user"));
        user.setSysUpdateTime(fieldSet.readDate("sys_update_time",patternYmdHms));
        user.setSysUpdateUser(fieldSet.readString("sys_update_user"));
        return user;
    }
}
           

4.5 添加處理元件ItemProcessor

由于

csv

文本檔案中的資料

null

值資料辨別符為

\N

,是以可以在處理元件中進行處理,把辨別符

\N

設定為

null

值。如下所示:

@Slf4j
public class File2DbItemProcessor implements ItemProcessor<User,User> {

    @Override
    public User process(User user) throws Exception {
        user.setPhone(checkStr(user.getPhone()));
        user.setTitle(checkStr(user.getTitle()));
        user.setEmail(checkStr(user.getEmail()));
        user.setGender(checkStr(user.getGender()));
        log.info(LogConstants.LOG_TAG + "item process: " +user.getName());
        return user;
    }

    public String checkStr(String dataToCheck){
        if(SyncConstants.STR_CSV_NULL.equals(dataToCheck)){
            return null;
        }
        return dataToCheck;
    }
}
           

4.6 添加資料庫寫入元件ItemWriter

資料庫寫入元件使用

JdbcBatchItemWriter

即可,如下:

@Bean
    public ItemWriter file2DbWriter(@Qualifier("targetDatasource") DataSource datasource){
        return new JdbcBatchItemWriterBuilder<User>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user) " +
                        "VALUES (:id,:name,:phone,:title,:email,:gender,:dateOfBirth,:sysCreateTime,:sysCreateUser,:sysUpdateTime,:sysUpdateUser)")
                .dataSource(datasource)
                .build();
    }
           

說明:

  • 使用

    JdbcBatchItemWriterBuilder

    進行

    JdbcBatchItemWriter

    的建立,設定插入資料庫的sql語句,同時指定資料源即可。
  • @Qualifier("targetDatasource") DataSource datasource

    用于指定資料源
  • 使用

    BeanPropertyItemSqlParameterSourceProvider

    可以直接把讀取的資料實體的屬性資料作為參數填充到

    sql

    語句中,進而實作資料插入操作。

4.7 組裝完整任務

經過上面的操作,可以使用一個java配置,把讀、寫、處理組裝成完整的

step

job

,如下所示(詳細可見示例工程檔案):

File2DbBatchConfig.java

@Bean
public Job file2DbJob(Step file2DbStep,JobExecutionListener file2DbListener){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return jobBuilderFactory.get(funcName)
                .listener(file2DbListener)
                .flow(file2DbStep)
                .end().build();
    }
@Bean
public Step file2DbStep(ItemReader file2DbItemReader , ItemProcessor file2DbProcessor
            ,ItemWriter file2DbWriter){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return stepBuilderFactory.get(funcName)
                .<User,User>chunk(10)
                .reader(file2DbItemReader)
                .processor(file2DbProcessor)
                .writer(file2DbWriter)
                .build();
    }
           

4.8 編寫測試

參考上一文章的

ConsoleJobTest

,編寫

File2DbJobTest

檔案。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,File2DbBatchConfig.class})
@Slf4j
public class File2DbJobTest {

    @Autowired
    private JobLauncherService jobLauncherService;

    @Autowired
    private Job file2DbJob;

    @Test
    public void testFile2DbJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        //建構任務運作參數
        JobParameters jobParameters = JobUtil.makeJobParameters();
        //執行并顯示結果
        Map<String, Object> stringObjectMap = jobLauncherService.startJob(file2DbJob, jobParameters);
        Assert.assertEquals(ExitStatus.COMPLETED,stringObjectMap.get(SyncConstants.STR_RETURN_EXITSTATUS));
    }
}
           

執行後結果輸出如下(

exitCode=COMPLETED

):

快速使用元件-spring batch(3)讀檔案資料到資料庫快速使用元件-spring batch(3)讀檔案資料到資料庫1.引言2.開發環境3.Spring Batch提供的讀-處理-寫元件一覽4.開發流程5.總結參考資源

5.總結

本文先對

Spring Batch

的開箱即用的

ItemReader

ItemWriter

ItemProcessor

作了一個簡要的概覽,然後以讀

csv

檔案,處理null值,再插入到資料庫的處理邏輯為案例,介紹了

Spring Batch

的資料庫腳本,

FlatFileItemReader

JdbcBatchItemWriter

的使用。希望對大家更深入的了解

Spring Batch

有幫助,并能用到實踐中。

參考資源

  • 劉相《Spring Batch 批處理架構》:書中對Spring Batch進行了詳細的描述,本文章主要參考此書。
  • 《Spring Batch - Reference Documentation》:書中對Spring Batch進行了詳細的描述,本文章主要參考此書。

繼續閱讀