스프링 배치

Seung jun Cha·2023년 4월 26일
0
post-thumbnail

1. 개념

  1. 배치 프로세싱은 일괄처리라는 뜻을 가지고 있으며, 일괄처리의 의미는 일련의 작업을 정해진 로직으로 수행하는 것입니다.

  2. 스프링 배치는 로깅/추적, 트랜잭션 관리, 작업 처리 통계, 작업 재시작, 건너뛰기, 리소스 관리 등 대용량 레코드 처리에 필수적인 기능을 제공합니다.

  3. 최적화 및 파티셔닝 기술을 통해 대용량 및 고성능 배치 작업을 가능하게 하는 고급 기술 서비스 및 기능을 제공합니다.

  4. Spring Batch에서 배치가 실패하여 작업 재시작을 하게 된다면 처음부터가 아닌 실패한 지점부터 실행을 합니다.

  5. 중복 실행을 막기 위해 성공한 이력이 있는 Batch는 동일한 Parameters로 실행 시 Exception이 발생합니다.

  6. 스프링 배치에서 JPA를 사용하는 경우, 배치 작업에서 DB와의 데이터 처리를 위해 EntityManager를 사용합니다. 따라서 EntityManagerFactory를 주입해주어야 합니다.

  7. 스프링 배치를 구성하는 것과 배치를 실행하는 것은 별개이다.

  • 스프링 배치가 사용되는 경우
  1. 대용량의 비지니스 데이터를 복잡한 작업으로 처리해야하는 경우
  2. 특정한 시점에 스케줄러를 통해 자동화된 작업이 필요한 경우
  3. 대용량 데이터의 포맷을 변경, 유효성 검사 등의 작업을 트랜잭션 안에서 처리 후 기록해야하는 경우

2. 구성 및 개념

  • Job
    배치처리 과정을 하나의 단위로 만들어놓은 배치처리 과정에 있어 전체 계층 최상단에 위치한다.
    배치 프로세스의 실행단위로 하나의 Job객체는 여러 Step 인스턴스를 포함하는 컨테이너이다. 처음부터 끝까지 독립적으로 실행될 수 있고 순서가 정해진 여러 스텝의 모음이다.

  • JobInstance
    Job의 실행 단위로 job이 한 번 실행될 때마다 새로운 JobInstance가 만들어집니다.
    2/23일, 2/24일 실행 시 각각의 JobInstance가 생성됩니다.
    인스턴스가 따로 생성되기 때문에 2/23 실행 실패시 2/23에 대한 데이터만 처리하게 됩니다.

  • JobParameters
    Spring Batch가 Job을 실행할 때 전달되는 파라미터들의 맵
    JobInstance를 구별할 때 사용합니다.
    String, Double, Long, Date 4가지 형식을 지원합니다,

  @Bean
    @StepScope
    public JpaCursorItemReader<Booking> addStatisticsItemReader(@Value("#{jobParameters[from]}") String fromString,
                                                                @Value("#{jobParameters[to]}") String toString) {
        final LocalDateTime from = LocalDateTimeUtils.parse(fromString);  // YYYY_MM_DD_HH_MM 형태로 변환
        final LocalDateTime to = LocalDateTimeUtils.parse(toString);

        return new JpaCursorItemReaderBuilder<Booking>()
                .name("usePassesItemReader")
                .entityManagerFactory(entityManagerFactory)
                // JobParameter를 받아 종료 일시(endedAt) 기준으로 통계 대상 예약(Booking)을 조회합니다.
                .queryString("select b from BookingEntity b where b.endedAt between :from and :to")
                .parameterValues(Map.of("from", from, "to", to))
                .build();
    }

