스프링 배치

김종하·2023년 7월 20일
1

스프링 배치

스프링 배치 메타 데이터

스프링 배치의 실행 및 관리 목적의 도메인(Job, Step..) 정보들을 저장, 업데이트, 조회할 수 있는 스키마를 제공
DB 와 연동할 경우 필수적으로 메타 테이블이 생성되어있어야 한다.

스키마 파일 위치 : /org/springframework/batch/core/schema-*.sql

  • 스키마 생성 방법 및 설정
  1. 수동 생성 - 스키마 파일에서 쿼리 복사 후 직접 실행
  2. 자동 생성 spring.batch.jdbc.initialize-schema 설정
    (always : 스크립트 항상 실행, embedded: 내장 DB 일때만 실행, never: 스크립트 항상 실행 안함)
    • 운영환경에서는 수동생성할 것
  • 스프링 배치 스키마
    1. BATCH_JOB_INSTANCE
      job 이 실행될 때 jobInstance 정보가 저장
      job_name 과 job_key 를 합쳐 unique
    2. BATCH_JOB_EXECUTION
      job 의 실행정보 저장
      job 생성, 시작, 종료시간, 실행상태 메시지 등을 관리
    3. BATCH_JOB_EXECUTION_PARAMS
      job과 함께 실행되는 jobParameter 정보 저장
    4. BATCH_JOB_EXECUTION_CONTEXT
      ****job 의 실행동안 여러가지 상태정보, 공유 데이터를 직렬화(json) 해서 저장
      step 간 서로 공유 가능
    5. BATCH_STEP_EXECUTION
      step 의 실행정보 저장
      step 생성, 시작, 종료시간, 실행상태 메시지 등을 관리
    6. BATCH_STEP_EXECUTION_CONTEXT
      step 의 실행동안 여러가지 상태정보, 공유 데이터를 직렬화 해서 저장.
      step 별로 저장되며 step 간 서로 공유할 수 없음

