天天看點

Spring Batch 整理什麼是Spring Batch HelloWorld多個processor檔案讀寫處理類CsvItemProcessor多個processor操作資料庫skip的介紹

什麼是Spring Batch

Spring Batch 作為 Spring的子項目,是一款基于 Spring的企業批處理架構。通過它可以建構出健壯的企業批處理應用。Spring Batch不僅提供了統一的讀寫接口、豐富的任務處理方式、靈活的事務管理及并發處理,同時還支援日志、監控、任務重新開機與跳過等特性,大大簡化了批處理應用開發,将開發人員從複雜的任務配置管理過程中解放出來,使他們可以更多地去關注核心的業務處理過程。

另外我們還需要知道,Spring Batch是一款批處理應用架構,不是排程架構。它隻關注批處理任務相關的問題,如事務、并發、監控、執行等,并不提供相應的排程功能。是以,如果我們希望批處理任務定期執行,可結合 Quartz等成熟的排程架構實作。

Spring Batch 還針對讀、寫操作提供了多種實作,如消息、檔案、資料庫。對于資料庫,還提供了 Hibernate、iBatis、JPA等常見 ORM架構的讀、寫接口支援。

所有 Spring Batch的讀操作均需要實作 ItemReader接口,而且 Spring Batch為我們提供了多種預設實作,尤其是基于 ORM架構的讀接口,同時支援基于遊标和分頁兩類操作。是以,多數情況下我們并不需要手動編寫ItemReader類,而是直接使用相應實作類即可。

Spring Batch 整理什麼是Spring Batch HelloWorld多個processor檔案讀寫處理類CsvItemProcessor多個processor操作資料庫skip的介紹

上圖描繪了Spring Batch的執行過程。說明如下:

   每個Batch都會包含一個Job。Job就像一個容器,這個容器裡裝了若幹Step,Batch中實際幹活的也就是這些Step,至于Step幹什麼活,無外乎讀取資料,處理資料,然後将這些資料存儲起來(ItemReader用來讀取資料,ItemProcessor用來處理資料,ItemWriter用來寫資料)。JobLauncher用來啟動Job,JobRepository是上述處理提供的一種持久化機制,它為JobLauncher,Job,和Step執行個體提供CRUD操作。

   外部控制器調用JobLauncher啟動一個Job,Job調用自己的Step去實作對資料的操作,Step處理完成後,再将處理結果一步步傳回給上一層,這就是Batch處理實作的一個簡單流程。

Spring Batch 整理什麼是Spring Batch HelloWorld多個processor檔案讀寫處理類CsvItemProcessor多個processor操作資料庫skip的介紹

從DB或是檔案中取出資料的時候,read()操作每次隻讀取一條記錄,之後将讀取的這條資料傳遞給processor(item)處理,架構将重複做這兩步操作,直到讀取記錄的件數達到batch配置資訊中”commin-interval”設定值的時候,就會調用一次write操作。然後再重複上圖的處理,直到處理完所有的資料。當這個Step的工作完成以後,或是跳到其他Step,或是結束處理。

     這就是一個SpringBatch的基本工作流程。

HelloWorld

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:mvc="http://www.springframework.org/schema/mvc" 
	xmlns:tx="http://www.springframework.org/schema/tx" 
	xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:batch="http://www.springframework.org/schema/batch"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd  
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd  
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd 
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd 
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd">

   <!--  <import resource="applicationContext.xml"/>-->

    <batch:job id="batchhelloWorldJob" >
        <batch:step id="step_hello" next="step_world">
            <tasklet ref="hello" transaction-manager="transactionManagerBatch"></tasklet>
        </batch:step>
        <batch:step id="step_world">
            <tasklet ref="world" transaction-manager="transactionManagerBatch"></tasklet>
        </batch:step>
    </batch:job>

    <bean id="hello" class="com.sds.job.SayHello">
        <property name="message" value="Hello "></property>
    </bean>

    <bean id="world" class="com.sds.job.SayHello">
        <property name="message" value=" World!"></property>
    </bean>
    
    <bean id="jobLauncher"
			class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
			<property name="jobRepository" ref="jobRepository" />
		</bean>

		<bean id="jobRepository"
			class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />


		<bean id="transactionManagerBatch"
			class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
    
    	<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry"/>
    
    	<bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
		<property name="jobRegistry" ref="jobRegistry"/>
	</bean>
    
    
</beans>
           

多個processor

多個processor
<batch:job id="csvJob">
        <batch:step id="csvStep">
            <tasklet transaction-manager="transactionManagerBatch">
                <chunk reader="csvItemReader" writer="csvItemWriter" processor="CompositeProcessor" commit-interval="1">
                </chunk>
            </tasklet>
        </batch:step>
        
    </batch:job>

