보통의 Spring Batch Job은 Job-Step, 이때 Step을 Reader-Processor-Writer로 구성한다.
또한 일전에 구성하였던 것처럼 Step의 역할을 데이터 추출, 데이터 쓰기 등 기능적으로 분리하여 여러개의 Step을 구현하기도 하였다.
Spring Batch 5.2ver 이후에 추가된 Reader/Writer 컴포넌트의 경우 완전히 새로운 방식의 batch job처리를 지원하는데, ItemReader/Writer 자체를 여러개, 다양하게 구성하여 Reader를 순차적으로 진행하는 방안을 제공하여 준다.
또한 이러한 읽어온 데이터를 다양한 데이터 저장소에 write를 한다.
이처럼 읽어들일 data source가 다양한 상황일때 유용할 수 있는, Spring batch에서 새롭게 제안하는 CompositeItemReader/Writer에 대해 분석한 내용을 기록해보았다.
CompositeItemReader/Writer의 핵심 동작원리는 위임이다.
내부적으로 다양한 원천에서 데이터를 읽고, 이 읽은 데이터들을 동시적으로 다른 데이터 저장소(flatFile/RDB/NoSQL 등)에 저장이 필요할 경우 각각의 writer에게 처리를 위임한다.
즉, 쉽게 말해서 CompositeItemReader는 데이터를 합치지 않는다. 더불어 CompositeItemWriter는 데이터를 나누지 않는다.
(item1 + item2 + item3) → write
item1 → chunk → 모든 writer에 write
item2 → chunk → 모든 writer에 write
item3 → chunk → 모든 writer에 write
데이터를 한곳에 모아두지 않고, 읽어오는(Reader를 호출하는 그대로) 순서대로 동시에 Writer가 데이터를 전달받아 데이터를 처리한다.
데이터베이스가 논리적/물리적으로 분리되어있는 분산(샤드) 환경이거나, 물리적으로 읽어야할 데이터의 원천이 로그 파일/RDB 등 단일 ItemReader로는 데이터 읽기가 불가능할 경우, CompositeItemReader를 통해 다양한 Reader를 통해 얻어온 데이터를 읽어온다.
List<ItemStreamReader<DataLog>> readers = List.of(....);
CompositeReader의 생성자 매개변수로 ItemReader 리스트를 전달해야 하는데, 그 리스트를 먼저 구성해주면 된다.
CompositeItemReader<DataLog> compositeReader = new CompositeItemReader<>(readers);
이 경우 최종적으로 아래와 같은 동작으로 데이터를 읽어오는 과정을 진행하게 된다.
read()
├─ 현재 reader.read()
│ ├─ 데이터 있음 → return item
│ └─ null → 다음 reader로 이동
└─ 모든 reader가 null → 전체 EOF
즉, Reader가 순차처리를 진행하면서 더이상 읽을 데이터가 없다면 다음 Reader로 넘어간다.
참고로 데이터의 구분자는 별도 제공하지 않고, 반드시 직접 구현하거나 구분자를 추가해주어야 하며, 모든 Reader는 동일한 형태를 반환해야 Writer가 이를 받아 일괄적으로 데이터 쓰기를 처리할 수 있다.
class DataSource {
SourceType source; // FILE / DB / REDIS
...
}
Writer는 Reader가 순차적으로 호출하면서 읽은 데이터를 그대로 받아, CompositeItemWriter가 등록한 모든 Writer가 동시에 이를 처리한다.
[Chunk: List<T>]
|
v
[CompositeItemWriter]
|
+--> JdbcItemWriter (items 전체)
|
+--> RedisItemWriter (items 전체)
|
+--> FileItemWriter (items 전체)
이를 좀 더 세부적으로 살펴보면,
CompositeItemReader
|
| read() → T
v
ItemProcessor (선택)
|
| chunk size 만큼 누적
v
CompositeItemWriter
|
| write(List<T>)
v
모든 delegate writer에 동일한 List 전달
Reader에서 읽은 데이터를 구분하지 않고, CompositeItemWriter 측에서 chunk List 형태로 전달받고 내부의 모든 writer가 처리를 위임받아 동시적으로 해당 데이터를 write하게 된다.
만약 출처별 분기처리가 필요하다면, 자체적으로 객체 프로퍼티에 출처 등을 표기할 수 있는 구분자가 필요하다.
보통은 이럴 경우 ClassifierCompositeItemWriter를 사용하며, item을 classfied한 상태로 writer를 선택할 수 있도록 구성한다.
ClassifierCompositeItemWriter<T>
classifier(item) -> writer 선택
이때 classifier에서 item 출처에 대한 프로퍼티를 활용하여 전략적으로 writer를 선택할 수 있도록 유도할 수 있다.
item -> {
if (item.getSource() == REDIS) return redisWriter;
if (item.getSource() == DB) return dbWriter;
return fileWriter;
}
따라서 출처분기가 필요하다면 별도의 CompositeItemReader/Writer가 필요할 것이며, 일반적인 CompositeItemReader/Writer는 단순히 여러 원천에서 데이터를 읽고 데이터를 여러 곳에 write하기 위한 목적으로 사용한다.
참고로 list로 등록한 writer를 동시에 호출하는 것은 아니고, 순차적으로 호출하여 데이터를 write하지만 개념적으로 보았을때는 동일하게 데이터를 각각의 target 저장소에 쓰기때문에 편의상 동시 처리라 표현하였다.
보통 write의 과정을 생각하면, reader로부터 데이터를 읽고 chunk size만큼 데이터를 일단 처리, beforeCommit 시점까지 데이터 쓰기 작업을 미루다가 일괄적으로 commit을 진행한다.
이러한 공통화된 생명주기는 compositeItemWriter 입장에서는 일괄적인 데이터 쓰기 처리를 가능하게 해주는 강력한 요소이다.
즉, flatFileItemWriter/mongoItemWriter/RedisItemWriter 등 다양한 데이터를 쓰는 작업이 있을때 각각의 writer는 데이터를 chunk size만큼 처리하기 전에 먼저 buffer에 작업사항을 buffer에 담아둔다(commit 이전).
그리고 chunk size만큼 처리를 진행한 후에 비로소 commit을 한다.
CompositeItemWriter 입장에서는 모든 데이터를 처리하기 전에 buffer에 담고, 모든 writer가 처리작업을 완료하였을때 비로소 최종 반영한다.
만약 다른 writer가 처리하는 시점에 예상치 못한 예외가 발생하였다면, 다른 writer에서 처리한 데이터는 commit되지 않고, 최종 반영되지 않는다(모두 rollback 되거나 file에 데이터 쓰기가 일어나지 않는다).
따라서 이러한 spring batch에서 제공하는 철저한 생명주기는, composite Item Writer 입장에서 데이터의 일관성을 보장할 수 있는 매우 중요한 장치가 될 수 있겠다.
일단 중요한 컴포넌트인 ClassiferCompositeItemWriter를 적용하는 방법을 중심으로 분석하였다.
참고로 ClassfierCompositeItemReader는 따로 제공하지는 않지만, 일전에 PatternMatchingCompositeLineMapper처럼 특정 패턴 혹은 프로퍼티를 기준으로 파싱객체를 따로 지정해주는 방법으로 (Customized) Reader 구현 방안을 고려해보면 좋을 것이다.
일단 기본적인 job 실행 구조를 설계한다.
@Bean
public Step systemLogProcessingStep() {
return new StepBuilder("systemLogProcessingStep", jobRepository)
.<SystemLog, SystemLog>chunk(10, transactionManager)
.reader(systemLogProcessingReader())
.writer(classifierWriter())
.build();
}
이때 classifierWriter라는 ClassifierCompositeItemWriter 객체를 주입한다.
Classifier라는 수식에서 살펴볼 수 있듯이, item을 받아서 해당 아이템의 프로퍼티 및 특정 값을 기준으로 writer를 어떠한 것으로 사용할지 결정할 수 있는 강력한 장치가 존재한다.
Item → Classifier → ItemWriter 결정 → 해당 Writer에 위임
즉 이를 위해, ClassifierCompositeItemWriter 내부에는 어떠한 객체를 받아오고, 해당 객체를 어떠한 writer에게 위임할 것인지 지정해줄 수 있다.
@Bean
public ClassifierCompositeItemWriter<SystemLog> classifierWriter() {
ClassifierCompositeItemWriter<SystemLog> writer = new ClassifierCompositeItemWriter<>();
writer.setClassifier(new SystemLogClassifier(criticalLogWriter(), normalLogWriter()));
return writer;
}
ClassifierCompositeItemWriter는 위와 같이 제너릭을 통해 어떠한 객체를 직렬화할 것인지, 어떠한 writer를 classified할 것인지 지정해줄 수 있다.
만약 writer가 여러개라면, Classifier의 생성자 인자에 그만큼의 writer를 전달해주면 되겠다.
new SystemLogClassifier(
criticalWriter,
normalWriter,
warningWriter,
infoWriter
);
그럴 경우 아래와 같이, writer를 선별할 수 있는 분기처리를 적절하게 구현해주면 된다.
@Override
public ItemWriter<? super SystemLog> classify(SystemLog item) {
if (...) return criticalWriter;
if (...) return warningWriter;
return normalWriter;
}
Classifier의 경우 SubclassClassifier와 같은 기본 제공 구현체가 있긴 하지만, 로직을 명기하는 차원에서 Customized하는 방안도 나쁘지 않다.
Classifier 인터페이스는 아래와 같이 제공된다.
public interface Classifier<C, T> {
T classify(C classifiable);
}
분류대상인 Classfiying Object, 즉 읽어들일 객체와 이 객체를 classify하여 반환할 결과인 Type으로 구성이 되어있고, CustomizedClassifier의 경우 반환 결과는 ItemWriter이다.
public static class SystemLogClassifier implements Classifier<SystemLog, ItemWriter<? super SystemLog>> {
...
private final ItemWriter<SystemLog> criticalWriter;
private final ItemWriter<SystemLog> normalWriter;
public SystemLogClassifier(
ItemWriter<SystemLog> criticalWriter,
ItemWriter<SystemLog> normalWriter) {
this.criticalWriter = criticalWriter;
this.normalWriter = normalWriter;
}
@Override
public ItemWriter<SystemLog> classify(SystemLog log) { //Classifier의 Input 객체
if (isCritical(log)) {
return criticalWriter;
}
return normalWriter;
}
...
}
일단 먼저 살펴볼 점은 Classifier의 제너릭인 Classifier<SystemLog, ItemWriter<? super SystemLog>> 부분이다.
앞서 잠깐 살펴보긴 하였는데, 기본적으로 ItemWriter는 소비하는 입장에서 PECS 원칙을 준수하여야 한다. 그렇기에 제너릭 타입이 부모 한정 와일드카드로 구성이 되어있는 것이고, 이는 개념 차원에서 알아두도록 하자.
또한
Classifier<SystemLog, ItemWriter<SystemLog>>
으로 writer의 입력 객체를 특정한 형태면 컴파일 오류가 발생한다.
public class ClassifierCompositeItemWriter<T>
implements ItemWriter<T> {
private Classifier<? super T, ItemWriter<? super T>> classifier;
}
ClassifierCompositeItemWriter를 살펴보았을때, Classifier의 제너릭 형태는 위와 같이 어떠한 입력객체를 받을 것이고, 어떤 ItemWriter를 선별결과로 활용할 것인지에 대한 내용이다.
내부적으로 이러한 제너릭 형태가 정해져있기에, Classifier 인터페이스를 사용하여 구성해줄때도 반드시 이러한 규칙을 지킬 필요가 있다.
즉, Classifier 제너릭 형태를 위와 같은 형태가 아닌, 일반 객체로 해버리면 내부적인 PECS 원칙을 위배하게 되고 반환 타입이 구체적인 형태가 되어버린다.
예를 들어 아래와 같은 경우라면?
Classifier<String, ItemWriter<? super SystemLog>>
이는 내부 제너릭 형태가 서로 일치하지 않아 컴파일 오류가 발생한다.
Classifier는 이처럼 내부 타입 일치 등에 신경쓰고, 생성자 주입 시 어떠한 writer를 선별결과로 사용하며 어떻게 활용할 것인지 정도만 주의해주면 되겠다.
그 후 전달받은 writer를 생성자 주입을 통해 객체주입을 진행하며, classify를 오버라이드하여 선별기준에 따른 writer 선별로직을 구성해주도록 한다.
./gradlew batch:composite:bootRun --args='--spring.batch.job.name=systemLogProcessingJob'
batch를 실행해보면