스프링 배치 도메인

  • JobInstance Job 이 실행될 때 생성되는 Job 의 논리적 실행 단위 객체로 고유하게 식별 가능한 작업 실행을 나타낸다. Job 의 설정과 구성은 동일하지만 Job 이 실행되는 시점에 처리하는 내용이 다르기 때문에 Job 의 실행을 구분한다. (하루에 3번씩 Job 이 실행되면 3개의 JobInstance 가 생성)
  • JobParameter Job 을 실행할 때 함께 포함되어 사용되는 파라미터를 가진 도메인 객체 하나의 Job 에 존재할 수 있는 여러개의 JobInstance 를 구분하기 위한 용도 생성 및 바인딩 방법
    1. 어플리케이션 실행 시 주입
      java -jar LogBatch.jar requestDate=20210101

    2. 코드로 생성
      JobParameterBuilder, DefaultJobParameterConverter

    3. SpEL 이용
      @Value(”#{jobParameter[requestDate]}”), @JobScope, @StepScope 선언 필수

      JobParameter 를 코드에서 참조방법

      @Bean
          public Step step1() {
              return stepBuilderFactory.get("step1")
                      .tasklet((contribution, chunkContext) -> {
      										// contribution 에서 참조
                          JobParameters jobParameters = contribution.getStepExecution().getJobParameters();
      										// chunkContext 에서 참조해서 파라미터 값 확인
                          Map<String, Object> jobParameters1 = chunkContext.getStepContext().getJobParameters();
      
                          return RepeatStatus.FINISHED;
                      })
                      .build();
          }
  • JobExecution JobInstance 에 대한 한번의 시도를 의미하는 객체
    Job 실행 중에 발생한 정보들을 저장하고 있는 객체 JobExecutionFAILED 또는 COMPLETED 등의 Job 실행 결과 상태를 가지고 있음
    상태가 COMPLETED 면 재실행이 불가함
    상태가 FAILED 면 재실행이 가능함 JobExecution 의 실행 결과 상태가 COMPLETED 가 될 때 까지 JobInstance 내에서 여러 번의 시도가 생길 수 있다.
  • Step Batch Job 을 구성하는 독립적인 하나의 단계로 실제 배치 처리를 정의하고 컨트롤하는데 필요한 정보를 가진 객체 배치작업을 어떻게 구성하고 실행할 것인지 Job 세부 작업을 Task 기반으로 설정하고 명세해 놓은 객체
    • TaskletStep : 가장 기본이 되는 클래스로서 Tasklet 타입의 구현체들을 제어한다.
    • PartionStep: 멀티 쓰레드 방식으로 Step 을 여러 개로 분리해서 실행한다.
    • JobStep: Step 내에서 Job 을 실행하도록 한다.
    • FlowStep: Step 내에서 Flow 를 실행하도록 한다.
  • StepExecution Step 에 대한 한번의 시도를 의미하는 객체
    Step 실행 중에 발생한 정보들을 저장하고 있다.
    Step 이 매번 시도될 때 마다 생성되며 각 Step 별로 생성된다.
    Job 이 재시작 하더라도 이미 성공적으로 완료된 Step 은 재 실행되지 않고 실패한 Step 만 실행된다.
  • StepContribution 청크 프로세스 변경 사항을 버퍼링 한 후 StepExecution 상태를 업데이트하는 도메인 객체
    청크 커밋 직전에 StepExecution 의 apply() 메소드를 호출하여 상태를 업데이트
    ExitStatus 의 기본 종료코드 외 사용자 정의 종료코드를 생성하여 적용할 수 있다.
  • ExecutionContext 프레임워크에서 유지 및 관리하는 키/값으로 된 컬렉션으로 StepExecution 또는 JobExecution 객체의 상태를 저장하는 공유 객체 DB 에 직렬화 한 값으로 저장됨 공유 범위
    • Step 범위 : 각 Step 의 StepExecution 에 저장되며 Step 간 서로 공유 안됨

    • Job 범위 : 각 Job 의 JobExecution 에 저장되며 Job 간 서로 공유 안되며 해당 Job 의 Step 간 서로 공유됨

      실패한 Job 재 시작 시 이미 처리한 Row 데이터는 건너뛰고 이후부터 수행할 수 있도록 할 때 ExecutionContext 를 활용할 수 있다.

  • JobRepository 배치 작업 중의 정보를 저장하는 저장소 역할
    배치 작업의 수행과 관련된 모든 meta data 를 저장 @EnableBatchProcessing 애노테이션 선언시 JobRepository 가 자동으로 빈으로 등록 BatchConfigurer 인터페이스를 구현하거나 BasicBatchConfgiurer 를 상속하여 JobRepository 설정 커스터마이징 가능
  • JobLauncher 배치 Job 을 실행시키는 역할을 한다.
    Job 과 JobParameters 를 인자로 받아 요청된 배치 작업을 수행한 후 최종 client 에게 JobExecution 을 반환 스프링 부트 배치가 구동되면 JobLauncher 빈이 자동 생성된다. Job실행
    • JobLauncher.run(Job, Jobparameters)
    • 스프링 부트 배치에서는 JobLauncherApplicationRunner 가 자동으로 JobLauncher 실행
    • 동기적 실행
      • taskExecutor 를 SyncTaskExecutor 로 설정 (기본값)
      • JobExecution 을 획득하고 배치 처리를 최종 완료한 이후 Client 에게 JobExecution 반환
      • 스케쥴러에 의한 배치처리에 적합
    • 비동기적 실행
      • taskExecutor 를 SimpleAsyncTaskExecutor 로 설정
      • JobExecution 을 획득한 후 Client 에게 바로 JobExecution 을 반환하고 배치처리를 완료한다.
      • HTTP 요청에 의한 배치처리에 적합

스프링 부트 배치 설정

  1. JobLauncherApplicationRunner
  • BatchAutoConfiguration 에서 생성
  • 애플리케이션 정상적으로 구동될 때 마다 실행됨
  • 기본적으로 빈으로 등록된 모든 job 을 실행시킴
    spring:
    	batch:
    	  job:
    	    enabled: true 
    # (기본값 true 사용하고 싶지 않은경우 false 설정할 것) 
  1. BatchProperties
  • Spring Batch 환경 설정 클래스
  • Job 이름, 스키마 초기화 설정, 테이블 prefix 등의 값 설정 가능
  • application.yml 파일에서 설정
  1. Job 실행 옵션
  • 지정한 Batch Job 만 실행하도록 할 수 있다.
    spring:
    	batch:
    		job:
    			names: ${job.name:NONE}
    • 애플리케이션 실행 시 programArguments 로 job 이름 입력
      --job.name=helloJob,simpleJob

스프링 배치가 제공하는 빌더 클래스

  • JobBuilderFactory JobBuilder 를 생성하는 팩토릴 클래스로 get(String name) 메소드 제공
    jobBuilderFactory.get("jobName") // jobName 은 스프링 배치가 job 을 실행할 때 참조하는 job 이름
  • JobBuilder Job 을 구성하는 설정 조건에 따라 하위 빌더 클래스를 생성하고 Job 생성을 위임한다.
    1. SimpleJobBuilder
    2. FlowJobBuilder
  • StepBuilderFactory StepBuilder 를 생성하는 팩토리 클래스
    get(String name) 메서드
  • StepBuilder Step 을 구성하는 설정조건에 따라 5개의 하위 빌더 클래스에 Step 생성을 위임한다.
    1. TaskletStepBuilder
    2. SimpleStepBuilder
    3. PartionStepBuilder
    4. JobStepBuilder
    5. FlowStepBuilder

SimpleJob

  • simpleJob 은 Step 을 실행시키는 Job 구현체로 SimpleJobBuilder 에 의해 생성=
  • 여러 단계의 Step 으로 구성할 수 있으며 Step 을 순차적으로 실행시킨다.
    • 실패한 Step 이 생기면 그 이후 Step 은 실행하지 않는다.
  • 모든 Step 의 실행이 성공적으로 완료되어야 Job 이 성공적으로 완료된다.
  • 맨 마지막에 실행된 Step 의 BatchStatus 가 Job 의 최종 BatchStatus

TaskletStep

  • 스프링 배치에서 제공하는 Step 구현체, Tasklet 을 실행시키는 도메인 객체
  • RepeatTemplate 를 사용해서 Tasklet 의 구문을 트랜잭션 경계 내에서 반복 실행
  • Task 기반과 Chunk 기반으로 나눠 Tasklet 실행

Task | Chunk 비교

  • chunk 기반
    • 하나의 큰 덩어리를 n 개씩 나눠(chunk 단위) 실행한다는 의미로 대량 처리를 위해 설계됨
    • ItemReader, ItemProcessor, ItemWriter 를 사용하며 청크 기반 전용 Tasklet 인 ChunkOrientedTasklet 구현체 제공
  • Task 기반
    • 청크 기반 작업 보다 단일 작업 기반으로 처리되는 것이 더 효율적인 경우
    • 주로 Tasklet 구현체를 만들어 사용
    • 대량 처리를 하는 경우 chunk 기반에 비해 더 복잡한 구현 필요

JobStep

  • Job 에 속하는 Step 중 외부의 Job 을 포함하고 있는 Step
  • 외부의 Job 이 실패하면 해당 Step 이 실패하므로 결국 최종 기본 Job 도 실패
  • 모든 메타데이터는 기본 Job 과 외부 Job 별로 각각 저장
  • 커다른 시스템을 작은 모듈로 쪼개고 job 의 흐름을 관리하고자 할 때 사용

FlowJob

  • Step 을 순차적으로만 구성하는 것이 아닌 특정한 상태에 따라 흐름을 전환하도록 구성할 수 있다.
    • Step 이 실패 하더라도 Job 이 실패로 끝나지 않도록 해야 하는 경우
    • Step 이 성공 했을 때 다음에 실행해야 할 Step 을 구분해서 실행해야 하는 경우
    • 특정 Step 은 전혀 실행되지 않게 구성 해야 하는 경우
  • Flow 와 Job 의 흐름을 구성하는데만 관여하고 실제 비즈니스 로직은 Step 에서 이루어진다.
  • 내부적으로 SimpleFlow 객체를 포함하고 있으며 Job 실행 시 호출한다.

트랜지션에 활용하는 배치 상태 유형

  • BatchStatus
    • JobExecution 과 StepExecution 의 속성으로 Job / Step 종료 후 최종 결과 상태 정의
    • SimpleJob ⇒ 마지막 Step 의 BatchStatus 값을 Job 의 최종 BatchStatus 값으로 반영
    • FlowJob ⇒ Flow 내 Step 의 ExitStatus 값을 FlowExecutionStatus 값으로 저장
    • COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, UNKNOWN
  • ExitStatus
    • JobExecution 과 StepExecution 의 속성으로 Job / Step 실행 후 어떤 상태로 종료됐는지 정의
    • 기본적으로 ExitStatus 는 BatchStatus 와 동일한 값으로 설정된다.
    • SimpleJob ⇒ 마지막 Step 의 ExitStatus 값을 Job 의 최종 ExitStatus 값으로 반영
    • FlowJob ⇒ Flow 내 Step 의 ExitStatus 값을 FlowExecution 값으로 저장
    • UNKNOWN, EXECUTING, COMPLETED, NOOP, FAILED, STOPPED
  • FlowExecutionStatus
    • FlowExecution 의 속성으로 Flow 의 실행 후 최종 결과 상태가 무엇인지 정의
    • Flow 내 Step 이 실행되고 나서 ExitStatus 값을 FlowExecutionStatus 값으로 저장
    • FlowJob 의 배치 결과 상태에 관여
    • COMPLETED, STOPPED, FAILED, UNKNOWN

사용자 정의 ExitStatus

  • StepExecutionListener 의 afterStep() 메서드에서 Custom exitCode 생성 후 새로운 ExitStatus 반환
  • Step 실행 후 완료 시점에서 현재 exitCode 를 사용자 정의 exitCode로 수정할 수 있음
    public class PassCheckingLister implements StepExecutionListener {
        @Override
        public void beforeStep(StepExecution stepExecution) {
    
        }
    
        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            String exitCode = stepExecution.getExitStatus().getExitCode();
            if(!exitCode.equals(ExitStatus.FAILED.getExitCode())) {
                return new ExitStatus("PASS");
            }
            return null;
        }
    }

