
[대량 데이터 저장하기 ①] JPA Batch Size
이전 글에서 JPA Batch Size 최적화를 통해 쿼리 수를 줄였지만, 여전히 한 가지 문제가 남아있다. 사용자가 등산을 마치고 기록 저장을 요청했을 때 대량의 실시간 데이터를 모두 저장할 때까지 기다려야 한다는 점이다.
K6 부하 테스트 결과 100명의 동시 사용자가 요청할 때 52%가 응답시간 2초를 초과하는 문제가 발생했다. 일부 요청은 9초를 초과했다. 물론 한번에 많은 사용자들이 몰리는 특성을 가진 API는 아니지만 4시간 넘게 등산을 하는 경우도 있다고 하니 보다 빨리 응답하기 위해 RabbitMQ를 활용한 비동기 처리를 도입하기로 했다.
프로세스를 간단히 정리하면 아래와 같다.
■ 기존의 동기 방식:
사용자 요청 → 비즈니스 로직 → 대량 데이터 저장 → 응답 반환
↑ 병목 지점
-------------------------------------------------------------
■ 개선된 비동기 방식:
사용자 요청 → 비즈니스 로직 → 즉시 응답 반환
↓
RabbitMQ Queue → Consumer → 대량 데이터 저장
(비동기 처리)
다시 말해 사용자에게 꼭 필요한 데이터만 즉시 반환하고 상세 트래킹 데이터는 백그라운드에서 처리하는 것이다.
RabbitMQ는 AMQP(Advanced Message Queuing Protocol) 기반의 메시지 브로커이다. 애플리케이션 간에 메시지를 안전하고 신뢰성 있게 전달하는 중간 매개체 역할을 한다.
Producer → Exchange → Queue → Consumer
흐름을 요약하면
이런 식인 것 같다.
@Configuration
public class RabbitMQConfig {
public static final String HIKING_RECORDS_QUEUE = "hiking-records-queue";
public static final String HIKING_RECORDS_DLQ = "hiking-records-dlq";
public static final String EXCHANGE = "hiking-records-exchange";
public static final String ROUTING_KEY = "hiking-records";
@Bean
public Queue hikingRecordsQueue() {
return QueueBuilder.durable(HIKING_RECORDS_QUEUE)
.withArgument("x-dead-letter-exchange", EXCHANGE)
.withArgument("x-dead-letter-routing-key", ROUTING_KEY + ".dlq")
.build();
}
@Bean
public Queue hikingRecordsDlq() {
return new Queue(HIKING_RECORDS_DLQ, true);
}
@Bean
public DirectExchange hikingRecordsExchange() {
return new DirectExchange(EXCHANGE);
}
// Binding 설정
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter());
return template;
}
}
durable(HIKING_RECORDS_QUEUE) : 큐를 영구적으로 보관한다. RabbitMQ 서버가 재시작되어도 큐가 사라지지 않는다.
x-dead-letter-exchange : 메시지 처리에 실패했을 때 메시지를 보낼 Dead Letter Exchange를 지정한다. 이를 통해 실패한 메시지를 별도로 처리할 수 있다.
x-dead-letter-routing-key : Dead Letter Exchange로 보낼 때 사용할 라우팅 키를 지정한다.
DirectExchange : 라우팅 키가 정확히 일치하는 큐로만 메시지를 전달하는 Exchange 타입이다. 1:1 매칭으로 단순하고 명확한 라우팅이 가능하다.
Jackson2JsonMessageConverter : 메시지를 JSON 형태로 직렬화/역직렬화한다. Java 객체를 JSON으로 변환하여 전송하고, 수신 시 다시 Java 객체로 변환한다.
@Getter
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class HikingLiveRecordsDTO implements Serializable {
private static final long serialVersionUID = 1L;
private Integer userId;
private Integer mountainId;
private Integer pathId;
private Integer hikingHistoryId;
private int totalTime;
private double totalDistance;
private Double latitude;
private Double longitude;
private Integer heartRate;
}
기존 HikingLiveRecords 엔티티는 User, Mountain, Path, HikingHistory와 @ManyToOne 관계를 가지고 있다. 엔티티를 직접 직렬화할 경우 연관된 엔티티들이 함께 직렬화되어 메시지 크기가 커진다. 즉, 객체 그래프가 복잡해진다. 또한 양방향 연관관계로 인해 무한 루프가 발생할 수 있다.
따라서 필요한 ID 값들만 포함하는 단순한 DTO를 설계하여 직렬화 부하를 감소시키고 안정성을 확보했다.
@Service
@RequiredArgsConstructor
@Slf4j
public class TrackingService {
@Transactional
public TrackingFinishResponseDto manageTrackingFinish(Integer userId, TrackingFinishRequestDto request) {
log.info("트래킹 종료 API 호출 -> 요청 데이터 : {}", request);
// 필수 비즈니스 로직만 동기 처리
User user = userRepository.findById(userId).orElseThrow(NotFoundException::new);
Path path = pathRepository.findById(request.getPathId()).orElseThrow(NotFoundException::new);
Mountain mountain = mountainRepository.findById(request.getMountainId()).orElseThrow(NotFoundException::new);
// 정상 도착 여부 확인
validateSummitArrival(path, request);
// 기본 응답값 초기화
String badge = mountain.getMountainBadge();
Double avg = null;
Integer max = null;
Integer timeDiff = null;
if (request.isSave()) {
// 핵심 등산 기록만 즉시 저장
Footprint footprint = footprintRepository.findByUserAndMountain(user, mountain)
.orElseGet(() -> footprintRepository.save(Footprint.of(user, mountain)));
List<Integer> heartRates = request.getRecords().stream()
.map(BattleRecordsForTrackingResponseDto::getHeartRate)
.filter(Objects::nonNull)
.toList();
HikingHistory history = HikingHistory.of(footprint, path, request.getFinalTime(), heartRates);
hikingHistoryRepository.save(history);
avg = history.getAverageHeartRate();
max = history.getMaxHeartRate();
// 대량 데이터는 비동기로 처리
List<HikingLiveRecords> entityList = TrackingUtils.toEntities(
request.getRecords(), user, mountain, path, history);
// Entity → DTO 변환 후 RabbitMQ로 전송
List<HikingLiveRecordsDTO> dtoList = entityList.stream()
.map(entity -> HikingLiveRecordsDTO.builder()
.userId(user.getId())
.mountainId(mountain.getId())
.pathId(path.getId())
.hikingHistoryId(history.getId())
.totalTime(entity.getTotalTime())
.totalDistance(entity.getTotalDistance())
.latitude(entity.getLatitude())
.longitude(entity.getLongitude())
.heartRate(entity.getHeartRate())
.build())
.collect(Collectors.toList());
// 비동기 큐로 전송
rabbitTemplate.convertAndSend(RabbitMQConfig.HIKING_RECORDS_QUEUE, dtoList);
// 경험치 및 거리 갱신
userService.updateUserInfoAfterTracking(user, request.getFinalDistance(), mountain.getLevel());
}
// 기타 비즈니스 로직
// 즉시 응답 반환
return TrackingFinishResponseDto.of(badge, avg, max, timeDiff);
}
}
사용자에게 필요한 핵심 데이터(HikingHistory)만 즉시 저장하고 대량 데이터는 RabbitMQ로 전송한다. 큐 전송 직후 사용자에게 응답을 반환한다.
@Slf4j
@Service
@RequiredArgsConstructor
public class HikingRecordsConsumer {
private final HikingLiveRecordsRepository hikingLiveRecordsRepository;
private final UserRepository userRepository;
private final MountainRepository mountainRepository;
private final PathRepository pathRepository;
private final HikingHistoryRepository hikingHistoryRepository;
@RabbitListener(queues = "hiking-records-queue")
@Transactional
public void consumeHikingRecords(List<HikingLiveRecordsDTO> dtoList) {
log.info("Received {} HikingLiveRecordsDTO from queue", dtoList.size());
List<HikingLiveRecords> entityList = dtoList.stream()
.map(dto -> {
User user = userRepository.findById(dto.getUserId()).orElseThrow(NotFoundException::new);
Mountain mountain = mountainRepository.findById(dto.getMountainId()).orElseThrow(NotFoundException::new);
Path path = pathRepository.findById(dto.getPathId()).orElseThrow(NotFoundException::new);
HikingHistory history = hikingHistoryRepository.findById(dto.getHikingHistoryId()).orElseThrow(NotFoundException::new);
return HikingLiveRecords.builder()
.id(dto.getId())
.user(user)
.mountain(mountain)
.path(path)
.hikingHistory(history)
.totalTime(dto.getTotalTime())
.totalDistance(dto.getTotalDistance())
.latitude(dto.getLatitude())
.longitude(dto.getLongitude())
.heartRate(dto.getHeartRate())
.build();
})
.collect(Collectors.toList());
int batchSize = 120;
try {
for (int i = 0; i < entityList.size(); i += batchSize) {
List<HikingLiveRecords> batch = entityList.subList(i, Math.min(i + batchSize, entityList.size()));
hikingLiveRecordsRepository.saveAll(batch);
hikingLiveRecordsRepository.flush();
log.info("Saved batch of {} HikingLiveRecords", batch.size());
}
} catch (Exception e) {
log.error("Failed to save HikingLiveRecords: {}", e.getMessage());
throw e; // DLQ로 전송
}
}
}
@RabbitListener를 통해 큐에서 메시지를 자동으로 수신하고 처리할 수 있다. 이때 Spring AMQP가 제공하는 강력한 기능이 자동 역직렬화이다.
타입 추론 : 메서드 파라미터의 타입(List)을 분석하여 역직렬화할 타입을 자동으로 결정한다.
MessageConverter 활용 : 앞서 설정한 Jackson2JsonMessageConverter를 사용하여 JSON 메시지를 Java 객체로 변환한다.
컬렉션 처리 : 단일 객체뿐만 아니라 List, Set 등의 컬렉션 타입도 자동으로 처리한다. 메시지가 JSON 배열 형태라면 자동으로 List로 변환된다.
이러한 자동화된 처리 덕분에 메시지 파싱이나 변환 로직을 직접 작성하지 않고도 비즈니스 로직에 집중할 수 있다.
| 지표 | 동기 | 비동기 | 개선율 |
|---|---|---|---|
| 평균 응답 시간 | 2.6초 | 0.265초 | 90% 감소 |
| 최대 응답 시간 | 9.34초 | 1.52초 | 84% 감소 |
| 2초 이내 응답률 | 48% | 100% | 100% 달성 |
RabbitMQ를 통한 비동기 처리 도입으로 응답 시간이 대폭 개선되었다!