天天看點

Springbatch從檔案讀取資料處理後寫入資料庫

使用Springboot  + mybatis架構

4.1.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0

                          http://maven.apache.org/xsd/maven-4.0.0.xsd">

   <modelVersion>4.0.0</modelVersion>



   <groupId>com.mlsama</groupId>

   <artifactId>batchPay</artifactId>

   <version>1.0.0-SNAPSHOT</version>

   <packaging>jar</packaging>



   <name>batchPay</name>

   <description>Demo project for Spring Boot</description>



   <parent>

      <groupId>org.springframework.boot</groupId>

      <artifactId>spring-boot-starter-parent</artifactId>

      <version>2.0.3.RELEASE</version>

      <relativePath/> <!-- lookup parent from repository -->

   </parent>



   <properties>

      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

      <java.version>1.8</java.version>

      <mybatis.version>1.3.2</mybatis.version>

   </properties>



   <build>

      <plugins>

         <plugin>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-maven-plugin</artifactId>

         </plugin>

      </plugins>

   </build>



   <dependencies>

      <dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-batch</artifactId>

      </dependency>

      <!--Spring batch與Quartz的整合包-->

      <dependency>

         <groupId>org.springframework</groupId>

         <artifactId>spring-context-support</artifactId>

      </dependency>

      <!--Quartz-->

      <dependency>

         <groupId>org.quartz-scheduler</groupId>

         <artifactId>quartz</artifactId>

         <version>2.3.0</version>

      </dependency>

      <dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-web</artifactId>

      </dependency>

      <dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-test</artifactId>

         <scope>test</scope>

      </dependency>

      <dependency>

         <groupId>org.springframework.batch</groupId>

         <artifactId>spring-batch-test</artifactId>

         <scope>test</scope>

      </dependency>

      <dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-jdbc</artifactId>

      </dependency>

      <!--使用MySQL作為資料庫-->

      <dependency>

         <groupId>mysql</groupId>

         <artifactId>mysql-connector-java</artifactId>

         <scope>runtime</scope>

      </dependency>

      <!--lombok限制-->

      <dependency>

         <groupId>org.projectlombok</groupId>

         <artifactId>lombok</artifactId>

      </dependency>

      <!--日記-->

      <dependency>

         <groupId>org.slf4j</groupId>

         <artifactId>log4j-over-slf4j</artifactId>

      </dependency>

      <!--c3p0連接配接池-->

      <dependency>

         <groupId>com.mchange</groupId>

         <artifactId>c3p0</artifactId>

         <version>0.9.5.2</version>

      </dependency>

      <dependency>

         <groupId>org.apache.commons</groupId>

         <artifactId>commons-lang3</artifactId>

         <version>3.5</version>

      </dependency>

      <dependency>

         <groupId>org.mybatis.spring.boot</groupId>

         <artifactId>mybatis-spring-boot-starter</artifactId>

         <version>${mybatis.version}</version>

      </dependency>

   </dependencies>

</project>
           

4.1.2 application.properties

server.port=8888

#關閉項目啟動job自動執行

spring.batch.job.enabled = false

#整合c3p0,要自定義配置類,如:DataSourceConfiguration

spring.datasource.c3p0.driverClass=com.mysql.jdbc.Driver

spring.datasource.c3p0.jdbcUrl=jdbc:mysql://localhost:3306/springbatch

spring.datasource.c3p0.user=root

spring.datasource.c3p0.password=mlsama

spring.datasource.c3p0.maxPoolSize=30

spring.datasource.c3p0.minPoolSize=10

spring.datasource.c3p0.initialPoolSize=10



#mybatis配置

#别名包掃描

mybatis.typeAliasesPackage=com.mlsama.hellospringbatch.pojo

#引入*Mapper.xml檔案

mybatis.mapperLocations=classpath:mappers/**/*Mapper.xml

# 引入mybatis主配置檔案

mybatis.configLocation=classpath:mybatis-config.xml
           

4.1.3 整合c3p0,自定義資料源

@Configuration      //聲明為配置類

public class DataSourceConfiguration {



    @Bean(name = "dataSource")  //對象及名稱

    @Primary        //主要的候選者

    //配置屬性,prefix : 字首 spring.datasource固定

    @ConfigurationProperties(prefix = "spring.datasource.c3p0")

    public DataSource createDataSource(){

        return DataSourceBuilder.create() // 建立資料源建構對象

                .type(ComboPooledDataSource.class) // 設定資料源類型

                .build(); // 建構資料源對象



    }

}
           

