JdbcCursorItemReader List 반환 커스터마이징 적용

Lofri·2024년 2월 23일

1. 개요

사용자 등급을 변경하는 job 이 있다.
해당 job 은 등급 조정을 위해 외부 API 를 호출하는 기능이 있었고 단건 item 에 대해 처리를 진행했다.
위 API 호출을 벌크로 수정하여 성능을 개선하고자 하는데 processor 가 item 을 단건으로 받아 구현 방법을 결정해야 했다.
결론적으로 ItemReader 가 List 를 반환하도록 수정하였고 과정을 기록한다.


2. Reader 와 Processor, Writer 의 관계

Chunk-oriented step 은 아래를 기본으로 동작한다.

List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read();
    if (item != null) {
        items.add(item);
    }
}

List processedItems = new Arraylist();
for(Object item: items){
    Object processedItem = itemProcessor.process(item);
    if (processedItem != null) {
        processedItems.add(processedItem);
    }
}

itemWriter.write(processedItems);

위 코드를 본다면 아래 내용을 확인할 수 있다.

  1. Reader 는 Chunk size 만큼 read() 를 시도한다.
  2. Processor 는 reader 가 읽은 item list 중 하나의 item 을 가져와 처리를 진행한다.
  3. Writer 는 reader 가 chunk size 만큼 읽고 처리된 items 를 한번에 처리한다.

다음과 같은 결론을 도출할 수 있다.
Item processor, writerreader 의 반환 값에 영향을 받는다.
ItemReader 를 customize 한다면 processor 와 writer 가 받는 method 인자값을 변경할 수 있다.

위 내용에 따라 새로운 Reader 를 정의한다.


3. CollectionJdbcCursorItemReader

현재 해당 job 은 JdbcCursorItemReader 를 사용한다.
Reader 자체를 customize 한다면 조금 더 원하는 기능을 추가할 수 있었겠지만, 단순히 List 만을 반환하게 만들면 되기 때문에 delegate 를 통해 기능을 customize 하고자 했다.

코드 예시는 아래와 같다.

public class CollectionJdbcCursorItemReader<T> implements ItemStreamReader<List<T>> {
    private final JdbcCursorItemReader<T> delegate;
    private int listItemSize;
    private boolean isEmpty = false;
    
    CollectionJdbcCursorItemReader(int listItemSize, JdbcCursorItemReader<T> itemReader) {
        this.delegate = itemReader;
        this.listItemSize = listItemSize;
    }
    
    @Override
    public List<T> read() throws Exception {
        List<T> items = new LinkedList<>();

        if (isEmpty)
            return null;

        while (items.size() < listItemSize) {
            T item = delegate.read();

            if (Objects.isNull(item)) {
                isEmpty = true;
                return items;
            } else {
                items.add(item);
            }
        }
        return items;
    }

구현을 보면, 실제 동작은 JdbcCursorItemReader 가 하며, CollectionJdbcCursorItemReader 는 단순히 설정된 listItemSize 만큼 item 을 모아서 반환하도록 작성했다.

추가적으로 item 을 read 하면서 업데이트 해줘야하는 ItemStream 관련 기능들은
단순히 기존 위임체에 위임하여 기존 동작 그대로 동작하게 두었다.

	
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        delegate.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        delegate.update(executionContext);
    }

    @Override
    public void close() throws ItemStreamException {
        delegate.close();
    }

4. Processor & Writer

위와 같이 Reader 를 생성함에 따라 processor 와 writer 는 아래와 같은 모습을 가질 수 있게 된다.

public class SimpleProcessor implements ItemProcessor<List<SimpleItem, List<SimpleItem>> {
	@Override
    public List<SimpleItem> process(List<SimpleItem> items) throws Exception {
    	// process logic
    }
}
public class SimpleWriter implements ItemWriter<List<SimpleItem> {
	@Override
    public void write(List<? extends List<BillingMember>> items) throws Exception {
    	// write logic
    }
}

5. 마무리

이번 과정을 통해 배치 step 에서 item 이 어떤식으로 전달되는지, JdbcCursorItemReader 가 어떤 방식으로 동작하는지 확인할 수 있었다.

다른 ItemReader 구현체들도 결국엔 동일한 로직을 사용하는 것으로 보아 비슷한 기능은 해당 위임 대상에 매칭되는 구현체를 생성한다면 동일한 효과를 볼 수 있을 것으로 예상된다. (또는 제너릭하게 만드는것도?)

목표했던 bulk api 호출은 성공했으나, 아래 부분에 대한 추가적인 확인이 필요하다 생각된다.

  • step 실패에 대한 재동작
  • wirter 의 item 처리 로직
  • StepExecution readCount, processCount, writeCount updating

99. References

[Spring docs] Configuring a Step

profile
Java BE

0개의 댓글