天天看點

springBatch使用過程中踩的坑

1、編寫項目中需要的處理類,縮略代碼為以下步驟:

1.1、編寫執行job類:

jobLauncher執行job是通過查找job的名稱來執行的。

jobparams可自定義業務參數

@Service
public class PaymentCostStatisticsRunner {
  
  @Autowired
  private JobLauncher jobLauncher;
  
  @Autowired
  private Job testJob;
 
  public void doStart(Map<String, Object> args) {
    JobParameters jobParameters = new JobParametersBuilder()
        .addLong("jobid", SnowFlakeIdGenerator.nextId())
        .addString("type", args.get("type").toString())
        .addString("currentTime", LocalDateTime.now().toString())
        .toJobParameters();
    jobLauncher.run(testJob, jobParameters); 
  }
}
           

1.2、編寫job類:

最初遇到的問題及解決方法:

a、不會接收jobparams中的參數:可以通過監聽器來接收

b、調用job執行reader時,每次調用,都不會執行我reader中寫的判斷方法,隻會執行項目啟動時的分支的方法:加上@StepScope注解,調用接口時,會再次執行reader方法,此時會進行判斷。

@Configuration
@Slf4j
public class Jobs implements StepExecutionListener {

    private String type;


    /**
     * 付款申請統計Job.
     *
     * @return Job
     */
    @Bean
    public Job testJob() {
        return jobBuilderFactory.get("testJob")
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    public Step step1() {
        allpaymentList = new HashSet<>();
        return stepBuilderFactory.get("step1")
                .listener(this) //此處使用了監聽器,友善擷取之前jobparams中傳入的參數
                .<~>chunk(size) //此處size指的是每次讀取并處理的資料條數
                .reader(step1Reader())
                .writer(step1Writer())
                .build();
    }

    @Bean
    public Step step2() {
        costComposeMap = Maps.newHashMap();
        return stepBuilderFactory.get("step2")
                .<~>chunk(size)
                .reader(step2Reader()) //這裡隻是給自己寫個案例 和step1一樣 實際的方法就不寫了
                .processor(step2Processor())
                .writer(step2Writer())
                .build();
    }

    @SneakyThrows
    @Bean
    @StepScope
    public MyBatisCursorItemReader step1Reader() {
        MyBatisCursorItemReader reader = new MyBatisCursorItemReader<BorrowExt>();
        String queryId;
        try {
            if ("1".equals(type)) {
                queryId = "xx.xxx.xx.selectAll";
                log.info("此時執行了全量");
            } else {
                queryId = "xx.xxx.xx..selectByUpdateTime";
                Map<String, LocalDate> params = Maps.newHashMap();
                params.put("updateTime", computeDateUtil.computeDate());
                reader.setParameterValues(params);
                log.info("此時執行了增量");
            }
            reader.setSqlSessionFactory(sqlSessionFactory); //此處根據業務資料配置一個資料源
            reader.setQueryId(queryId);
            reader.afterPropertiesSet();
        } catch (Exception e) {
            log.info(String.valueOf(e));
        }
        return reader;
    }

    @Bean
    public ItemWriter<PaymentExt> step1Writer() {
        return items -> {
            log.info("這裡做一些寫入處理");
        };
    }

	//這裡實作了step監聽器的方法
    @Override
    public void beforeStep(StepExecution stepExecution) {
    	//擷取到jobParams中的參數
        type = stepExecution.getJobExecution().getJobParameters().getString("type");
        log.info("監聽type為:{}", type);
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}