1、編寫項目中需要的處理類,縮略代碼為以下步驟:
1.1、編寫執行job類:
jobLauncher執行job是通過查找job的名稱來執行的。
jobparams可自定義業務參數
@Service
public class PaymentCostStatisticsRunner {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job testJob;
public void doStart(Map<String, Object> args) {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("jobid", SnowFlakeIdGenerator.nextId())
.addString("type", args.get("type").toString())
.addString("currentTime", LocalDateTime.now().toString())
.toJobParameters();
jobLauncher.run(testJob, jobParameters);
}
}
1.2、編寫job類:
最初遇到的問題及解決方法:
a、不會接收jobparams中的參數:可以通過監聽器來接收
b、調用job執行reader時,每次調用,都不會執行我reader中寫的判斷方法,隻會執行項目啟動時的分支的方法:加上@StepScope注解,調用接口時,會再次執行reader方法,此時會進行判斷。
@Configuration
@Slf4j
public class Jobs implements StepExecutionListener {
private String type;
/**
* 付款申請統計Job.
*
* @return Job
*/
@Bean
public Job testJob() {
return jobBuilderFactory.get("testJob")
.start(step1())
.next(step2())
.build();
}
@Bean
public Step step1() {
allpaymentList = new HashSet<>();
return stepBuilderFactory.get("step1")
.listener(this) //此處使用了監聽器,友善擷取之前jobparams中傳入的參數
.<~>chunk(size) //此處size指的是每次讀取并處理的資料條數
.reader(step1Reader())
.writer(step1Writer())
.build();
}
@Bean
public Step step2() {
costComposeMap = Maps.newHashMap();
return stepBuilderFactory.get("step2")
.<~>chunk(size)
.reader(step2Reader()) //這裡隻是給自己寫個案例 和step1一樣 實際的方法就不寫了
.processor(step2Processor())
.writer(step2Writer())
.build();
}
@SneakyThrows
@Bean
@StepScope
public MyBatisCursorItemReader step1Reader() {
MyBatisCursorItemReader reader = new MyBatisCursorItemReader<BorrowExt>();
String queryId;
try {
if ("1".equals(type)) {
queryId = "xx.xxx.xx.selectAll";
log.info("此時執行了全量");
} else {
queryId = "xx.xxx.xx..selectByUpdateTime";
Map<String, LocalDate> params = Maps.newHashMap();
params.put("updateTime", computeDateUtil.computeDate());
reader.setParameterValues(params);
log.info("此時執行了增量");
}
reader.setSqlSessionFactory(sqlSessionFactory); //此處根據業務資料配置一個資料源
reader.setQueryId(queryId);
reader.afterPropertiesSet();
} catch (Exception e) {
log.info(String.valueOf(e));
}
return reader;
}
@Bean
public ItemWriter<PaymentExt> step1Writer() {
return items -> {
log.info("這裡做一些寫入處理");
};
}
//這裡實作了step監聽器的方法
@Override
public void beforeStep(StepExecution stepExecution) {
//擷取到jobParams中的參數
type = stepExecution.getJobExecution().getJobParameters().getString("type");
log.info("監聽type為:{}", type);
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
}