4.1.4 mybatis-config.xml

<?xml version="1.0" encoding="UTF-8" ?>

<!DOCTYPE configuration

        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"

        "http://mybatis.org/dtd/mybatis-3-config.dtd">

<configuration>

    <!-- 全局的設定-->

    <settings>

        <!-- 開啟緩存 -->

        <setting name="cacheEnabled" value="true"/>

        <!-- 啟用延遲加載功能 -->

        <setting name="lazyLoadingEnabled" value="true"/>

        <!-- 按需要延遲加載-->

        <setting name="aggressiveLazyLoading" value="false"/>

        <!-- 開啟駝峰映射 (友善自動映射) -->

        <setting name="mapUnderscoreToCamelCase" value="true"/>

    </settings>

</configuration>
           

4.1.5 job的配置

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xmlns:batch="http://www.springframework.org/schema/batch"

       xsi:schemaLocation="http://www.springframework.org/schema/batch

                            http://www.springframework.org/schema/batch/spring-batch.xsd

                            http://www.springframework.org/schema/beans

                            http://www.springframework.org/schema/beans/spring-beans.xsd">



    <!--<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource">

        <property name="driverClass" value="com.mysql.jdbc.Driver"></property>

        <property name="jdbcUrl" value="jdbc:mysql://localhost:3306/springbatch"></property>

        <property name="user" value="root"></property>

        <property name="password" value="mlsama"></property>

    </bean>-->



    <!--DataSource在DataSourceConfiguration中定義-->

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">

        <property name="dataSource" ref="dataSource"></property>

    </bean>

    <!-- 配置spring batch的jobRepository,負責與資料庫打交道 -->

    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">

        <property name="dataSource" ref="dataSource"></property>

        <property name="transactionManager" ref="transactionManager"></property>

    </bean>

    <!-- 配置spring batch的 jobLauncher,用來啟動Job-->

    <bean id="batchJobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">

        <property name="jobRepository" ref="jobRepository"></property>

    </bean>

    <!-- 注冊job,把job注入到容器中,在jobLauncher啟動job時才能從容器中擷取要啟動的Job -->

    <bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry"/>



    <bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">

        <property name="jobRegistry" ref="jobRegistry"/>

    </bean>

    <!--“任務浏覽器(JObExplorer)”是“任務存儲器(JobRepository)”的隻讀版本,

                像後者一樣,它可以通過工廠bean簡單的配置生成-->

    <bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">

        <property name="dataSource" ref="dataSource"></property>

    </bean>

    <!--讀取資料-->

    <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">

        <property name="resource" value="classpath:/batchFiles/86000041-20181008.csv"></property>

        <property name="lineMapper" ref="lineMapper"></property>

        <property name="linesToSkip" value="1"></property>

    </bean>

    <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

        <property name="lineTokenizer" ref="lineTokenizer"></property>

        <property name="fieldSetMapper" ref="fieldSetMapper"></property>

    </bean>

    <bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">

        <property name="names" value="procCode,account,accountName,amount,payType,reserve"></property>

    </bean>

    <bean id="fieldSetMapper" class="com.mlsama.hellospringbatch.reader.BatchPayReader"></bean>

    <!--核心業務處理-->

    <bean id="process" class="com.mlsama.hellospringbatch.processor.BatchPayItemProcessor"></bean>

    <!--持久化處理-->

    <bean id="write" class="com.mlsama.hellospringbatch.writer.BatchPayWriter"></bean>



    <!-- 配置job工作 -->

    <batch:job id="batchPayJob" restartable="true">

        <batch:step id="batchPayJobStep">

            <batch:tasklet transaction-manager="transactionManager">

                <batch:chunk reader="reader"

                             processor="process"

                             writer="write"

                             commit-interval="10">

                </batch:chunk>

            </batch:tasklet>

        </batch:step>

    </batch:job>



</beans>
           

4.1.6 資料源(.csv檔案)

86000041-20181008.csv

procCode,account,accountName,amount,payType,reserve

86000041,612565688853252536,df,203000,DS,轉賬

86000041,813565678853252533,ds,240000,DS,轉賬

86000041,615465688853252531,ed,203000,DS,轉賬

86000041,614565688853252532,de,200600,DS,轉賬

86000041,687565688853252534,er,200050,DS,轉賬


           

4.1.7 實體類

@Data

public class BatchPay {

