說明
我這裡使用的是spring boot。非spring boot項目可以參考spring boot的自動化配置類 org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration,以java bean config的方式手動配置
需要知道以下知識點
《Spring Boot-Starter(九)》
《spring源碼閱讀(五)-Spring Import注解使用》
準備工作
1.導入pom依賴
<!-- spring batch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!--spring-boot-starter-jdbc自動配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!--資料庫驅動 我本地資料庫是8.0,所以使用8.0-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.18</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.4</version>
</dependency>
<!--spring mvc-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
2.建立jobRepository資料庫表
在依賴的batch core包裡找到sql腳本 裡面有各個存儲中繼資料的資料源 和配置方式 我們用mysql就行
每個表作用的相關介紹可以參考Spring Batch官方參考文檔的附錄「Meta-Data Schema」(原文連結已失效)
3.application.properties配置
# jdbc_config datasource
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/spring_batch_db?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
spring.datasource.username=root
spring.datasource.password=868978
# Hikari will use the above plus the following to setup connection pooling
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.maximum-pool-size=15
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.pool-name=DatebookHikariCP
spring.datasource.hikari.max-lifetime=1800000
spring.datasource.hikari.connection-timeout=30000
#關閉自啟動job 源碼處org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration
spring.batch.job.enabled=false
4.使用EnableBatchProcessing完成部分配置
/**
 * Spring Boot entry point of the Spring Batch demo application.
 *
 * <p>The class carries {@code @EnableBatchProcessing(modular = true)}; other classes of this
 * demo inject {@code ModularBatchConfiguration} to obtain the job/step builders, the job
 * registry and the job launcher.
 */
@SpringBootApplication
// Original note: "optional parameter: eager or lazy initialisation".
// NOTE(review): confirm the exact meaning of `modular` against the @EnableBatchProcessing Javadoc.
@EnableBatchProcessing(modular = true)
public class SpringBatchDemoApplication {
/**
 * Starts the application by handing this class and the command-line arguments to
 * {@code SpringApplication.run}.
 *
 * @param args command-line arguments, passed through unchanged
 */
public static void main(String[] args) {
SpringApplication.run(SpringBatchDemoApplication.class, args);
}
}
初始化job
定義ItemReader
這裡通過讀取資料來模拟讀取
/**
* @Project spring-batch-test-demo
* @PackageName springbatchsimpledemo.demo.step.reader
* @ClassName DemoReader
* @Author qiang.li
* @Date 2021/10/21 2:54 下午
* @Description 模拟讀資料
*/
public class DemoReader implements ItemReader<String> {
//模拟資料
private static List<String> datas=new ArrayList<>();
private int cursor=0;
static {
for (int j=0;j<=100;j++){
datas.add(j+"");
}
}
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
//傳回null表示讀完
if(cursor>=datas.size()){
return null;
}
return datas.get(cursor++);
}
}
定義ItemProcessor
/**
 * Demo processing stage: converts each incoming string item into a {@code Long}.
 *
 * @author qiang.li
 */
public class DemoProcessor implements ItemProcessor<String, Long> {

    /**
     * Parses the item as a decimal {@code long} value.
     *
     * @param item textual form of the number
     * @return the parsed value, boxed
     * @throws Exception a {@code NumberFormatException} when {@code item} is not a valid number
     */
    @Override
    public Long process(String item) throws Exception {
        final Long parsed = Long.valueOf(item);
        return parsed;
    }
}
定義ItemWriter
/**
 * Demo {@code ItemWriter} that simulates persisting items by printing them to standard output.
 *
 * @author qiang.li
 */
public class DemoWriter implements ItemWriter<Long> {

    /**
     * Prints one line per item of the chunk, in the order given.
     *
     * @param items the chunk of processed ids to "write"
     */
    @Override
    public void write(List<? extends Long> items) throws Exception {
        items.forEach(id -> System.out.println("模拟寫入" + id));
    }
}
定義Step和job
@Configuration
public class DemoJobConfig{
@Autowired
private ModularBatchConfiguration modularBatchConfiguration;
/**
* 定義job
* @return
* @throws Exception
*/
@Bean
public Job initJob() throws Exception {
Job job= modularBatchConfiguration.jobBuilders()
.get("demoJob")
.start(initStep())//單線程
.build();
//注冊到registry 後續通過registry擷取job
modularBatchConfiguration.jobRegistry().register(new JobFactory() {
@Override
public Job createJob() {
return job;
}
@Override
public String getJobName() {
return job.getName();
}
});
return job;
}
/**
* 定義step
* @return
* @throws Exception
*/
public Step initStep() throws Exception {
return modularBatchConfiguration.stepBuilders()
.get("demoStep")
.<String, Long>chunk(1)
.reader(new DemoReader())
.processor((new DemoProcessor()))
.writer(new DemoWriter())
.build();
}
}
測試
/**
 * HTTP endpoints for driving the demo job by hand: start it, stop its running executions,
 * or start its next instance.
 *
 * @author qiang.li
 */
@Controller
public class DemoJobController {

    @Autowired
    private ModularBatchConfiguration modularBatchConfiguration;

    /**
     * Operator used for stopping executions and starting the next instance.
     * Original note: initialised by {@code BatchAutoConfiguration}.
     */
    @Autowired
    private JobOperator jobOperator;

    /**
     * Looks up {@code demoJob} in the job registry and runs it through the job launcher with
     * the single fixed parameter {@code date = 11}.
     *
     * <p>Original note: starts the job; if the job failed, calling this again re-executes it.
     * NOTE(review): the original also kept a commented-out alternative that started the job
     * through {@code jobOperator}; confirm that method's parameter type before reviving it.
     *
     * @return the literal {@code "success"} once the launcher call returns
     * @throws Exception whatever the registry lookup or the launcher throws
     */
    @RequestMapping("/startDemoJob")
    @ResponseBody
    public String startOrderJob() throws Exception {
        Job demoJob = modularBatchConfiguration.jobRegistry().getJob("demoJob");

        Map<String, JobParameter> params = new HashMap<>();
        params.put("date", new JobParameter(11L));

        modularBatchConfiguration.jobLauncher().run(demoJob, new JobParameters(params));
        return "success";
    }

    /**
     * Asks the operator to stop every execution of {@code demoJob} it reports as running.
     *
     * @return the literal {@code "success"} after all stop requests were issued
     * @throws Exception whatever the operator throws
     */
    @RequestMapping("/stopDemoJob")
    @ResponseBody
    public String stopJob() throws Exception {
        for (Long executionId : jobOperator.getRunningExecutions("demoJob")) {
            jobOperator.stop(executionId);
        }
        return "success";
    }

    /**
     * Starts the next instance of {@code demoJob} through the operator.
     * Original note: ignores earlier errors and starts a new job.
     *
     * @return the literal {@code "success"} once the operator call returns
     * @throws Exception whatever the operator throws
     */
    @RequestMapping("/startNextDemoOrderJob")
    @ResponseBody
    public String startNextDemoOrderJob() throws Exception {
        jobOperator.startNextInstance("demoJob");
        return "success";
    }
}
輸出結果
預設實作
關於常用的Reader和Writer,spring batch提供了很多基礎的實作,具體可以檢視對應的實作類。