스프링 배치는 다양한 내장 ItemReader와 ItemWriter를 제공해 일반적인 데이터 처리 작업을 손쉽게 수행할 수 있도록 지원한다.
하지만 특정 비즈니스 로직에 맞는 배치를 구현하기 위해서는 커스터마이징이 필요한데, 이처럼 이번에는 Custom ItemReader와 Custom ItemWriter를 구현하여 배치 작업에서 활용하는 방법에 대해 알아보자
QuerydslPagingItemReader
는 Spring Batch에서 공식적으로 제공되는 Reader는 아니지만, AbstractPagingItemReader
를 상속받아 Querydsl
을 활용해 데이터를 페이징하며 읽을 수 있도록 구현한 커스터마이즈 Reader이다.
public class QuerydslPagingItemReader<T> extends AbstractPagingItemReader<T> {
private EntityManager em;
private final Function<JPAQueryFactory, JPAQuery<T>> querySupplier;
private final Boolean alwaysReadFromZero;
public QuerydslPagingItemReader(EntityManagerFactory entityManagerFactory,
Function<JPAQueryFactory, JPAQuery<T>> querySupplier,
int chunkSize) {
this(entityManagerFactory, querySupplier, chunkSize, false);
}
public QuerydslPagingItemReader(EntityManagerFactory entityManagerFactory,
Function<JPAQueryFactory, JPAQuery<T>> querySupplier,
int chunkSize,
Boolean alwaysReadFromZero) {
super.setPageSize(chunkSize);
this.em = entityManagerFactory.createEntityManager();
this.querySupplier = querySupplier;
this.alwaysReadFromZero = alwaysReadFromZero;
}
@Override
protected void doReadPage() {
initQueryResult();
JPAQueryFactory jpaQueryFactory = new JPAQueryFactory(em);
long offset = alwaysReadFromZero ? 0 : (long) getPage() * getPageSize();
JPAQuery<T> query = querySupplier.apply(jpaQueryFactory)
.offset(offset)
.limit(getPageSize());
List<T> queryResult = query.fetch();
for (T entity : queryResult) {
em.detach(entity);
results.add(entity);
}
}
@Override
protected void doClose() {
if (em != null) em.close();
super.doClose();
}
private void initQueryResult() {
if (results == null) {
results = new CopyOnWriteArrayList<>();
} else {
results.clear();
}
}
}
doReadPage():
Querydsl의 JPAQueryFactory를 통해 동적으로 생성된 쿼리를 실행하며 데이터를 페이징 처리한다.
=> offset과 limit을 계산하고, 이를 Querydsl의 쿼리에 적용한다.
조회된 결과는 results 리스트에 저장한다.
doClose():
사용이 끝난 EntityManager를 닫아 리소스를 해제한다.
initQueryResult():
매 페이지마다 새로운 데이터를 저장하기 위해 results를 초기화한다.
코드의 가독성을 높이기 위해 빌더 패턴을 사용하여 QuerydslPagingItemReader 객체를 생성할 수도 있다.
@Bean
public QuerydslPagingItemReader<Customer> customerQuerydslPagingItemReader() {
return new QuerydslPagingItemReaderBuilder<Customer>()
.name("customerQuerydslPagingItemReader")
.entityManagerFactory(entityManagerFactory)
.chunkSize(10)
.querySupplier(jpaQueryFactory -> jpaQueryFactory.select(QCustomer.customer)
.from(QCustomer.customer)
.where(QCustomer.customer.age.gt(20)))
.build();
}
CustomItemWriter는 Spring Batch에서 기본 제공되는 ItemWriter 인터페이스를 구현하여 특정 비즈니스 요구 사항에 맞는 데이터 처리를 수행하도록 커스터마이징한다. 기본 Writer로 처리할 수 없는 고유한 작업을 구현할 때 활용한다.
구성 요소
장점
유연성: 다양한 비즈니스 로직에 맞게 커스터마이징할 수 있다.
확장성: 데이터 처리 과정을 확장하거나 변경할 수 있다.
제어 가능성: 데이터 처리 흐름을 세부적으로 제어할 수 있다.
단점
개발 복잡성: 구현 과정이 다소 복잡할 수 있다.
테스트 어려움: 로직이 복잡해질수록 테스트 작성이 어려워질 수 있다.
디버깅 어려움: 문제 발생 시 디버깅이 더 어려울 수 있다.
@Slf4j
@Service
public class CustomService {
public Map<String, String> processToOtherService(Customer item) {
log.info("Call API to OtherService....");
return Map.of("code", "200", "message", "OK");
}
}
CustomService는 단순히 로그를 출력하고 응답을 반환하도록 작성한다.
CustomItemWriter 작성하기
ItemWriter 인터페이스를 구현하여 데이터를 처리한다.
@Slf4j
@Component
public class CustomItemWriter implements ItemWriter<Customer> {
private final CustomService customService;
public CustomItemWriter(CustomService customService) {
this.customService = customService;
}
@Override
public void write(Chunk<? extends Customer> chunk) throws Exception {
for (Customer customer : chunk) {
log.info("Processing in CustomItemWriter...");
customService.processToOtherService(customer);
}
}
}
Step 설정
@Bean
public Step customWriterStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("customWriterStep", jobRepository)
.<Customer, Customer>chunk(10, transactionManager)
.reader(flatFileItemReader())
.writer(customItemWriter)
.build();
}
Job 설정
@Bean
public Job customWriterJob(Step customWriterStep, JobRepository jobRepository) {
return new JobBuilder("customWriterJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(customWriterStep)
.build();
}
Spring Batch는 여러 Step을 정의하고, 조건에 따라 실행 순서를 조정하거나 특정 Step을 건너뛰는 등의 Flow 컨트롤 기능을 제공한다. 이 기능은 FlowBuilder API를 통해 설정된다.
1. next
next를 사용하면 여러 Step을 순차적으로 연결할 수 있다.
기본적으로 한 Step이 완료되면 다음 Step으로 이동한다.@Bean public Job job() { return jobBuilderFactory.get("job") .start(step1()) .next(step2()) .end() .build(); }
위 예제는 step1이 완료된 후 step2로 넘어가는 간단한 Flow를 보여준다.
2. on
on은 특정 Step의 종료 조건(ExitStatus)에 따라 다음 Step으로 이동할지 결정한다.
조건에 따라 다른 Step으로 분기할 수 있다.@Bean public Job job() { return jobBuilderFactory.get("job") .start(step1()) .on("FAILED").to(step3()) .from(step1()).on("COMPLETED").to(step2()) .end() .build(); }
step1의 결과가 FAILED면 step3으로 이동하고, COMPLETED면 step2로 이동한다.
조건에 따라 서로 다른 Flow를 설정할 수 있다.
3. stop
stop은 특정 Step의 종료 상태를 확인하여 배치 작업을 중단한다.@Bean public Job job() { return jobBuilderFactory.get("job") .start(step1()) .on("FAILED").stop() .end() .build(); }
step1의 결과가 FAILED면 배치 작업을 중단한다.
특정 조건에서 작업을 멈추고 에러를 방지하거나 재작업을 유도할 수 있다.
next를 활용해 순차적인 작업을 수행할 수 있다.
on과 from을 통해 조건에 따라 분기 처리할 수 있다.
stop으로 특정 조건에서 배치 작업을 종료하여 안정성을 높일 수 있다.
참고글 - [SpringBatch 연재 10] 스프링배치 플로우 컨트롤 하기
깃허브 - https://github.com/hysong4u/springbatch
진심으로 10회 동안 너무 성실하게 꿋꿋히 참여해줘서 너무 고마워요 ㅠㅠ
하연쓰 님에 대해서도 이야기 많이 했어야하는데 못해서 너무 아쉬워요 ㅠㅠ
그럴 기회도 생기면 좋을 거 같아요 ㅠㅠ
진심으로 고생했고 ㅠㅠ 연말에는 숨 한번 크게 돌리면서 건강도 챙기면서 활동하길 바라고요 ㅠㅠ
꼭 이야기 할 기회있으면 꼭 만들어 봐요!!@#@!#