JobExecutionDecider


@Bean
    public Job batchJob() {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .next(decider())
                .from(decider()).on("ODD").to(oddStep())
                .from(decider()).on("EVEN").to(eventStep())
                .end()
                .build();
    }

@Bean
    public JobExecutionDecider decider() {
        return new CustomDecider();
    }

==== 

public class CustomDecider implements JobExecutionDecider {

    private int count = 0;
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        count++;
        if(count % 2 == 0) {
            return new FlowExecutionStatus("EVEN");
        }
        return new FlowExecutionStatus("ODD");
    }
}

@JobScope, @StepScope

  • Job 과 Step 의 빈 생성과 실행에 관여하는 스코프
  • code 예시
    // jobConfig 
    @RequiredArgsConstructor
    @Configuration
    public class JobConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private static final String ANY_STATUS = "*";
    
        @Bean
        public Job batchJob() {
            return jobBuilderFactory.get("job")
                    .incrementer(new RunIdIncrementer())
                    .start(step1(null))
                    .next(step2())
                    .listener(new JobListener())
                    .build();
        }
    
        @Bean
        @JobScope
        public Step step1(@Value("#{jobParameters['message']}") String message) {
            return stepBuilderFactory.get("step1")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println("message --> " + message);
                        System.out.println(contribution.getStepExecution().getStepName() + " has executed");
                        return RepeatStatus.FINISHED;
                    }).build();
        }
    
        @Bean
        public Step step2() {
            return stepBuilderFactory.get("step2")
                    .tasklet(tasklet(null, null, null))
                    .listener(new CustomStepExecutionListener())
                    .build();
        }
    
        @Bean
        @StepScope
        public Tasklet tasklet(@Value("#{jobExecutionContext['name']}") String name,
                               @Value("#{stepExecutionContext['name2']}") String name2,
                               @Value("#{jobParameters['message']}") String message) {
            return (stepContribution, chunkContext) -> {
                System.out.println("name --> " + name);
                System.out.println("name2 --> " + name2);
                System.out.println("message --> " + message);
                System.out.println("tasklet has executed");
                return RepeatStatus.FINISHED;
            };
        }
    }
    
    // JobListener
    public class JobListener implements JobExecutionListener {
        @Override
        public void beforeJob(JobExecution jobExecution) {
            jobExecution.getExecutionContext().put("name", "user1");
        }
    
        @Override
        public void afterJob(JobExecution jobExecution) {
    
        }
    }
    
    // StepListener
    public class CustomStepExecutionListener implements StepExecutionListener {
        @Override
        public void beforeStep(StepExecution stepExecution) {
            stepExecution.getExecutionContext().put("name2", "user2");
        }
    
        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            return null;
        }
    }

