사용자 등급을 변경하는 job 이 있다.
해당 job 은 등급 조정을 위해 외부 API 를 호출하는 기능이 있었고 단건 item 에 대해 처리를 진행했다.
위 API 호출을 벌크로 수정하여 성능을 개선하고자 하는데 processor 가 item 을 단건으로 받아 구현 방법을 결정해야 했다.
결론적으로 ItemReader 가 List 를 반환하도록 수정하였고 과정을 기록한다.
Chunk-oriented step 은 아래를 기본으로 동작한다.
List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
Object item = itemReader.read();
if (item != null) {
items.add(item);
}
}
List processedItems = new Arraylist();
for(Object item: items){
Object processedItem = itemProcessor.process(item);
if (processedItem != null) {
processedItems.add(processedItem);
}
}
itemWriter.write(processedItems);
위 코드를 본다면 아래 내용을 확인할 수 있다.
다음과 같은 결론을 도출할 수 있다.
Item processor, writer 는 reader 의 반환 값에 영향을 받는다.
ItemReader 를 customize 한다면 processor 와 writer 가 받는 method 인자값을 변경할 수 있다.
위 내용에 따라 새로운 Reader 를 정의한다.
현재 해당 job 은 JdbcCursorItemReader 를 사용한다.
Reader 자체를 customize 한다면 조금 더 원하는 기능을 추가할 수 있었겠지만, 단순히 List 만을 반환하게 만들면 되기 때문에 delegate 를 통해 기능을 customize 하고자 했다.
코드 예시는 아래와 같다.
public class CollectionJdbcCursorItemReader<T> implements ItemStreamReader<List<T>> {
private final JdbcCursorItemReader<T> delegate;
private int listItemSize;
private boolean isEmpty = false;
CollectionJdbcCursorItemReader(int listItemSize, JdbcCursorItemReader<T> itemReader) {
this.delegate = itemReader;
this.listItemSize = listItemSize;
}
@Override
public List<T> read() throws Exception {
List<T> items = new LinkedList<>();
if (isEmpty)
return null;
while (items.size() < listItemSize) {
T item = delegate.read();
if (Objects.isNull(item)) {
isEmpty = true;
return items;
} else {
items.add(item);
}
}
return items;
}
구현을 보면, 실제 동작은 JdbcCursorItemReader 가 하며, CollectionJdbcCursorItemReader 는 단순히 설정된 listItemSize 만큼 item 을 모아서 반환하도록 작성했다.
추가적으로 item 을 read 하면서 업데이트 해줘야하는 ItemStream 관련 기능들은
단순히 기존 위임체에 위임하여 기존 동작 그대로 동작하게 두었다.
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
delegate.open(executionContext);
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
delegate.update(executionContext);
}
@Override
public void close() throws ItemStreamException {
delegate.close();
}
위와 같이 Reader 를 생성함에 따라 processor 와 writer 는 아래와 같은 모습을 가질 수 있게 된다.
public class SimpleProcessor implements ItemProcessor<List<SimpleItem, List<SimpleItem>> {
@Override
public List<SimpleItem> process(List<SimpleItem> items) throws Exception {
// process logic
}
}
public class SimpleWriter implements ItemWriter<List<SimpleItem> {
@Override
public void write(List<? extends List<BillingMember>> items) throws Exception {
// write logic
}
}
이번 과정을 통해 배치 step 에서 item 이 어떤식으로 전달되는지, JdbcCursorItemReader 가 어떤 방식으로 동작하는지 확인할 수 있었다.
다른 ItemReader 구현체들도 결국엔 동일한 로직을 사용하는 것으로 보아 비슷한 기능은 해당 위임 대상에 매칭되는 구현체를 생성한다면 동일한 효과를 볼 수 있을 것으로 예상된다. (또는 제너릭하게 만드는것도?)
목표했던 bulk api 호출은 성공했으나, 아래 부분에 대한 추가적인 확인이 필요하다 생각된다.