Batch 기본 Chunk 방식

Kevin·2023년 12월 4일
0

Spring Batch

목록 보기
2/3
post-thumbnail

Chunk란 여러 개의 아이템을 묶은 하나의 덩어리, 블록을 의미합니다. 한번에 하나씩 아이템을 입력 받아 Chunk 단위의 덩어리로 만든 후 Chunk 단위로 트랜잭션을 처리합니다.

  1. 이용권 만료

job : 이용권 만료

step : 이용권 만료, Chunk 방식 채택

  • 이용권 만료 대상을 읽어서 그 대상들을 만료 상태로 업데이트 시키면 된다.
  • step 내부
    • ExpirePassesReader
    • ExpirePassesWriter

전체 코드

@Getter
@Setter
@ToString
@Entity
@Table(name = "pass")
public class PassEntity extends BaseEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY) // 기본 키 생성을 DB에 위임합니다. (AUTO_INCREMENT)
    private Integer passSeq;
    private Integer packageSeq;
    private String userId;

    @Enumerated(EnumType.STRING)
    private PassStatus status;
    private Integer remainingCount;

    private LocalDateTime startedAt;
    private LocalDateTime endedAt;
    private LocalDateTime expiredAt;

}
package com.fastcampus.pass.job.pass;

import com.fastcampus.pass.repository.pass.PassEntity;
import com.fastcampus.pass.repository.pass.PassStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaCursorItemReader;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaCursorItemReaderBuilder;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.persistence.EntityManagerFactory;
import java.time.LocalDateTime;
import java.util.Map;

@Configuration
public class ExpirePassesJobConfig {
    private final int CHUNK_SIZE = 1;

    // @EnableBatchProcessing로 인해 Bean으로 제공된 JobBuilderFactory, StepBuilderFactory
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    public ExpirePassesJobConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, EntityManagerFactory entityManagerFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.entityManagerFactory = entityManagerFactory;
    }

    @Bean
    public Job expirePassesJob() {
        return this.jobBuilderFactory.get("expirePassesJob")
                .start(expirePassesStep())
                .build();
    }

    @Bean
    public Step expirePassesStep() {
        return this.stepBuilderFactory.get("expirePassesStep")
                .<PassEntity, PassEntity>chunk(CHUNK_SIZE)
                .reader(expirePassesItemReader())
                .processor(expirePassesItemProcessor())
                .writer(expirePassesItemWriter())
                .build();

    }

    /**
     * JpaCursorItemReader: JpaPagingItemReader만 지원하다가 Spring 4.3에서 추가되었습니다.
     * 페이징 기법보다 보다 높은 성능으로, 데이터 변경에 무관한 무결성 조회가 가능합니다.
     */
    @Bean
    @StepScope
    public JpaCursorItemReader<PassEntity> expirePassesItemReader() {
        return new JpaCursorItemReaderBuilder<PassEntity>()
                .name("expirePassesItemReader")
                .entityManagerFactory(entityManagerFactory)
                // 상태(status)가 진행중이며, 종료일시(endedAt)이 현재 시점보다 과거일 경우 만료 대상이 됩니다.
                .queryString("select p from PassEntity p where p.status = :status and p.endedAt <= :endedAt")
                .parameterValues(Map.of("status", PassStatus.PROGRESSED, "endedAt", LocalDateTime.now()))
                .build();
    }

    @Bean
    public ItemProcessor<PassEntity, PassEntity> expirePassesItemProcessor() {
        return passEntity -> {
            passEntity.setStatus(PassStatus.EXPIRED);
            passEntity.setExpiredAt(LocalDateTime.now());
            return passEntity;
        };
    }

    /**
     * JpaItemWriter: JPA의 영속성 관리를 위해 EntityManager를 필수로 설정해줘야 합니다.
     */
    @Bean
    public JpaItemWriter<PassEntity> expirePassesItemWriter() {
        return new JpaItemWriterBuilder<PassEntity>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

}

