快速使用元件-spring batch(3)讀檔案資料到資料庫
tags: springbatch
文章目錄
- 快速使用元件-spring batch(3)讀檔案資料到資料庫
- 1.引言
- 2.開發環境
- 3.Spring Batch提供的讀-處理-寫元件一覽
-
- 3.1 ItemReader
- 3.2 ItemWriter
- 3.3 ItemProcessor
- 4.開發流程
-
- 4.1 建立spring batch資料庫
-
- 4.1.1 建立資料庫并執行sql腳本
- 4.1.2 資料庫表說明
- 4.1.3 建立示例目标資料庫
- 4.2 配置多資料源
-
- 4.2.1 添加mysql資料庫依賴
- 4.2.2 配置多資料源通路
- 4.3 添加User實體
- 4.4 添加檔案讀取元件ItemReader
- 4.5 添加處理元件ItemProcessor
- 4.6 添加資料庫寫入元件ItemWriter
- 4.7 組裝完整任務
- 4.8 編寫測試
- 5.總結
- 參考資源
1.引言
上一篇文章《快速了解元件-spring batch(2)之helloworld》對
Spring Batch
進行了入門級的開發,也對基本的元件有了一定的了解。但實際開發過程中,更多的是涉及檔案及資料庫的操作,以定時背景運作的方式,實作批處理操作。典型操作是從文本資料(
csv/txt
等檔案)中讀取資料,然後寫入到資料庫存儲。如下圖所示:
若需要開發此過程,可以按照上一篇文章所寫的,自定義
ItemReader
和
ItemWriter
來實作,但是
Spring Batch
其實已經提供現成的檔案讀取和資料庫寫入的元件,開發人員可以直接使用,提高開發效率。本文将會對檔案讀取和資料庫寫入進行實戰介紹。
2.開發環境
- JDK: jdk1.8
- Spring Boot: 2.1.4.RELEASE
- Spring Batch:4.1.2.RELEASE
- 開發IDE: IDEA
- 建構工具Maven: 3.3.9
- 日志元件logback:1.2.3
- lombok:1.18.6
3.Spring Batch提供的讀-處理-寫元件一覽
在使用
Spring Batch
内置的讀寫元件時,首先我們先弄清楚有哪些元件可以用,按讀、寫、處理,見下面說明。
Spring Batch
已提供了比較全面的支援。
3.1 ItemReader
ItemReader | 說明 |
---|---|
ListItemReader | 讀取List類型資料,隻能讀一次 |
ItemReaderAdapter | ItemReader擴充卡,可以複用現有的讀操作 |
FlatFileItemReader | 讀Flat類型檔案 |
StaxEventItemReader | 讀XML類型檔案 |
JdbcCursorItemReader | 基于JDBC遊标方式讀資料庫 |
HibernateCursorItemReader | 基于Hibernate遊标方式讀資料庫 |
StoredProcedureItemReader | 基于存儲過程讀資料庫 |
JpaPagingItemReader | 基于Jpa方式分頁讀資料庫 |
JdbcPagingItemReader | 基于JDBC方式分頁讀資料庫 |
HibernatePagingItemReader | 基于Hibernate方式分頁讀取資料庫 |
JmsItemReader | 讀取JMS隊列 |
IteratorItemReader | 疊代方式的讀元件 |
MultiResourceItemReader | 多檔案讀元件 |
MongoItemReader | 基于分布式檔案存儲的資料庫 MongoDB讀元件 |
Neo4jItemReader | 面向網絡的資料庫Neo4j的讀元件 |
ResourcesItemReader | 基于批量資源的讀元件,每次讀取傳回資源對象 AmqpItemReader讀取AMQP隊列元件 |
RepositoryItemReader | 基于 Spring Data的讀元件 |
3.2 ItemWriter
ItemWriter | 說明 |
---|---|
FlatFileItemWriter | 寫Flat類型檔案 |
MultiResourceItemWriter | 多檔案寫元件 |
StaxEventItemWriter | 寫XML類型檔案 |
AmqpItemWriter | 寫AMQP類型消息 |
ClassifierCompositeItemWriter | 根據 Classifier路由不同的Item到特定的ItemWriter處理 |
HiberateItemWriter | 基于Hibernate方式寫資料庫 |
ItemWriterAdapter | ItemWriter擴充卡,可以複用現有的寫服務 |
JdbcBatchItemWriter | 基于JDBC方式寫資料庫 |
JmsItemWriter | 寫JMS隊列 JpaItemWriter基于Jpa方式寫資料庫 |
GemfireItemWriter | 基于分布式資料庫Gemfire的寫元件 |
SpELMappingGemfireItemWriter | 基于Spring表達式語言寫分布式資料庫Gemfire的寫元件 |
MimeMessageItemWriter | 發送郵件的寫元件 |
MongoItemWriter | 基于分布式檔案存儲的資料庫MongoDB寫元件 |
Neo4jItemWriter | 面向網絡的資料庫Neo4j的讀元件 |
PropertyExtractingDelegatingItemWriter | 屬性抽取代理寫元件:通過調用給定的 Spring Bean方法執行寫入,參數由Item中指定的屬性字段擷取作為參數 |
RepositoryItemWriter基于 | Spring Data的寫元件 |
SimpleMailMessageItemWriter | 發送郵件的寫元件 |
CompositeItemWriter | 條目寫的組合模式,支援組裝多個ItemWriter |
3.3 ItemProcessor
ItemProcessor | 說明 |
---|---|
CompositeItemProcessor | 組合處理器,可以封裝多個業務處理服務 |
ItemProcessorAdapter | ItemProcessor擴充卡,可以複用現有的業務處理服務 |
PassThroughItemProcessor | 不做任何業務處理,直接傳回讀到的資料 |
ValidatingItemProcessor | 資料校驗處理器,支援對資料的校驗,如果校驗不通過可以進行過濾掉或者通過skip的方式跳過對記錄的處理 |
4.開發流程
根據目前示例,從
csv
檔案中讀資料,寫入到
mysql
資料庫,隻需要使用
FlatFileItemReader
和
JdbcBatchItemWriter
即可。下面對開發流程作簡要說明。示例工程可以在這裡擷取,裡面有檔案
resources/user-data.csv
及相應的目标資料庫腳本
mytest.sql
。
4.1 建立spring batch資料庫
4.1.1 建立資料庫并執行sql腳本
Spring Batch
的運作需要資料庫的支援,以儲存任務的運作狀态及結果。是以需要先建立資料庫。在
mysql
中建立名為
my_spring_batch
的資料庫。并在此資料庫中執行
Spring Batch
的資料庫腳本,腳本位置在
spring-batch-core-4.1.2.RELEASE.jar
的jar包中的
\org\springframework\batch\core\schema-mysql.sql
(也可以在示例工程的
sql
檔案夾中擷取)。執行完成後,資料庫表如下圖所示:
4.1.2 資料庫表說明
資料庫共9張表,以
seq
結尾的是用于生成主鍵的。其它6張表,以
batch_job
開頭的是存儲任務的相關資訊,
batch_step
開頭的存儲步驟相關資訊。
-
與job
與job instance
job execution
關系
任務
是我們說的邏輯概念,即完整的一個批處理工作,它的執行個體就是job
,此任務資訊是存儲在job instance
表中。有點類似java中的類和類執行個體的概念,是一對多的關系。對于每一個batch_job_instance
,執行的時候會生成記錄存儲在job instance
中,表示每一個batch_job_execution
執行的實際情況。注意,這裡job
和job instance
也是一對多的關系,即同一個執行個體有可能會執行多次(如上一次執行失敗了,後面重新再執行)。job execution
-
及batch_job_execution_context
存儲任務執行時需要用到的上下文(以batch_job_execution_params
格式存儲)及運作時使用的參數。json
-
及batch_step_execution
存儲任務執行過程中的作業步驟及運作時上下文。batch_step_execution_context
4.1.3 建立示例目标資料庫
本示例隻涉及一個
test_user
表。建立
mytest
資料庫庫,執行
mytest.sql
腳本即可。
4.2 配置多資料源
一般來說,我們會把
Spring Batch
的資料存儲在獨立的資料庫中,而實際的應用使用的則是目标資料庫,是以需要配置多資料源通路。基于上一篇文章的工程進行開發。
4.2.1 添加mysql資料庫依賴
<!-- 資料庫相關依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
4.2.2 配置多資料源通路
Spring Boot
對多資料源的支援比較友好,配置也很簡單,先在配置檔案中添加資料庫配置,然後在java配置檔案中添加相應的注解即可。如下:
-
配置内容application.properties
# spring batch db
spring.datasource.jdbc-url=jdbc:mysql://localhost:3310/my_spring_batch?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.username=root
spring.datasource.password=111111
# target db
spring.target-datasource.jdbc-url=jdbc:mysql://localhost:3310/mytest?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.target-datasource.username=root
spring.target-datasource.password=111111
-
DataSourceConfig
配置内容
建立
檔案,配置多資料源,如下:DataSourceConfig.java
@Configuration
public class DataSourceConfig {
@Bean("datasource")
@ConfigurationProperties(prefix="spring.datasource")
@Primary
public DataSource batchDatasource() {
return DataSourceBuilder.create().build();
}
@Bean("targetDatasource")
@ConfigurationProperties(prefix="spring.target-datasource")
public DataSource targetDatasource() {
return DataSourceBuilder.create().build();
}
}
這樣,後面就可以直接使用
datasource
及
targetDatasource
兩個Bean進行資料庫通路。
4.3 添加User實體
本執行個體中,讀取
csv
檔案,轉為
User
實體,然後存儲到資料庫,是以需要先把
User
這個實體作一個定義。使用了
lombok
和
jpa
的注解,如下:
@Entity
@Data
@Table(name="test_user")
public class User{
@Id
@GeneratedValue
/**
* id
*/
private Long id;
/**
* 姓名
*/
private String name;
/**
* 手機号
*/
private String phone;
...略
4.4 添加檔案讀取元件ItemReader
使用内置的
FlatFileItemReader
即可。如下:
@Bean
public ItemReader file2DbItemReader(){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return new FlatFileItemReaderBuilder<User>()
.name(funcName)
.resource(new ClassPathResource("user-data.csv"))
// .linesToSkip(1)
.delimited()
.names(new String[]{"id","name","phone","title","email","gender","date_of_birth","sys_create_time","sys_create_user","sys_update_time","sys_update_user"})
.fieldSetMapper(new UserFieldSetMapper())
.build();
}
說明:
-
用于建立FlatFileItemReaderBuilder
,設定相應的行為,包括使用它來設定讀取檔案的位置(FlatFileItemReader
),檔案分隔符(預設是resource
),是否跳過前面幾行(','
),辨別每一列對應的列名稱(可與資料庫的字段名一緻)。設定檔案字段與資料庫實體字段的對應關系。linesToSkip
- 設定檔案字段與資料庫實體字段的對應關系,使用
來實作,其中FieldSetMapper
代表每一行文本資料,傳回值即為實體對象。如下所示:FieldSet
public class UserFieldSetMapper implements FieldSetMapper<User> {
@Override
public User mapFieldSet(FieldSet fieldSet) throws BindException {
String patternYmd = "yyyy-MM-dd";
String patternYmdHms = "yyyy-MM-dd HH:mm:ss";
User user = new User();
user.setId(fieldSet.readLong("id"));
user.setName(fieldSet.readString("name"));
user.setPhone(fieldSet.readString("phone"));
user.setTitle(fieldSet.readString("title"));
user.setEmail(fieldSet.readString("email"));
user.setGender(fieldSet.readString("gender"));
//此字段有可能為null
String dataOfBirthStr = fieldSet.readString("date_of_birth");
if(SyncConstants.STR_CSV_NULL.equals(dataOfBirthStr)){
user.setDateOfBirth(null);
}else{
DateTime dateTime = DateUtil.parse(dataOfBirthStr, patternYmd);
user.setDateOfBirth(dateTime.toJdkDate());
}
user.setSysCreateTime(fieldSet.readDate("sys_create_time",patternYmdHms));
user.setSysCreateUser(fieldSet.readString("sys_create_user"));
user.setSysUpdateTime(fieldSet.readDate("sys_update_time",patternYmdHms));
user.setSysUpdateUser(fieldSet.readString("sys_update_user"));
return user;
}
}
4.5 添加處理元件ItemProcessor
由于
csv
文本檔案中的資料
null
值資料辨別符為
\N
,是以可以在處理元件中進行處理,把辨別符
\N
設定為
null
值。如下所示:
@Slf4j
public class File2DbItemProcessor implements ItemProcessor<User,User> {
@Override
public User process(User user) throws Exception {
user.setPhone(checkStr(user.getPhone()));
user.setTitle(checkStr(user.getTitle()));
user.setEmail(checkStr(user.getEmail()));
user.setGender(checkStr(user.getGender()));
log.info(LogConstants.LOG_TAG + "item process: " +user.getName());
return user;
}
public String checkStr(String dataToCheck){
if(SyncConstants.STR_CSV_NULL.equals(dataToCheck)){
return null;
}
return dataToCheck;
}
}
4.6 添加資料庫寫入元件ItemWriter
資料庫寫入元件使用
JdbcBatchItemWriter
即可,如下:
@Bean
public ItemWriter file2DbWriter(@Qualifier("targetDatasource") DataSource datasource){
return new JdbcBatchItemWriterBuilder<User>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user) " +
"VALUES (:id,:name,:phone,:title,:email,:gender,:dateOfBirth,:sysCreateTime,:sysCreateUser,:sysUpdateTime,:sysUpdateUser)")
.dataSource(datasource)
.build();
}
說明:
- 使用
進行JdbcBatchItemWriterBuilder
的建立,設定插入資料庫的sql語句,同時指定資料源即可。JdbcBatchItemWriter
-
用于指定資料源@Qualifier("targetDatasource") DataSource datasource
- 使用
可以直接把讀取的資料實體的屬性資料作為參數填充到BeanPropertyItemSqlParameterSourceProvider
語句中,進而實作資料插入操作。sql
4.7 組裝完整任務
經過上面的操作,可以使用一個java配置,把讀、寫、處理組裝成完整的
step
和
job
,如下所示(詳細可見示例工程檔案):
File2DbBatchConfig.java
@Bean
public Job file2DbJob(Step file2DbStep,JobExecutionListener file2DbListener){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return jobBuilderFactory.get(funcName)
.listener(file2DbListener)
.flow(file2DbStep)
.end().build();
}
@Bean
public Step file2DbStep(ItemReader file2DbItemReader , ItemProcessor file2DbProcessor
,ItemWriter file2DbWriter){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return stepBuilderFactory.get(funcName)
.<User,User>chunk(10)
.reader(file2DbItemReader)
.processor(file2DbProcessor)
.writer(file2DbWriter)
.build();
}
4.8 編寫測試
參考上一文章的
ConsoleJobTest
,編寫
File2DbJobTest
檔案。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,File2DbBatchConfig.class})
@Slf4j
public class File2DbJobTest {
@Autowired
private JobLauncherService jobLauncherService;
@Autowired
private Job file2DbJob;
@Test
public void testFile2DbJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
//建構任務運作參數
JobParameters jobParameters = JobUtil.makeJobParameters();
//執行并顯示結果
Map<String, Object> stringObjectMap = jobLauncherService.startJob(file2DbJob, jobParameters);
Assert.assertEquals(ExitStatus.COMPLETED,stringObjectMap.get(SyncConstants.STR_RETURN_EXITSTATUS));
}
}
執行後結果輸出如下(
exitCode=COMPLETED
):
5.總結
本文先對
Spring Batch
的開箱即用的
ItemReader
,
ItemWriter
、
ItemProcessor
作了一個簡要的概覽,然後以讀
csv
檔案,處理null值,再插入到資料庫的處理邏輯為案例,介紹了
Spring Batch
的資料庫腳本,
FlatFileItemReader
及
JdbcBatchItemWriter
的使用。希望對大家更深入的了解
Spring Batch
有幫助,并能用到實踐中。
參考資源
- 劉相《Spring Batch 批處理架構》:書中對Spring Batch進行了詳細的描述,本文章主要參考此書。
- 《Spring Batch - Reference Documentation》:書中對Spring Batch進行了詳細的描述,本文章主要參考此書。