<!-- 多個 processor -->
    <bean id="CompositeProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
        	<ref bean="csvItemProcessor" />
        	<ref bean="WriteIntoDB" />
            
        </list>
    </property>
	</bean>
           

檔案讀寫

CsvItemProcessor 實作了 

ItemProcessor<I,O>

 接口,其中類型 I 表示傳遞給處理器的對象類型,而 O 則表示處理器傳回的對象類型。

<batch:job id="csvJob">
        <batch:step id="csvStep">
            <tasklet transaction-manager="transactionManagerBatch">
                <chunk reader="csvItemReader" writer="csvItemWriter" processor="CompositeProcessor" commit-interval="1">
                </chunk>
            </tasklet>
        </batch:step>
        
    </batch:job>
    
    
    <!-- 多個 processor -->
    <bean id="CompositeProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
        	<ref bean="csvItemProcessor" />
        	<ref bean="WriteIntoDB" />
            
        </list>
    </property>
	</bean>	
    
    <bean id = 'csvItemProcessor' class="com.sds.processor.CsvItemProcessor"></bean>
    
    <!-- 讀取csv檔案 -->
    <bean id="csvItemReader"
        class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="resource" value="file:src/inputFile.txt"/>
        
        <!-- <property name="linesToSkip" value="1" /> 告訴 file reader 有多少标題行需要跳過。 
        	通常CSV檔案的第一行包含标題資訊,如列名稱,是以本例中讓 reader 跳過檔案的第一行-->
        <property name="lineMapper">
            <bean
                class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer" ref="lineTokenizer"/>
                <property name="fieldSetMapper">
                    <bean
                        class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <property name="prototypeBeanName" value="person"></property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>

    <bean id="person" class="com.sds.bean.Person"></bean>

    <!-- lineTokenizer -->
    <bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
        <property name="delimiter" value=","/>
        <property name="names">
            <list>
                <value>pname</value>
                <value>pid</value>
                <value>age</value>
                <value>birthday</value>
            </list>
        </property>
    </bean>
    
    
    <!-- 寫CSV檔案 -->
    <bean id="csvItemWriter"
        class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
        <property name="resource" value="file:src/outputFile.txt"/>
        <property name="lineAggregator">
            <bean
                class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="delimiter" value=","></property>
                <property name="fieldExtractor">
                    <bean
                        class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <property name="names" value="pname,pid,age,birthday"></property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>      

處理類CsvItemProcessor

public class CsvItemProcessor implements ItemProcessor<Person,Person> {

	@Override
	public Person process(Person person) throws Exception {
		System.out.println("------------------------");
		System.out.println(person.getPname());
		person.setPname(person.getPname() + "~");
        return person;
	}

}
           

多個processor

<batch:job id="csvJob">
        <batch:step id="csvStep">
            <tasklet transaction-manager="transactionManagerBatch">
                <chunk reader="csvItemReader" writer="csvItemWriter" processor="CompositeProcessor" commit-interval="1">
                </chunk>
            </tasklet>
        </batch:step>
        
    </batch:job>

<!-- 多個 processor -->
    <bean id="CompositeProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
        	<ref bean="csvItemProcessor" />
        	<ref bean="WriteIntoDB" />
            
        </list>
    </property>
	</bean>
           

操作資料庫

@Component("WriteIntoDB")
public class WriteIntoDB implements ItemProcessor<Person,Person>{
	@Resource(name="personDao")
	public IPersonDao dao ; 
	
	@Override
	public Person process(Person item) throws Exception {
		System.out.println("******************");
		System.out.println(item.getPname() + item.getBirthday()); 
		dao.insertPerson(item);
		System.out.println("insert success");
		return item;
	}

}
           

skip的介紹

在實際的項目開發中,我們常常要将幾十萬甚至上百萬的資料從檔案導入到DB中,如果其中某條資料導入時發生例外,我們并不想整個Job以失敗而結束,而是希望能将錯誤的資料經過處理後儲存起來,其餘正确的資料繼續做導入處理。如果遇到這樣的場景,SpringBatch的skip機制就可以派上用場了。顧名思義,skip的作用就是跳過某些資料(例如錯誤資料)。 

<job id="csvJob">
 2         <step id="csvStep">
 3             <tasklet transaction-manager="transactionManager">
 4                 <chunk reader="itemReaders" writer="itemWriter" processor="itemProcessor"
 5                     commit-interval="1" skip-limit="1000">
 6                     <skippable-exception-classes>
 7                         <include class="org.springframework.batch.item.file.FlatFileParseException" />
 8                     </skippable-exception-classes>
 9                 </chunk>
10             </tasklet>
11         </step>
12     </job>
           

繼續閱讀