배치 작업 순서

  1. BatchConfig 작성으로 전역으로 배치 설정
    ```java
    /*
     * @EnableBatchProcessing
     * Spring Batch 기능을 활성화하고 배치 작업을 설정하기 위한 기본 구성을 제공합니다.
     * JobRepository, JobLauncher, JobRegistry, PlatformTransactionManager, JobBuilderFactory, StepBuilderFactory 빈으로 제공됩니다.
     * https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/core/configuration/annotation/EnableBatchProcessing.html
     */
    
    @EnableBatchProcessing
    @Configuration
    public class BatchConfig {
    
        /**
         * JobRegistry는 context에서 Job을 추적할 때 유용합니다.
         * JobRegistryBeanPostProcessor는 Application Context가 올라가면서 bean 등록 시, 자동으로 JobRegistry에 Job을 등록 시켜줍니다.
         */
        @Bean
        public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
            JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
            jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
            return jobRegistryBeanPostProcessor;
    
        }
    }
    ```

  2. 배치를 작업할 Feature의 배치 설정 클래스를 생성
    1. 이 때 클래스 내부에 JobBuilderFactory , StepBuilderFactory 를 주입받는다.
      • 위 전역으로 @EnableBatchProcessing을 통해서 Spring에서 자동으로 주입해준다.

  1. Feature에 맞는 Job을 먼저 작성한다.

    @Bean
        public Job expirePassesJob() {
            return this.jobBuilderFactory.get("expirePassesJob")
                    .start(expirePassesStep())
                    .build();
        }

    .start(expirePassesStep()) ← job을 이루는 pass를 설정한다.

  2. step을 작성한다.

    @Bean
        public Step expirePassesStep() {
            return this.stepBuilderFactory.get("expirePassesStep")
                    .<PassEntity, PassEntity>chunk(CHUNK_SIZE)
                    .reader(expirePassesItemReader())
                    .processor(expirePassesItemProcessor())
                    .writer(expirePassesItemWriter())
                    .build();
        }

    .<PassEntity, PassEntity>chunk(CHUNK_SIZE) ← Chunk 방식의 Step이기에 Chunk를 설정해주어야 한다.

    이 때 PassEntity, PassEntity 은 각각 Input, Output 타입이다.

    public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
    		return new SimpleStepBuilder<I, O>(this).chunk(chunkSize);
    	}

    private final int CHUNK_SIZE = 1; ← ChunkSize는 1로 설정해두었는데, 이러면 1개의 DB row마다 트랜잭션이 발생한다.

    물론 실 개발시에는 이렇게 작게 설정 안한다.

    .reader(expirePassesItemReader()) .processor(expirePassesItemProcessor()) .writer(expirePassesItemWriter())

    → 해당 Step에 맞는 각 Item 구성 요소들을 설정해준다.

  3. ItemReader를 작성한다.

    /**
         * JpaCursorItemReader: JpaPagingItemReader만 지원하다가 Spring 4.3에서 추가되었습니다.
         * 페이징 기법보다 보다 높은 성능으로, 데이터 변경에 무관한 무결성 조회가 가능합니다.
         */
        @Bean
        @StepScope
        public JpaCursorItemReader<PassEntity> expirePassesItemReader() {
            return new JpaCursorItemReaderBuilder<PassEntity>()
                    .name("expirePassesItemReader")
                    .entityManagerFactory(entityManagerFactory)
                    // 상태(status)가 진행중이며, 종료일시(endedAt)이 현재 시점보다 과거일 경우 만료 대상이 됩니다.
                    .queryString("select p from PassEntity p where p.status = :status and p.endedAt <= :endedAt")
                    .parameterValues(Map.of("status", PassStatus.PROGRESSED, "endedAt", LocalDateTime.now()))
                    .build();
        }

    여기서 커서 방식으로 조회를 한다.

    .entityManagerFactory(entityManagerFactory) ← JPAItemReader, Writer 모두 EntityManagerFactory를 선언을 해주어야 한다. 왜냐하면 DB로부터 CRUD를 해야하기 때문에

    .queryString("select p from PassEntity p where p.status = :status and p.endedAt <= :endedAt") ← 바로 JPQL을 작성해주면 된다.

    .parameterValues(Map.of("status", PassStatus.PROGRESSED, "endedAt", LocalDateTime.now())) ← JPQL 인자에 대한 값을 Map 형태로 Key, Value 맞춰서 넣어주면 된다.


@StepScope ← Bean의 생성 시점이 스프링 애플리케이션이 실행되는 시점이 아닌 @JobScope, @StepScope가 명시된 메서드가 실행될 때까지 지연시키는 것을 의미

지연 시켰을 때 얻는 이점

  1. JobParameter를 특정 메서드가 실행하는 시점까지 지연시켜 할당시킬 수 있습니다.

즉, 애플리케이션이 구동되는 시점이 아니라 비즈니스 로직이 구현되는 어디든 JobParameter를 할당함으로 유연한 설계를 가능

  1. 병렬처리에 안전합니다.

