Spring Batch Job Flow

최준호·2022년 1월 13일
0

Spring Batch

목록 보기
3/10

지금까지 공부한 내용의 코드를 보면 Job 내부에 Step을 구성하고 Step 내부에 실제 비지니스 로직이 구현되어 있다. 그렇다면 Job 내부에서 Step을 어떻게 관리하는지 알아보자.

Next

@Slf4j
@Configuration
@RequiredArgsConstructor
public class StepNextJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job stepNextJob() {
        return jobBuilderFactory.get("stepNextJob")
                .start(step1())
                .next(step2())
                .next(step3())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((contribution, chunkContext) -> {
                    log.info(">>>>> This is Step1");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    log.info(">>>>> This is Step2");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((contribution, chunkContext) -> {
                    log.info(">>>>> This is Step3");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}

StepNextJobConfiguration.java 파일을 job 내에 추가해주자. 코드를 보면 next()는 step들을 순차적으로 연결시킬 때 사용된다. 특별한 작업 없이 순차적으로 step을 진행시키기만 한다면 next()를 사용해서 진행하면 된다.

파라미터에 version=1 값을 넣고 실행해보자.

우리가 방금 작성한 StepNextJobConfiguration은 next()로 정의해둔 순서대로 정상 실행되었지만 그 이후에 이전에 작성했던 SimpleJob 또한 실행되었다.

지정한 Batch Job만 실행되도록 만들기

spring:
  profiles:
    active: live
  batch:
    job:
      names: ${job.name:NONE}

---
spring:
  profiles: local
  datasource:
    hikari:
      jdbc-url: jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
      username: sa
      password:
      driver-class-name: org.h2.Driver
---
spring:
  profiles: live
  datasource:
    hikari:
      jdbc-url: jdbc:mysql://localhost:3306/BATCH
      username: root
      password: root
      driver-class-name: com.mysql.jdbc.Driver

배치가 실행될 때 프로그램 arguments로 job.name 값이 넘어오면 해당 값의 job만 실행하고 빈 값이라면 NONE이 적용되어 어떤 배치도 실행하지 않겠다는 의미이다.

그러면 실제로 정상 작동하는지 확인해보자.

프로그램 아규먼트를 job.name=stepNextJob 으로 지정하고 새로 실행되기 위해 version=2로 넣어주었다. 이제 실행해보자

전체 로그를 확인했을 때 이전과는 다르게 우리가 실행하고자 했던 job만 실행되었다.

그리고 정말 아규먼트를 정의하지 않으면 실행이 되지 않는지도 체크해보자

batch의 job에 대한 어떤 동작도 일어나지 않았다는 것을 확인할 수 있다.

조건 별 흐름 제어

Next는 순차적으로 Step을 제어하는 방법이다. 그렇다면 Step에서 오류가 나거나 특정 상황에 다른 Step으로 변경해야할 경우 사용할 수 있는 기능이 있을까?

@Slf4j
@Configuration
@RequiredArgsConstructor
public class StepNextConditionalJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job stepNextConditionalJob() {
        return jobBuilderFactory.get("stepNextConditionalJob")
                .start(conditionalJobStep1())
                .on("FAILED") // FAILED 일 경우
                .to(conditionalJobStep3()) // step3으로 이동한다.
                .on("*") // step3의 결과 관계 없이 
                .end() // step3으로 이동하면 Flow가 종료한다.
                .from(conditionalJobStep1()) // step1로부터
                .on("*") // 위에서 FAILED가 한번 걸러졌기 때문에 FAILED 외에 모든 경우
                .to(conditionalJobStep2()) // step2로 이동한다.
                .next(conditionalJobStep3()) // step2가 정상 종료되면 step3으로 이동한다.
                .on("*") // step3의 결과 관계 없이 
                .end() // step3으로 이동하면 Flow가 종료한다.
                .end() // Job 종료
                .build();
    }

    @Bean
    public Step conditionalJobStep1() {
        return stepBuilderFactory.get("step1")
                .tasklet((contribution, chunkContext) -> {
                    log.info(">>>>> This is stepNextConditionalJob Step1");

                    /**
                     ExitStatus를 FAILED로 지정한다.
                     해당 status를 보고 flow가 진행된다.
                     **/
                    contribution.setExitStatus(ExitStatus.FAILED);

                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step conditionalJobStep2() {
        return stepBuilderFactory.get("conditionalJobStep2")
                .tasklet((contribution, chunkContext) -> {
                    log.info(">>>>> This is stepNextConditionalJob Step2");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step conditionalJobStep3() {
        return stepBuilderFactory.get("conditionalJobStep3")
                .tasklet((contribution, chunkContext) -> {
                    log.info(">>>>> This is stepNextConditionalJob Step3");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}

step의 상황에 따라 진행할 수 있는 새로운 job StepNextConditionalJobConfiguration 파일을 생성해서 코드를 살펴보자.

on()

  • 캐치할 ExitStatus 지정
  • *일 경우 모든 ExitStatus로 적용

to()

  • 다음으로 이동할 step 지정

from()

  • 이벤트 리스너 역할
  • 상태값을 보고 일치하는 상태라면 to()에 포함된 step을 호출

end()

  • end는 FlowBuilder를 반환하는 end와 FlowBuilder를 종료하는 end 2가지가 존재함
  • on("*") 뒤에 있는 end는 FlowBuilder를 반환하는 end
  • build() 앞에 있는 end는 FlowBuilder를 종료하는 end
  • FlowBuilder를 반환하는 end 사용시 계속해서 from()을 이어서 사용할 수 있음

여기서 중요한 점은 on()이 캐치하는 상태값이 BatchStatus가 아닌 ExitStatus라는 점이다.

public Step conditionalJobStep1() {
    return stepBuilderFactory.get("step1")
            .tasklet((contribution, chunkContext) -> {
                log.info(">>>>> This is stepNextConditionalJob Step1");

                /**
                 ExitStatus를 FAILED로 지정한다.
                 해당 status를 보고 flow가 진행된다.
                 **/
                contribution.setExitStatus(ExitStatus.FAILED);

                return RepeatStatus.FINISHED;
            })
            .build();
}

step 1의 코드를 보면

contribution.setExitStatus(ExitStatus.FAILED);

setExitsStatus()를 통해 ExitStatus를 지정할 수 있다.

그렇다면 실제 코드를 실행해보고 정말로 step1 -> step3으로 실행되는지 확인해보자.

정상적으로 step1과 step3이 실행되었다.

Batch Status와 Exit Status
Batch Status Job 또는 Step의 실행 결과를 Spring에서 기록할 때 사용하는 enum
Exit Status Step의 실행 후 상태

Decide

위에서 step의 결과에 따라 다른 step으로 이동하는 방법을 알아봤다. 하지만 딱봐도 코드가 이해하기 어렵고 분기처리가 많아질수록 더 복잡해질 것이 분명하다.

위의 코드의 문제점

  1. Step이 담당하는 역할이 많아진다.
    • 실행하는 서비스 로직 이외에 분기처리를 위해 조건도 확인해야함
    • 많은 분기처리 로직을 작성하기 어려움

그래서 사용하는 것을 JobExecutionDecider라고 한다. 코드로 확인해보자.

@Slf4j
@Configuration
@RequiredArgsConstructor
public class DeciderJobConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job deciderJob() {
        return jobBuilderFactory.get("deciderJob")
                .start(startStep())
                .next(decider()) // 홀수 | 짝수 구분
                .from(decider()) // decider의 상태가
                .on("ODD") // ODD라면
                .to(oddStep()) // oddStep로 간다.
                .from(decider()) // decider의 상태가
                .on("EVEN") // ODD라면
                .to(evenStep()) // evenStep로 간다.
                .end() // builder 종료
                .build();
    }

    @Bean
    public Step startStep() {
        return stepBuilderFactory.get("startStep")
                .tasklet((contribution, chunkContext) -> {
                    log.info(">>>>> Start!");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step evenStep() {
        return stepBuilderFactory.get("evenStep")
                .tasklet((contribution, chunkContext) -> {
                    log.info(">>>>> 짝수입니다.");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step oddStep() {
        return stepBuilderFactory.get("oddStep")
                .tasklet((contribution, chunkContext) -> {
                    log.info(">>>>> 홀수입니다.");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public JobExecutionDecider decider() {
        return new OddDecider();
    }

    public static class OddDecider implements JobExecutionDecider {

        @Override
        public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
            Random rand = new Random();

            int randomNumber = rand.nextInt(50) + 1;
            log.info("랜덤숫자: {}", randomNumber);

            if(randomNumber % 2 == 0) {
                return new FlowExecutionStatus("EVEN");
            } else {
                return new FlowExecutionStatus("ODD");
            }
        }
    }
}

Decide를 실습할 DeciderJobConfiguration 파일을 생성했다 코드를 보면 분기처리에 대한 코드는 모두 OddDecider가 전담하고 있다. 모든 분기처리에 대한 코드는 OddDecider에서 처리한 뒤 상태값만 반환하면 되므로 step에서는 상태값에 대한 처리만 하고 다음 step을 어디로 갈지만 정해놓으면 된다.

정상적으로 실행된 것을 확인할 수 있다.

출처
job flow에 대한 참고글

0개의 댓글