    /**

     * 流水号

     */

    private String logSeq;

    /**

     * 支付機構編碼

     */

    private String procCode;

    /**

     * 賬号

     */

    private String account;

    /**

     * 戶名

     */

    private String accountName;

    /**

     * 金額

     */

    private BigDecimal amount;

    /**

     * 支付類型

     */

    private String paySeq;

    /**

     * 支付類型

     */

    private String payType;

    /**

     * 交易日期

     */

    private String tradeDate;

    /**

     * 交易時間

     */

    private String tradeTime;

    /**

     * 交易結果

     */

    private String resultCode;

    /**

     * 交易傳回碼

     */

    private String respCode;

    /**

     * 交易傳回碼描述

     */

    private String respMsg;

    /**

     * 自定義域1

     */

    private String reserve;



    public BatchPay(String procCode, String account, String accountName, BigDecimal amount, String payType, String reserve) {

        this.procCode = procCode;

        this.account = account;

        this.accountName = accountName;

        this.amount = amount;

        this.payType = payType;

        this.reserve = reserve;

    }



    @Override

    public String toString() {

        return "BatchPay{" +

                "logSeq='" + logSeq + '\'' +

                ", procCode='" + procCode + '\'' +

                ", account='" + account + '\'' +

                ", accountName='" + accountName + '\'' +

                ", amount=" + amount +

                ", payType='" + payType + '\'' +

                ", paySeq='" + paySeq + '\'' +

                ", tradeDate='" + tradeDate + '\'' +

                ", tradeTime='" + tradeTime + '\'' +

                ", resultCode='" + resultCode + '\'' +

                ", respCode='" + respCode + '\'' +

                ", respMsg='" + respMsg + '\'' +

                ", reserve1='" + reserve + '\'' +

                '}';

    }

}
           

4.1.8 讀取資料源

使用FlatFileItemReader類,其實已經在job的配置檔案中有了配置:

<!--讀取資料-->

<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">

    <!--資料源檔案-->

    <property name="resource" value="classpath:/batchFiles/86000041-20181008.csv"></property>

    <!--将一行映射成一個對象-->

    <property name="lineMapper" ref="lineMapper"></property>

    <!--跳過第一行-->

    <property name="linesToSkip" value="1"></property>

</bean>

<!--行映射器-->

<bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

    <!--設定行字段-->

    <property name="lineTokenizer" ref="lineTokenizer"></property>

    <!--進行行對象映射-->

    <property name="fieldSetMapper" ref="fieldSetMapper"></property>

</bean>

<bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">

    <property name="names" value="procCode,account,accountName,amount,payType,reserve"></property>

</bean>

<bean id="fieldSetMapper" class="com.mlsama.hellospringbatch.reader.BatchPayReader"></bean>
           

将行映射為對象的類需要自定義:

BatchPayReader.java

public class BatchPayReader implements FieldSetMapper<BatchPay>{

    @Override

    public BatchPay mapFieldSet(FieldSet fieldSet) throws BindException {

        return new BatchPay(

                //fieldSet裡有每一行的資料,從0開始

                fieldSet.readString(0),

                fieldSet.readString(1),

                fieldSet.readString(2),

                new BigDecimal(fieldSet.readString(3)),

                fieldSet.readString(4),

                fieldSet.readString(5)

        );

    }

}
           

4.1.9 業務處理

在配置檔案中也有了定義

<!--核心業務處理-->

<bean id="process" class="com.mlsama.hellospringbatch.processor.BatchPayItemProcessor"></bean>
           

BatchPayItemProcessor.Java如下:

@Slf4j

public class BatchPayItemProcessor implements ItemProcessor<BatchPay,BatchPay>{

    @Override

    public BatchPay process(BatchPay batchPay) throws Exception {

        String logSeq = CommonUtil.getLogSeq();

        batchPay.setLogSeq(logSeq);

        batchPay.setPaySeq(logSeq);

        batchPay.setTradeDate(new SimpleDateFormat(CommonConstants.DATA_FORMAT).format(new Date()));

        batchPay.setTradeTime(new SimpleDateFormat(CommonConstants.Time_FORMAT).format(new Date()));

        batchPay.setResultCode(CommonConstants.RESULT_CODE_0050);

        return batchPay;

    }

}
           

4.1.10 持久化

在配置檔案中的配置如下:

<!--持久化處理-->

<bean id="write" class="com.mlsama.hellospringbatch.writer.BatchPayWriter"></bean>
           