Step의 구성요소인 ItemReader, ItemProcessor, ItemWriter이 있고, ItemReader에서 데이터를 읽어 오는 메서드를 서로 다른 Step으로 부터 동시에 병렬 실행이 된다면 서로 상태를 간섭받게 될 수 있습니다. 하지만 @StepScope를 적용하면 각각의 Step에서 실행될 때 서로의 상태를 침범하지 않고 처리를 완료할 수 있습니다

💡 @JobScope는 Step 선언문에서만 사용이 가능하고, @StepScope는 Step을 구성하는 ItemReader, ItemProcessor, ItemWriter에서 사용 가능합니다.

@JobScope와 @StepScope

페이징이 아니라 커서를 쓴 이유 ← 현재 비즈니스 로직상 한 페이지에서 값을 불러온 후 값을 변경하면 다음 페이지에서 값이 변경되어서 누락이 되는 경우가 발생할 수 있기에

커서는 데이터 변경에 무결성하다.

  1. ItemProcessor를 작성한다.

    @Bean
        public ItemProcessor<PassEntity, PassEntity> expirePassesItemProcessor() {
            return passEntity -> {
                passEntity.setStatus(PassStatus.EXPIRED);
                passEntity.setExpiredAt(LocalDateTime.now());
                return passEntity;
            };
        }

    ItemReader를 통해서 읽어온 PassEntity의 데이터 값을 람다식으로 변경시켜주고 반환한다.

  2. ItemWriter를 작성한다.

    /**
         * JpaItemWriter: JPA의 영속성 관리를 위해 EntityManager를 필수로 설정해줘야 합니다.
         */
        @Bean
        public JpaItemWriter<PassEntity> expirePassesItemWriter() {
            return new JpaItemWriterBuilder<PassEntity>()
                    .entityManagerFactory(entityManagerFactory)
                    .build();
        }

    ItemWriter는 단순히 entityManager를 감싸고 있는 래퍼라고 생각해도 된다.




Test 코드

@Slf4j
@SpringBatchTest
@SpringBootTest
@ActiveProfiles("test")
@ContextConfiguration(classes = {ExpirePassesJobConfig.class, TestBatchConfig.class})
public class ExpirePassesJobConfigTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private PassRepository passRepository;

    @Test
    public void test_expirePassesStep() throws Exception {
        // given
        addPassEntities(10);

        // when
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        JobInstance jobInstance = jobExecution.getJobInstance();

        // then
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
        assertEquals("expirePassesJob", jobInstance.getJobName());

    }

    private void addPassEntities(int size) {
        final LocalDateTime now = LocalDateTime.now();
        final Random random = new Random();

        List<PassEntity> passEntities = new ArrayList<>();
        for (int i = 0; i < size; ++i) {
            PassEntity passEntity = new PassEntity();
            passEntity.setPackageSeq(1);
            passEntity.setUserId("A" + 1000000 + i);
            passEntity.setStatus(PassStatus.PROGRESSED);
            passEntity.setRemainingCount(random.nextInt(11));
            passEntity.setStartedAt(now.minusDays(60));
            passEntity.setEndedAt(now.minusDays(1));
            passEntities.add(passEntity);

        }
        passRepository.saveAll(passEntities);

    }

}

@ContextConfiguration(classes = {ExpirePassesJobConfig.class, TestBatchConfig.class}) ← 테스트간 자동으로 만들어줄 애플리케이션 컨텍스트의 설정파일 위치를 지정, ExpirePassesJobConfig와 TestBatchConfig를 테스트간 설정 파일로 지정

JobLauncherTestUtils ← 스프링 배치 테스트에 필요한 유틸 기능

JobRepositoryTestUtils ← 데이터베이스에 저장된 JobExcution을 생성/삭제 지원

/**
	 * Launch the entire job, including all steps.
	 * 
	 * @return JobExecution, so that the test can validate the exit status
	 * @throws Exception thrown if error occurs launching the job.
	 */
	public JobExecution launchJob() throws Exception {
		return this.launchJob(this.getUniqueJobParameters());
	}

jobLauncherTestUtils.launchJob(); ← 아무 인자도 안넘겨주면, 모든 Job과 step들을 시작시킨다.

Spring Batch Test 작성 방법 및 고찰 - Yun Blog | 기술 블로그

profile
Hello, World! \n

0개의 댓글