Chunck process

  • chunk 란 여러 개의 아이템을 묶은 하나의 덩어리를 의미한다
  • 한번에 하나씩 아이템을 입력받아 chunk 단위로 트랜잭션을 처리한다.
  • 일반적으로 대용량 데이터를 한번에 처리하는 것이 아닌 청크 단위로 쪼개서 더 이상 처리할 데이터가 없을 때 까지 반복처리

ChunckProvider

  • ItemReader 를 사용해서 소스로부터 아이템을 chunk size 만큼 읽어서 Chunk 단위로 만들어 제공하는 도메인 객체
  • 반복문 종료 시점
    • Chunk size 만큼 item 을 읽으면 반복문 종료하고 ChunkProcess 로 넘어간다
    • ItemReader 가 읽은 item 이 null 일 경우 반복문 종료 및 해당 Step 반복문까지 종료

ChunkProcessor

  • ItemProcessor 를 사용해 Item 을 변형, 가공, 필터링 하고 ItemWriter 를 사용해 Chunk 데이터를 저장,출력
  • Chunk 를 만들고 앞에서 넘어온 Chunk 의 item 을 한 건씩 처리 후 Chunck 에 저장
  • ItemProcessor 는 선택사항으로 만약 없는 경우 ItemReader 에서 읽은 item 그대로 Chunk 에 저장
  • ItemProcessor 처리가 완료되면 Chunck 에 있는 List 을 ItemWriter 에게 전달
  • ItemWriter 처리가 완료되면 Chunk 트랜잭션이 종료되고 Step 반복문에서 ChunkOrientedTasklet 이 새롭게 생성된다