BatchPayWriter.Java如下:

@Slf4j

public class BatchPayWriter implements ItemWriter<BatchPay> {

    @Autowired

    private BatchPayManager batchPayManager;

    @Override

    public void write(List<? extends BatchPay> list) throws Exception {

        log.info("開始持久化資料.list={}",list);

        try {

            batchPayManager.batchPayInsert(list);

        }catch (Exception e){

            log.error("持久化資料異常{}.",e);

        }

    }

}
           

4.1.11 統一管理層

@Service

public class BatchPayManager {

    @Autowired

    private BatchPayMapper batchPayMapper;



    @Transactional(rollbackFor = Exception.class)

    public void batchPayInsert(List<? extends BatchPay> list){

        batchPayMapper.batchInsert(list);

    }

}
           

4.1.12 mybatis的mapper層

@Mapper

public interface BatchPayMapper {



    void batchInsert(List<? extends BatchPay> list);

}
           

4.1.13 mybatis的mapper配置

在resource下建立檔案夾mappers,建立配置檔案BatchPayMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>

<!DOCTYPE mapper

        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"

        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="com.mlsama.hellospringbatch.mapper.BatchPayMapper">



    <insert id="batchInsert" parameterType="list">

          INSERT INTO t_batch_pay

            (

            log_seq,

            proc_code,

            account,

            account_name,

            amount,

            pay_seq,

            pay_type,

            trade_date,

            trade_time,

            result_code,

            resp_code,

            resp_msg,

            reserver

            )VALUES

            <foreach collection="list" item="item" separator=",">

            (

            #{item.logSeq, jdbcType=VARCHAR},

            #{item.procCode, jdbcType=VARCHAR},

            #{item.account, jdbcType=VARCHAR},

            #{item.accountName, jdbcType=VARCHAR},

            #{item.amount, jdbcType=DECIMAL},

            #{item.paySeq, jdbcType=VARCHAR},

            #{item.payType, jdbcType=VARCHAR},

            #{item.tradeDate, jdbcType=VARCHAR},

            #{item.tradeTime, jdbcType=VARCHAR},

            #{item.resultCode, jdbcType=VARCHAR},

            #{item.respCode, jdbcType=VARCHAR},

            #{item.respMsg, jdbcType=VARCHAR},

            #{item.reserve, jdbcType=VARCHAR}

            )

        </foreach>

    </insert>



</mapper>
           

4.1.14 job排程層

用于啟動job,通過配置在job配置檔案中的SimpleJobLauncher啟動job.

@Service

@Slf4j

public class BatchPayJobLauncher {

    @Resource(name = "batchJobLauncher")

    private SimpleJobLauncher batchJobLauncher;



    @Resource(name = "batchPayJob")

    private Job batchPayJob;



    public String doBatchPayJob(){

        try {

            JobExecution jobExecution = batchJobLauncher.run(batchPayJob,  new RunIdIncrementer().getNext(null));

            return jobExecution.getStatus().name();

        }catch (Exception e){

            log.error("執行job:batchPayJob發生異常{}",e);

        }

        return null;

    }

}
           

4.1.15 控制層

接受頁面請求,調用job排程層方法,啟動job

@Controller

@RequestMapping("/batchPay")

public class BatchPayController {

    @Autowired

    private BatchPayJobLauncher batchPayJobLauncher;



    @ResponseBody

    @GetMapping("/doBatchPayJob")

    public String doBatchPayJob(){

        String result = batchPayJobLauncher.doBatchPayJob();

        if (StringUtils.isBlank(result)){

            result = "調用失敗.發生異常";

        }

        return result;

    }

}
           

4.1.16 項目啟動類

@SpringBootApplication  //springboot啟動

//@EnableBatchProcessing    //加載所有的job,這裡不能有這個,否則不能排程

@ImportResource(locations = "classpath:*-config.xml")

public class Application {

   public static void main(String[] args) {

      /** 建立SpringApplication對象 */

      SpringApplication springApplication =

            new SpringApplication(Application.class);

      /** 設定橫幅關閉 */

      springApplication.setBannerMode(Banner.Mode.OFF);

      /** 運作 */

      springApplication.run(args);

   }

}
           

4.1.17 項目結構

Springbatch從檔案讀取資料處理後寫入資料庫
Springbatch從檔案讀取資料處理後寫入資料庫

運作項目啟動類,通路: http://localhost:8888/

batchPay/doBatchPayJob

繼續閱讀