다음과 같이 정상적으로 객체내용에 따라 Classfied된 상태로 itemWriter가 실행되었음을 확인할 수 있다.
ListItemReader는 다른 itemReader와는 조금 다른 특이한 점이 있다. 다른 ItemReader는 "쿼리 실행을 통한 결과"를 객체로 역직렬화하여 반환받는 과정이라면, ListItemReader는 생성자 인자로 "데이터 객체 리스트 자체"를 전달받기 때문이다.
| Reader | 데이터 주입 방식 |
|---|---|
ListItemReader | 생성자에 List<T> |
JdbcPagingItemReader | DataSource + SQL |
JpaPagingItemReader | EntityManager + JPQL |
FlatFileItemReader | Resource (파일) |
KafkaItemReader | Consumer 설정 |
내부적으로 쿼리를 사용한다면,
List<SystemLog> logs = systemLogRepository.findAll();
return new ListItemReader<>(logs);
위와 같이 쿼리 결과를 추출해오고, ListItemReader의 생성자로 전달해줄 수는 있겠지만 일전의 다른 Reader의 빌더패턴에 비해 가독성이 떨어진다.
ListItemReader는 “외부에서 이미 준비된 컬렉션을 순차적으로 읽는 용도”이다. 쿼리 실행 / 외부 연동 기능은 전혀 없다.
따라서 ListItemReader는 테스트, 데모, 단위 Step 검증에 주로 사용하는 단순/일회성 용도로 사용을 많이 하며, 다른 외부환경 및 Reader 특성에 맞게, 즉 명확한 목적의 Reader를 구분하여 사용할 필요가 있겠다.
ClassifiedCompositeItemReader는 compositeItemReader에서 classify 기능이 추가된 인터페이스로, 이를 이해하면 자연스럽게 compositeItemReader를 구현할 수 있을 것이다.
이처럼 다양한 형태의 데이터 소스와, 이를 기록해야할 데이터 저장소가 여러개일때 이러한 복합Reader/Writer를 사용한다면 손쉬운 구성으로 일괄적인 처리를 기대할 수 있을 것이다.
더불어 test code에 쓰일 수 있는 단순일회성 Reader인 ListItemReader에 대해서도 알아보았는데, 웬만한 형태/종류의 Reader는 거의 모두 파악했기 때문에 실무에서 정확한 목적, 의도를 가진 Spring Batch Reader/Writer 설계를 할 수 있을 것으로 생각한다.