ItemReader

  • 다양한 입력으로부터 데이터를 읽어서 제공하는 인터페이스
    • 플랫(flat) 파일 - csv, txt ( 고정 위치로 정의된 데이터 필드나 특수만자로 구별된 데이터의 행)
    • XML, Json
    • Database
    • JMS, RabbitMQ 같은 message Queuing 서비스
    • CustomReader - 구현 시 멀티 스레드 환경에서 스레드에 안전하게 구현할 필요가 있음

ItemWriter

  • Chunk 단위로 데이터를 받아 일괄 출력 작업을 위한 인터페이스
    • 플랫(flat) 파일 - csv, txt ( 고정 위치로 정의된 데이터 필드나 특수만자로 구별된 데이터의 행)
    • XML, Json
    • Database
    • JMS, RabbitMQ 같은 message Queuing 서비스
    • CustomWriter - 구현 시 멀티 스레드 환경에서 스레드에 안전하게 구현할 필요가 있음
  • 아이템 하나가 아닌 아이템 리스트를 전달 받는다.

ItemProcessor

  • 데이터를 출력하기 전 데이터를 가공, 변형, 필터링하는 역할

ItemStream

  • ItemReader 와 ItemWriter 처리 과정 중 상태를 저장하고 오류가 발생하면 해당 상태를 참조하여 실패한 곳에서 재 시작 하도록 지원
  • ExecutionContext 를 매개변수로 받아 상태 정보를 업데이트 한다.

1개의 댓글

comment-user-thumbnail
2023년 7월 20일

뛰어난 글이네요, 감사합니다.

답글 달기