parameterValues(Map.of("from", from, "to", to) 이 코드에서
Map의 key는 @Value("#{jobParameters[Map의 key]} 와 일치해야하고, map의 value는 String fromString에 들어가게 된다

  • JobExecution
    JobInstance에 대한 실행 시도에 대한 객체입니다.
    실패하여 재실행 시킨 경우 동일한 JobInstance이나 2번 실행에 대한 JobExecution은 개별로 생기게 됩니다.
    JobInstanced 실행에 대한 상태, 시작시간, 종료시간, 생성시간 등의 정보를 담습니다.

  • Step
    Job 내부에 구성되어 실제 배치 작업 수행을 정의하고 제어한다. Job을 처리하는 단위이다. Step 안에는 Tasklet 또는 Reader, Processor, Writer 묶음이 존재한다.
    Job은 최소한 1개 이상의 Step을 가져야 하며 Job의 실제 일괄처리를 제어하는 모든 정보가 들어있다.

  • StepExecution
    Step 실행 시도에 대한 객체로 이전 단계의 Step이 실패하면 StepExecution은 생성되지 않는다.
    실제 시작이 될 때만 생성된다.
    StepExecution은 JobExecution에 저장되는 정보 외에 read 수, write 수, commit 수, skip 수 등의 정보들이 저장된다.

  • ExecutionContext
    Job에서 데이터를 공유 할 수 있는 데이터 저장소
    Spring Batch에서 제공하는 ExecutionContext는 JobExecutionContext, StepExecutionContext 2가지 종류가 있으나 이 두가지는 지정되는 범위가 다릅니다.
    JobExecutionContext의 경우 Commit 시점에 저장되는 반면 StepExecutionContext는 실행 사이에 저장이 되게 됩니다.
    ExecutionContext를 통해 Step간 Data 공유가 가능하며 Job 실패시 ExecutionContext를 통한 마지막 실행 값을 재구성할 수 있습니다.

  • JobRepository
    위의 모든 배치 처리 정보를 담고있는 매커니즘
    Job이 실행되게 되면 JobRepository에 JobExecution과 StepExecution을 생성하게 되며 JobRepository에서 Execution 정보들을 저장하고 조회하며 사용하게 된다. 즉 배치 처리에 대한 모든 메타 데이터가 담겨있다.

  • JobLauncher
    JobLauncher는 Job과 JobParameters를 사용하여 Job을 실행하는 객체로 JobExecution을 반환한다.
    (JobLauncher -> job실행 -> step 실행 -> itemReder 등을 실행)

  • ItemReader
    Step의 대상이 되는 배치 데이터를 읽어오는 인터페이스이다. 읽어올 item이 없을 때는 read() 메서드에서 null을 반환하며 그 전까지 순차적인 값을 리턴한다.
    ItemReader에 대한 다양한 인터페이스가 존재하며 다양한 방법으로 Item을 읽어올 수 있다.

    • cursor : 데이터베이스 커서를 사용하여 데이터를 한 번에 모두 메모리에 로드하지 않고, 일정한 크기의 청크(chunk) 단위로 데이터를 읽어오는 방식이다. 이를 통해 메모리 사용량을 최적화하고, 대량의 데이터에 효과적으로 접근할 수 있다. 수행시간이 오래걸릴 경우 커넥션이 끊기는 문제가 발생할 수 있다.
      커서방식은 청크(chunk) 단위의 데이터가 중복되는 경우가 발생할 수 있는데 이를 막기위해서는
      청크의 범위를 중복되지 않도록 조정하거나, 조회된 데이터에서 중복을 제거하는 등의 방법을 사용해야한다. (예를 들면 10일부터 25일까지의 예약조회와 10일부터 15일까지의 예약조회)
      • JpaCursorItemReader (Thead-safe 하지 않다)
        여러 스레드에서 동시에 동일한 커서를 사용하면 데이터베이스 커넥션 및 커서 위치 관리에 충돌이 발생할 수 있다. 따라서 Thead-safe 문제를 해결하기 위해
        SynchronizedItemStreamReader를 제공한다
    • paging : page라고 불리는 chuck 크기만큼 DB에서 레코드를 가지고 온다.
      • JpaPagingItemReader : 내부의 핵심 메서드는 Synchronized 키워드로 선언되어 있기 때문에 Thead-safe를 보장한다. chuck 크기를 너무 작게 설정하면 읽는 동안 데이터가 변경될 수 있으며, 데이터를 읽을때 정렬기준을 제대로 설정하지 않으면 데이터 누락이 발생할 수 있다.
  • ItemProcessor
    reader에서 읽어온 데이터(item)를 필터/처리 하고 write로 보내는 역할을 합니다.
    Processor는 배치를 처리하는데 필수 요소는 아니며 Reader, Writer, Processor 처리를 분리하여 각각의 역할을 명확하게 구분합니다.
    비동기방식으로 처리를 지원하는 Processor는 AsyncItemProcessor가 있다
    AsyncItemProcessor는 새로운 스레드를 할당하여 멀티 스레드로 작동하는 방식으로,TaskExecutor를 이용하여 비동기적으로 ItemProcessor를 실행하므로, 병렬 처리를 통해 성능 향상을 더욱 극대화할 수 있으며, 결과를 Future 객체로 반환합니다. AsyncItemProcessor 사용을 위해서는 아래의 의존성을 주입해야 합니다.

  • ItemWriter
    처리된 Data를 DB에 Writer(저장)할 때 사용한다.
    처리 결과물에 따라 Insert, Update, Queue의 Send 등이 될 수 있고,
    기본적으로 Item을 Chunk로 묶어 처리하고 있다.
    ItemWriter 역시 비동기 처리방식으로 AsyncItemWriter를 지원한다.

// AsyncItemProcessor, AsyncItemWriter
implementation 'org.springframework.batch:spring-batch-integration'

3. @JobScope와 @StepScope

  • Bean의 생성 시점이 스프링 애플리케이션이 실행되는 시점이 아닌 @JobScope, @StepScope가 명시된 메서드가 실행될 때까지 지연시키는 기능을 한다. 스코프가 명시된 메서드가 실행될 때 빈이 생성되고 메서드가 종료되면 삭제된다. @JobScope는 Step 선언문에서만 사용이 가능하고, @StepScope는 Step을 구성하는 ItemReader, ItemProcessor, ItemWriter에서 사용 가능하다.
  • 스프링 배치에서는 Step이 수행될 때마다, 기본적으로 스프링 빈으로 등록된 객체를 생성하여 사용하는데 이때, Step이 실행되는 도중에 객체가 다른 스레드에 의해 수정되는 경우, 데이터의 불일치가 발생할 수 있다. @StepScope를 적용하면(프록시 객체를 생성해서..?) 각각의 Step에서 실행될 때 서로의 상태를 침범하지 않고 처리를 완료할 수 있어 병렬처리에 안전하다.
    예를 들어 여러 스레드가 하나의 Step 객체빈을 공유해서 사용할 경우, @StepScope 가 선언되면 Step 객체를 여러 스레드가 공유하지 않고 step실행 시 객체가 매번 생성되어 스레드마다 할당되기 때문에 멤버 변수를 공유할 수 없어 스레드에 안전하게 된다.
  • 애플리케이션이 구동되는 시점이 아니라 비즈니스 로직이 구현되는 어디든 JobParameter를 할당함으로 유연한 설계를 가능하게 한다.
    결론 : @StepScope가 명시된 메서드가 실행될 때까지 bean생성을 지연하고, 실행될때마다 새로 bean을 만들며 Step 객체를 여러 스레드가 공유하지 않기 때문에 병렬처리에 안전하다

4. Tasklet, Chunk 방식

  • Tasklet : Tasklet은 일괄 처리 작업을 단일한 작업 단위로 수행하며 하나의 특정한 작업을 수행하는 단일 스레드 방식으로 Step내에서 한번만 실행하므로 데이터 처리과정이 한 번에 이루어진다.

  • Tasklet은 별도의 트랜잭션 매니저 없이 작업을 수행하는 단순한 방식이기 때문에 EntityManagerFactory를 사용하지 않아도 된다. EntityManagerFactory 대신에 필요한 데이터베이스 연결 및 관련 설정을 직접 구성하여 Tasklet을 구현하면 된다. (implements Tasklet로 구현)

  • Tasklet 인터페이스에 정의된 메서드인 execute 내에 repeat 이 있는데 특정 조건이 충족될 때까지 Job 또는 Step을 반복하도록 한다.

    repeat과 chunk는 완전 별개의 개념이지만, repeat 메서드 안에 Step을 정의하고, Step 안에서 chunk를 설정하면 repeat되는 동안 chunk 역시 반복 실행되게 된다.

  • Chunk : Step 안에서 처리될 Item들의 크기를 지정하는 방법 중 하나로 데이터를 지정된 Chunk size만큼 가져와서 처리하며, 멀티 스레드 방식으로 동작한다. 각 Chunk에서 ItemReader, ItemProcessor, ItemWriter를 사용한다.

5. 비동기 처리와 병렬처리

5-1 비동기 처리

  1. 작업을 백그라운드에서 실행하면서 결과를 기다리지 않고 다음 코드를 실행하는 방식으로 콜백(callback) 함수Promise 같은 개념을 사용하여 작업이 끝난 후 처리할 로직을 등록해두고, 작업이 완료되면 등록된 로직을 실행
  2. I/O 작업과 같은 시간이 오래 걸리는 작업에 사용

5-2 병렬처리

  1. 멀티 스레드방식
  2. 하나의 작업을 여러 개의 작은 작업으로 나누고 각 작업들을 별도의 스레드에서 동시에 처리
  3. CPU 작업과 같은 계산이 많이 필요한 작업에 사용

0개의 댓글