이번 게시글에서는 웹 개발자 성향 테스트 프로젝트에서 사용자들의 검사 결과를 통계 정보로 변환해주는 Batch Job에서 MongoDB Aggregation을 이용하여 성능을 향상시킨 내용을 기술하려고 한다.
기존의 Job은 친숙한 구조인 reader, processor, writer로 구성되어 있었다.
@Configuration
public class ReaderConfig {
public static final String TEST_RESULT_READER = "testResultReader";
@Bean(name = TEST_RESULT_READER)
public MongoPagingItemReader<TestResult> testResultReader(MongoTemplate mongoTemplate) {
MongoPagingItemReader<TestResult> reader = new MongoPagingItemReader<>();
reader.setTemplate(mongoTemplate);
reader.setPageSize(10);
reader.setQuery(new Query());
reader.setSort(Collections.singletonMap("createdAt", Sort.Direction.ASC));
reader.setTargetType(TestResult.class);
return reader;
}
}
MongoDB에서 검사 결과를 페이징하여 읽어오는 reader 설정
@Configuration
@RequiredArgsConstructor
public class ProcessorConfig {
public static final String STATISTIC_PROCESSOR = "statisticProcessor";
@Bean(name = STATISTIC_PROCESSOR)
public ItemProcessor<TestResult, Statistic> statisticProcessor(WebDeveloperProfileRepository webDeveloperProfileRepository) {
return item -> {
WebDeveloperProfile profile = webDeveloperProfileRepository.findByMbtiType(item.getMbtiType())
.orElseThrow(() -> new IllegalArgumentException("No profile found for type: " + item.getMbtiType()));
return Statistic.builder()
.developerProfile(profile)
.count(1L)
.matchCount(item.isMatch() ? 1L : 0L)
.build();
};
}
}
검사 결과를 통계 정보로 변환하는 processor 설정
@Configuration
public class WriterConfig {
public static final String STATISTIC_WRITER = "statisticWriter";
@Bean(name = STATISTIC_WRITER)
public ItemWriter<Statistic> statisticWriter(StatisticRepository statisticRepository) {
return items -> {
for (Statistic item : items) {
Statistic existing = statisticRepository.findByDeveloperProfile(item.getDeveloperProfile())
.orElse(new Statistic(item.getDeveloperProfile(), 0L, 0L));
existing.updateCount(existing.getCount() + item.getCount());
existing.updateMatchCount(existing.getMatchCount() + item.getMatchCount());
statisticRepository.save(existing);
}
};
}
}
통계 정보를 저장하는 writer 설정
[if kakao 2022] Batch Performance를 고려한 최선의 Reader에서는 배치에서 데이터를 읽어오는 것이 배치의 전체 성능에서 큰 영향을 미친다고 한다. 지금 구조는 Reader에서 검사 결과를 하나씩 불러와 Processor에서 통계 정보로 변환한 뒤 Writer에서 저장한다. 이러한 구조의 문제점은 성능이 느리고 데이터가 증가할수록 소요 시간이 선형적으로 증가한다는 것이다.
만약 동일하게 절단해야 하는 여러 장의 종이를 절단하는 작업을 한다고 할 때, 하나씩 절단할 것인가? 아니다. 여러 장의 종이를 한 번에 절단하고자 노력할 것이다. 여러 장의 종이를 한 번에 절단하면 종이의 양에 따라 소요시간이 증가하지 않을 것이다.
이러한 생각을 기반으로 문제를 해결하기 위해 MongoDB의 Aggregation Operations을 사용하기로 했다.
간단한 작업이기 때문에 reader, proccesor, witer 구조가 오버엔지니어링이라고 판단하여 개선된 Job은 기존에 친숙했던 reader, proccesor, witer에서 완전히 벗어나 하나의 tasklet으로 구성하였다.
@Component
public class StatisticTasklet implements Tasklet {
private final MongoTemplate mongoTemplate;
private final StatisticRepository statisticRepository;
private final WebDeveloperProfileRepository profileRepository;
public StatisticTasklet(MongoTemplate mongoTemplate, StatisticRepository statisticRepository, org.meotppo.webti.domain.repository.jpa.developertype.WebDeveloperProfileRepository profileRepository) {
this.mongoTemplate = mongoTemplate;
this.statisticRepository = statisticRepository;
this.profileRepository = profileRepository;
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Aggregation aggregation = newAggregation(
group("mbtiType")
.count().as("count")
.sum(ConditionalOperators.when(Criteria.where("match").is(true)).then(1).otherwise(0)).as("matchCount")
);
AggregationResults<Document> results = mongoTemplate.aggregate(aggregation, "test_result", Document.class);
List<Document> documents = results.getMappedResults();
documents.forEach(this::processDocument);
return RepeatStatus.FINISHED;
}
private void processDocument(Document result) {
MbtiType mbtiType = getMbtiType(result, "_id").orElseThrow(() -> new IllegalArgumentException("MBTI type is missing or invalid"));
Long count = getLongValue(result, "count").orElseThrow(() -> new IllegalArgumentException("Count is missing"));
Long matchCount = getLongValue(result, "matchCount").orElseThrow(() -> new IllegalArgumentException("Match count is missing"));
WebDeveloperProfile profile = profileRepository.findByMbtiType(mbtiType).orElseThrow(() -> new IllegalArgumentException("No profile found for type: " + mbtiType));
Statistic existing = statisticRepository.findByDeveloperProfile(profile).orElseGet(() -> new Statistic(profile, 0L, 0L));
existing.updateCount(existing.getCount() + count);
existing.updateMatchCount(existing.getMatchCount() + matchCount);
}
private Optional<MbtiType> getMbtiType(Document document, String key) {
return Optional.ofNullable(document.getString(key))
.map(mbtiTypeString -> {
try {
return MbtiType.valueOf(mbtiTypeString);
} catch (IllegalArgumentException e) {
return null;
}
});
}
private Optional<Long> getLongValue(Document document, String key) {
return Optional.ofNullable(document.get(key, Number.class)).map(Number::longValue);
}
}
MongoDB Aggregation을 사용하여 통계 정보를 한 번에 처리하는 tasklet 설정
기존 Job에 비해서 개선한 Job의 구조가 훨씬 효율적이기 때문에 상당한 성능 향상을 예측하였고 실제로 확인하기 위해서 성능을 측정해보았다.
성능이 크게 개선됨은 분명하고 더 나아가서 데이터의 양이 증가해도 처리 시간이 거의 일정하게 유지됨을 확인할 수 있다.
[공부정리] Jenkins를 이용한 Spring Batch 실행에서 언급했듯이, 매시 정각에 Batch가 실행되기 때문에 이미 수행했던 데이터의 처리가 필요하다. 이를 위해 여러 가지 방법을 고민해보았다.
이 중에서 가장 구현이 쉽고 속도가 빠를 것으로 예상되는 데이터를 처리한 뒤 삭제하는 방식을 선택하였다. 아래는 해당 tasklet이다.
@Component
@RequiredArgsConstructor
public class CleanupTasklet implements Tasklet {
private final TestResultRepository testResultRepository;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
testResultRepository.deleteAll();
return RepeatStatus.FINISHED;
}
}
처리한 데이터를 삭제하는 cleanup tasklet 설정
데이터를 처리한 뒤 삭제하는 것은 일반적인 프로젝트에서는 위험성이 있어 쉽게 적용하기 어렵다. 그래서 일반적으로 사용되는 방법에 대해 궁금하다. 처리한 데이터를 다른 저장소로 이동시킬려나?
이러한 고민을 해보셨고, 해결하셨다면 공유해주시면 감사하겠습니다.