[Spring Batch 9,10편] CustomItemReader/Writer와 Flow 컨트롤

송하연·2024년 12월 10일
1
post-thumbnail

이전글 - [Spring Batch 8편] CompositeItemProcessor 사용해보기🔧

Custom ItemReader와 ItemWriter에 대해 알아보기

스프링 배치는 다양한 내장 ItemReaderItemWriter를 제공해 일반적인 데이터 처리 작업을 손쉽게 수행할 수 있도록 지원한다.
하지만 특정 비즈니스 로직에 맞는 배치를 구현하기 위해서는 커스터마이징이 필요한데, 이처럼 이번에는 Custom ItemReaderCustom ItemWriter를 구현하여 배치 작업에서 활용하는 방법에 대해 알아보자

QuerydslPagingItemReader란?

QuerydslPagingItemReader는 Spring Batch에서 공식적으로 제공되는 Reader는 아니지만, AbstractPagingItemReader를 상속받아 Querydsl을 활용해 데이터를 페이징하며 읽을 수 있도록 구현한 커스터마이즈 Reader이다.

QuerydslPagingItemReader의 주요 특징

  • Querydsl 기능 활용: 동적 쿼리 기능을 활용해 데이터를 효율적으로 페이징하며 읽어옴
  • 동적 쿼리 지원: Querydsl의 강력한 동적 쿼리 기능을 활용해 런타임 조건에 따라 데이터를 필터링.
  • JPA 엔티티 추상화: JPA 엔티티에 의존하지 않고 추상화된 쿼리를 작성해 유지보수에 용이.

QuerydslPagingItemReader 구현하기

public class QuerydslPagingItemReader<T> extends AbstractPagingItemReader<T> {

    private EntityManager em;
    private final Function<JPAQueryFactory, JPAQuery<T>> querySupplier;
    private final Boolean alwaysReadFromZero;

    public QuerydslPagingItemReader(EntityManagerFactory entityManagerFactory, 
                                    Function<JPAQueryFactory, JPAQuery<T>> querySupplier, 
                                    int chunkSize) {
        this(entityManagerFactory, querySupplier, chunkSize, false);
    }

    public QuerydslPagingItemReader(EntityManagerFactory entityManagerFactory, 
                                    Function<JPAQueryFactory, JPAQuery<T>> querySupplier, 
                                    int chunkSize, 
                                    Boolean alwaysReadFromZero) {
        super.setPageSize(chunkSize);
        this.em = entityManagerFactory.createEntityManager();
        this.querySupplier = querySupplier;
        this.alwaysReadFromZero = alwaysReadFromZero;
    }

    @Override
    protected void doReadPage() {
        initQueryResult();

        JPAQueryFactory jpaQueryFactory = new JPAQueryFactory(em);
        long offset = alwaysReadFromZero ? 0 : (long) getPage() * getPageSize();

        JPAQuery<T> query = querySupplier.apply(jpaQueryFactory)
                                         .offset(offset)
                                         .limit(getPageSize());
        List<T> queryResult = query.fetch();

        for (T entity : queryResult) {
            em.detach(entity); 
            results.add(entity);
        }
    }

    @Override
    protected void doClose() {
        if (em != null) em.close();
        super.doClose();
    }

    private void initQueryResult() {
        if (results == null) {
            results = new CopyOnWriteArrayList<>();
        } else {
            results.clear();
        }
    }
}

주요 메서드

doReadPage():

Querydsl의 JPAQueryFactory를 통해 동적으로 생성된 쿼리를 실행하며 데이터를 페이징 처리한다.
=> offset과 limit을 계산하고, 이를 Querydsl의 쿼리에 적용한다.
조회된 결과는 results 리스트에 저장한다.

doClose():

사용이 끝난 EntityManager를 닫아 리소스를 해제한다.

initQueryResult():

매 페이지마다 새로운 데이터를 저장하기 위해 results를 초기화한다.

생성자 주요 파라미터

  • EntityManagerFactory: JPA 엔티티 매니저를 생성하기 위한 팩토리 객체.
  • Function<JPAQueryFactory, JPAQuery> querySupplier: Querydsl을 통해 동적 쿼리를 생성하기 위한 함수형 인터페이스.
  • chunkSize: 한 번에 읽어올 데이터의 크기를 지정.
  • alwaysReadFromZero: 항상 0부터 읽어올지 여부를 설정. 페이징 데이터를 업데이트하거나 삭제하는 경우 데이터 누락 방지를 위해 사용.

빌더 패턴을 이용한 생성

코드의 가독성을 높이기 위해 빌더 패턴을 사용하여 QuerydslPagingItemReader 객체를 생성할 수도 있다.

@Bean
public QuerydslPagingItemReader<Customer> customerQuerydslPagingItemReader() {
    return new QuerydslPagingItemReaderBuilder<Customer>()
            .name("customerQuerydslPagingItemReader")
            .entityManagerFactory(entityManagerFactory)
            .chunkSize(10)
            .querySupplier(jpaQueryFactory -> jpaQueryFactory.select(QCustomer.customer)
                .from(QCustomer.customer)
                .where(QCustomer.customer.age.gt(20)))
            .build();
}

CustomItemWriter란?

CustomItemWriter는 Spring Batch에서 기본 제공되는 ItemWriter 인터페이스를 구현하여 특정 비즈니스 요구 사항에 맞는 데이터 처리를 수행하도록 커스터마이징한다. 기본 Writer로 처리할 수 없는 고유한 작업을 구현할 때 활용한다.

CustomItemWriter 주요 특징

