[Spring Batch] Step 분기처리

코딩은 많은 시행착오·2023년 10월 29일
0

back-end

목록 보기
5/18

Spring Batch로 배치 로직을 개발하던 중 분기처리가 필요한 상황에 직면했다.

Step에서 어떤 작업의 결과에 따라 다음 로직을 수행하려면 Spring Batch에서 제공하는 JobExcutionDecider를 사용하면 된다.


분기문의 결과에 따라 다른 Step을 실행할 수 있어서 배치 코드의 흐름제어에 도움이 된다.

예시 코드

먼저 DB를 조회해서 list가 null이 아니면 그 list를 이용해서 작업하고, list가 null 일 경우 log만 남기고 끝내는 요구사항이 있다고 생각해보자.


코드 전문

@Slf4j
@Configuration
@RequiredArgsConstructor
public class SimpleJob {
	private List<String> list = new ArrayList<>();
  private final int chunkSize = 500;
  private final PlatformTransactionManager transactionManager;
  private final JobRepository jobRepository;
  private final EntityManagerFactory entityManagerFactory;
  private final TestRepository testRepository;

    @Bean
    public Job TestJob() {
          return new JobBuilder("testJob", jobRepository)
                          .incrementer(new RunIdIncrementer())
                          .start(readListStep())
                          .next(decider())
                          .from(decider()).on("PROCESS")
                          .to(processStep())
                          .from(decider()).on("TERMINATE")
                          .to(terminateStep())
                          .end()
                          .build();
    }
   
    @Bean
    public JobExecutionDecider decider() {
        return (JobExecution jobExecution, StepExecution stepExecution) ->
              !list.isEmpty() ? new FlowExecutionStatus("PROCESS") : new FlowExecutionStatus("TERMINATE");
    }
    
    @Bean
    @JobScope
    public Step readListStep() {
        return new StepBuilder("readListStep", jobRepository)
                .tasklet(readListTasklet(), transactionManager)
                .build();
    }

    @Bean
    @JobScope
    public Tasklet readListTasklet() {
        return (contribution, chunkContext) -> {
            list = testRepository.findAll();
            return RepeatStatus.FINISHED;
        };
    }
    
    @Bean
    @JobScope
    public Step processStep() {
        return new StepBuilder("processStep", jobRepository).<String, String>chunk(chunkSize, transactionManager)
                .reader(testReader())
                .writer(testWriter())
                .build();
    }
    
    @Bean
    @StepScope
    public JpaPagingItemReader<String> testReader() {
        return new JpaPagingItemReaderBuilder<String>()
                .name("testReader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("SELECT t FROM Test t WHERE t.isUse = true")
                .build();
    }

    @Bean
    @StepScope
    public ItemWriter<String> testWriter() {
        return testList -> {
            // list와 testList로 작업 처리 로직
            testRepository.saveAll(testList);
        };
    }

    @Bean
    @JobScope
    public Step terminateStep() {
        return new StepBuilder("terminateStep", jobRepository)
                .tasklet(terminateTasklet(), transactionManager)
                .build();
    }

    @Bean
    @JobScope
    public Tasklet terminateTasklet() {
        return (contribution, chunkContext) -> {
            log.error("List Read Error : List is null");
            return RepeatStatus.FINISHED;
        };
    }
}

이제 코드를 하나씩 살펴보자.

Job 코드

	  @Bean
      public Job TestJob() {
            return new JobBuilder("testJob", jobRepository)
                            .incrementer(new RunIdIncrementer())
                            .start(readListStep())
                            .next(decider())
                            .from(decider()).on("PROCESS")
                            .to(processStep())
                            .from(decider()).on("TERMINATE")
                            .to(terminateStep())
                            .end()
                            .build();
      }

Job에서 보면 먼저 readListStep을 실행하고, decider로 분기처리를 진행한다.
decider의 결과에 따라 "PROCESS"면 processStep, "TERMINATE"면 terminateTasklet을 실행해준다.



decider 코드

 	  @Bean
      public JobExecutionDecider decider() {
          return (JobExecution jobExecution, StepExecution stepExecution) ->
                !list.isEmpty() ? new FlowExecutionStatus("PROCESS") : new FlowExecutionStatus("TERMINATE");
      }

decider의 코드를 들여다보면, 람다 구문으로 list.isEmpty가 true일 경우 TERMINATE를, false일 경우 PROCESS를 return하게 된다.

이렇게 return을 하게되면 Job코드에서 상황에 맞는 to 항목으로 가서 작업을 진행한다.
만약 .to() 뒤에도 다른 Step을 추가하고 싶다면 .next()를 사용하면 된다.



processStep

	  @Bean
      @JobScope
      public Step processStep() {
          return new StepBuilder("processStep", jobRepository).<String, String>chunk(chunkSize, transactionManager)
                  .reader(testReader())
                  .writer(testWriter())
                  .build();
      }
      
      @Bean
      @StepScope
      public JpaPagingItemReader<String> testReader() {
          return new JpaPagingItemReaderBuilder<String>()
                  .name("testReader")
                  .entityManagerFactory(entityManagerFactory)
                  .pageSize(chunkSize)
                  .queryString("SELECT t FROM Test t WHERE t.isUse = true")
                  .build();
      }

      @Bean
      @StepScope
      public ItemWriter<String> testWriter() {
          return testList -> {
              // list와 testList로 작업 처리 로직
              testRepository.saveAll(testList);
          };
      }

작업을 처리하는 Step이다.
Jpa로 별도의 table을 읽고, writer에서 list와 chunk단위로 읽은 testList로 작업을 처리해준다.
작업은 요구사항과 비즈니스 로직에 맞게 구현하면 된다.



terminateStep

	  @Bean
      @JobScope
      public Step terminateStep() {
          return new StepBuilder("terminateStep", jobRepository)
                  .tasklet(terminateTasklet(), transactionManager)
                  .build();
      }
      
	  @Bean
      @JobScope
      public Tasklet terminateTasklet() {
          return (contribution, chunkContext) -> {
              log.error("List Read Error : List is null");
              return RepeatStatus.FINISHED;
          };
      }

별도의 로그를 남기고 종료하는 step이다.



이로서 Spring Batch에 decider에 대해 알아봤다.
꼭 이런 예시같은 로직이 아니더라도 상황에 맞게 로직을 수행하는 데 도움이 될 것이다.

0개의 댓글