구성 요소

  • ItemWriter 인터페이스 구현: write() 메소드를 오버라이드하여 데이터 처리 로직을 구현한다.
  • 필요한 라이브러리 및 객체 선언: 외부 서비스 호출이나 파일 작업 등 필요한 의존성을 선언한다.
  • 데이터 처리 로직 구현: write() 메소드 내부에서 데이터를 가공하거나 외부 서비스와 연계하는 로직을 구현한다

장점
유연성: 다양한 비즈니스 로직에 맞게 커스터마이징할 수 있다.
확장성: 데이터 처리 과정을 확장하거나 변경할 수 있다.
제어 가능성: 데이터 처리 흐름을 세부적으로 제어할 수 있다.

단점
개발 복잡성: 구현 과정이 다소 복잡할 수 있다.
테스트 어려움: 로직이 복잡해질수록 테스트 작성이 어려워질 수 있다.
디버깅 어려움: 문제 발생 시 디버깅이 더 어려울 수 있다.

CustomItemWriter 구현하기

@Slf4j
@Service
public class CustomService {

    public Map<String, String> processToOtherService(Customer item) {
        log.info("Call API to OtherService....");
        return Map.of("code", "200", "message", "OK");
    }
}

CustomService는 단순히 로그를 출력하고 응답을 반환하도록 작성한다.

CustomItemWriter 작성하기
ItemWriter 인터페이스를 구현하여 데이터를 처리한다.


@Slf4j
@Component
public class CustomItemWriter implements ItemWriter<Customer> {

    private final CustomService customService;

    public CustomItemWriter(CustomService customService) {
        this.customService = customService;
    }

    @Override
    public void write(Chunk<? extends Customer> chunk) throws Exception {
        for (Customer customer : chunk) {
            log.info("Processing in CustomItemWriter...");
            customService.processToOtherService(customer);
        }
    }
}
  • write() 메소드는 데이터 청크를 순회하며, 각각의 데이터를 CustomService에 전달해 처리한다.

Step 설정

@Bean
public Step customWriterStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("customWriterStep", jobRepository)
            .<Customer, Customer>chunk(10, transactionManager)
            .reader(flatFileItemReader())
            .writer(customItemWriter)
            .build();
}

Job 설정

@Bean
public Job customWriterJob(Step customWriterStep, JobRepository jobRepository) {
    return new JobBuilder("customWriterJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            .start(customWriterStep)
            .build();
}

Spring Batch의 Flow 컨트롤

Spring Batch는 여러 Step을 정의하고, 조건에 따라 실행 순서를 조정하거나 특정 Step을 건너뛰는 등의 Flow 컨트롤 기능을 제공한다. 이 기능은 FlowBuilder API를 통해 설정된다.

Flow 컨트롤 주요 구성 요소

  • next: 현재 Step이 성공적으로 종료된 후, 다음 Step으로 이동한다.
  • on: 특정 ExitStatus에 따라 다음 Step으로 이동 여부를 결정한다.
  • from: 특정 Step의 결과에 따라 현재 Step에서 다른 Step으로 이동한다.
  • to: 특정 Step으로 이동한다.
  • stop: 특정 조건에서 작업을 중단한다.
  • end: FlowBuilder를 종료한다.

주요 Flow 컨트롤 방법 구현하기

1. next
next를 사용하면 여러 Step을 순차적으로 연결할 수 있다.
기본적으로 한 Step이 완료되면 다음 Step으로 이동한다.

@Bean
public Job job() {
    return jobBuilderFactory.get("job")
        .start(step1())
        .next(step2())
        .end()
        .build();
}

위 예제는 step1이 완료된 후 step2로 넘어가는 간단한 Flow를 보여준다.

2. on
on은 특정 Step의 종료 조건(ExitStatus)에 따라 다음 Step으로 이동할지 결정한다.
조건에 따라 다른 Step으로 분기할 수 있다.

@Bean
public Job job() {
    return jobBuilderFactory.get("job")
        .start(step1())
        .on("FAILED").to(step3())
        .from(step1()).on("COMPLETED").to(step2())
        .end()
        .build();
}

step1의 결과가 FAILED면 step3으로 이동하고, COMPLETED면 step2로 이동한다.
조건에 따라 서로 다른 Flow를 설정할 수 있다.

3. stop
stop은 특정 Step의 종료 상태를 확인하여 배치 작업을 중단한다.

@Bean
public Job job() {
    return jobBuilderFactory.get("job")
        .start(step1())
        .on("FAILED").stop()
        .end()
        .build();
}

step1의 결과가 FAILED면 배치 작업을 중단한다.
특정 조건에서 작업을 멈추고 에러를 방지하거나 재작업을 유도할 수 있다.

Flow 컨트롤 마무리

next를 활용해 순차적인 작업을 수행할 수 있다.
onfrom을 통해 조건에 따라 분기 처리할 수 있다.
stop으로 특정 조건에서 배치 작업을 종료하여 안정성을 높일 수 있다.

참고글 - [SpringBatch 연재 10] 스프링배치 플로우 컨트롤 하기
깃허브 - https://github.com/hysong4u/springbatch

profile
개발 기록 끄적끄적✏️ #백엔드개발자

2개의 댓글

comment-user-thumbnail
2024년 12월 11일

진심으로 10회 동안 너무 성실하게 꿋꿋히 참여해줘서 너무 고마워요 ㅠㅠ
하연쓰 님에 대해서도 이야기 많이 했어야하는데 못해서 너무 아쉬워요 ㅠㅠ
그럴 기회도 생기면 좋을 거 같아요 ㅠㅠ

진심으로 고생했고 ㅠㅠ 연말에는 숨 한번 크게 돌리면서 건강도 챙기면서 활동하길 바라고요 ㅠㅠ
꼭 이야기 할 기회있으면 꼭 만들어 봐요!!@#@!#

1개의 답글