
배경
Palette(데이터 코스 추천 서비스)는 사용자(커플)에게 맞춤화된 추천 결과를 제공하기 위해 ML 서버와 통신하여 각 사용자의 임베딩 관련 처리를 한다.
임베딩 관련 처리가 필요한 기능은 다음과 같다.
1. 사용자 취향(선호 음식점) 선택 → 임베딩 생성
2. 음식점 추천 결과 제공 → 임베딩 연산
3. 리뷰 작성 → 임베딩 갱신여기서 1번과 3번의 경우, 유저의 임베딩을 새로 업데이트 해야하는 작업이므로 ML 서버에서 약 1~2초 정도의 처리 시간이 소요된다.
그렇기 때문에 API 서버에서의 블로킹 I/O로 인한 병목 현상이 나타날 것이라고 생각했다.
위는 Apache JMeter로 동시 사용자 수(Thread) 300명으로 부하테스트를 진행한 결과이다.
100명까지는 응답 지연만 발생하였으나, 300명부터는 API 서버의 JDBC 커넥션 풀 고갈로 인해 Error Rate가 0% → 22.7%로 증가했다.따라서 비동기 통신로 이를 해결해야겠다는 생각을 하게 되었다.
비동기 통신을 위해 @Async 어노테이션을 고려했으나, 이는 호출자 입장에서만 논블로킹이고 사실상 별도의 스레드 풀에서 블로킹 I/O로 실행된다. 따라서 여전히 스레드 풀 고갈 문제가 발생할 수 있다.
WebFlux의 subscribe()를 통한 비동기 처리는 무책임한 방식이라고 생각했다. 비동기 요청을 보내놓고, 추천 시스템의 임베딩 처리 실패나 시스템 장애에 대해 대응하기 쉽지 않다.
→ 결과적으로 시스템 간 결합도를 낮출 수 있고, 예외 처리나 메시지 재처리, 장애 대응에 유리한 메시지 큐를 도입하기로 결정했다.
그 중 RabbitMQ를 선택한 이유는 '이벤트 기반 단순 임베딩 계산이라는 목적'과 '낮은 러닝커브와 복잡성' 등이 있었다.

나는 두 개의 exchange를 운영하기로 결정했다.
embeddingStatus를 따로 사용하는 이유는 다음과 같다.
선호 음식점 선택 페이지는 위와 같은데,
1. 선택 완료 후 메시지 큐에 User Interaction(선택한 음식점)을 전송하고, member 엔티티의preferenceStatus를 'INCOMPLETE'에서 'PENDING'으로 수정한다.
2. ML 서버의 임베딩 생성이 성공하면 embeddingStatus exchange를 통해 성공 여부를 수신하고, 본격적으로 추천 기능을 사용할 수 있도록preferenceStatus를 COMPLETE로 수정한다.이러한 로직을 위해 embeddingStatus exchage를 따로 운영한다.
Mac 환경에서 homebrew를 통해 rabbitmq를 설치하여 실행하였다.
brew isntall rabbitmq
brew services start rabbitmq
RabbitMQ는 AMQP(메시지 브로커 간 통신을 표준화하기 위해 설계된 프로토콜)을 구현한 메시지 브로커이다.
이를 사용하기 위해 의존성 추가 및 RabbitMQ 연결을 위한 설정을 추가해준다.
build.gradle
implementation 'org.springframework.boot:spring-boot-starter-amqp'
application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username : guest
password: guest
# ssl: # Amazon MQ는 기본적으로 SSL을 사용하므로 배포 시에는 이를 활성화해야한다.
# enabled: true
rabbitmq:
exchanges:
interaction: interaction.exchange
embeddingStatus: embeddingStatus.exchange
queues:
interaction: interaction.queue
embeddingStatus: embeddingStatus.queue
routing-keys:
interaction: interaction.key
embeddingStatus: embeddingStatus.key
윗 부분은 RabbitMQ 연결을 위한 설정을 하였고, 아랫 부분은 RabbitMQ 메시징 구성을 명시했다.
(2개의 exchange를 운영하기로 하였기 때문에 각 요소가 두 개씩 존재한다.)
RabbitMqConfig.java
@RequiredArgsConstructor
@Configuration
public class RabbitMqConfig {
// Member Interaction
@Value("${rabbitmq.exchanges.interaction}")
private String interactionExchange;
@Value("${rabbitmq.queues.interaction}")
private String interactionQueue;
@Value("${rabbitmq.routing-keys.interaction}")
private String interactionRoutingKey;=
/**
* 지정된 Exchange 이름으로 Direct Exchange Bean 을 생성
*/
@Bean
public DirectExchange interactionExchange() {
return new DirectExchange(interactionExchange);
}
/**
* 지정된 큐 이름으로 Queue Bean 을 생성
*/
@Bean
public Queue interactionQueue() {
return new Queue(interactionQueue);
}
/**
* 주어진 Queue 와 Exchange 을 Binding 하고 Routing Key 을 이용하여 Binding Bean 생성
* Exchange 에 Queue 을 등록한다고 이해하자
**/
@Bean
public Binding interactionBinding() {
return BindingBuilder
.bind(interactionQueue())
.to(interactionExchange())
.with(interactionRoutingKey);
}
/**
* RabbitTemplate을 생성하여 반환
* ConnectionFactory 로 연결 후 실제 작업을 위한 Template
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
/**
* Jackson 라이브러리를 사용하여 메시지를 JSON 형식으로 변환하는 MessageConverter 빈을 생성
*/
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
코드 길이가 길어져서 줄였는데, 추가적인 exchange가 있다면 각각의 exchange, queue, binding을 따로 빈에 등록해주자.
대부분의 블로그에서 RabbitMqProperties를 만들고, ConnectionFactory을 따로 생성해 빈으로 등록하는데, 따로 만들 필요 없이 Spring Boot Auto Configuration에 의해 자동으로 연결된다. (SSL 설정을 하던 중 알게 되었다)
필요한 기능에 따라 다음 코드와 같이 서비스를 구현하여 사용하면 된다.
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageListener {
private final MemberRepository memberRepository;
/**
* 1. Queue 에서 메세지를 구독
**/
@RabbitListener(queues = "${rabbitmq.queues.embeddingStatus}")
@Transactional
public void receiveMemberEmbeddingStatusMessage(MemberEmbeddingStatusMessage message) {
log.info("RabbitMQ 유저 임베딩 생성 여부 메시지 수신 - memberId: {}, isSuccess: {}", message.memberId(), message.isSuccess());
Member member = memberRepository.findById(message.memberId())
.orElseThrow(NotFoundMemberException::new);
if (message.isSuccess()) {
log.info("임베딩 생성 성공 - memberId: {}", message.memberId());
member.setPreferenceStatus(PreferenceStatus.COMPLETE);
} else {
log.warn("임베딩 생성 실패 - memberId: {}", message.memberId()); // ML 서버에서 처리 실패 시, 선호 레스토랑 선택 여부 값을 다시 INCOMPLETE로 설정
member.setPreferenceStatus(PreferenceStatus.INCOMPLETE);
}
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageSender {
@Value("${rabbitmq.exchanges.interaction}")
private String interactionExchange;
@Value("${rabbitmq.routing-keys.interaction}")
private String interactionRoutingKey;
private final RabbitTemplate rabbitTemplate;
/**
* 1. Queue 로 메세지를 발행
* 2. Producer 역할 -> Direct Exchange 전략
**/
public void sendMemberInteractionMessage(MemberInteractionMessage message) {
rabbitTemplate.convertAndSend(interactionExchange, interactionRoutingKey, message);
log.info("RabbitMQ 유저 인터렉션 메시지 발행 성공 - interactionType: {}, memberId: {}, restaurantIds: {}",
message.interactionType(), message.memberId(), message.restaurantIdList());
}
}
예외 처리는 비즈니스 로직을 처리하는 서비스단에서 따로 해주었다.
(선호 음식점 선택에서의 메시지 발행 실패는 예외를 던지지만, 리뷰 작성에서는 리뷰 작성 자체가 실패될 필요는 없으므로 따로 예외를 던지지 않는다.)
배포 시에는 편의를 위해 Amazon MQ(완전관리형 오픈 소스 메시지 브로커 서비스)를 사용하기로 하였다.
AWS에서 셋업이 정말 쉽도록 구축해놓았기 때문에 주의해야할 것만 올리겠다.

인스턴스 별로 요금차이가 매우 크게 나서, 서비스 규모가 아직 작다면 mq.t3.micro를 선택하자. (프리티어도 적용된다)
ssl:
enabled: true
Amazon MQ는 기본적으로 SSL을 사용하므로 application.yml에 위 설정을 반드시 추가해야 연결 오류가 나지 않는다.
(만약 오류가 지속된다면, ConnectionFactory를 직접 빈에 등록해주는 코드를 복붙해왔기 때문일 수 있다. 없어도 Spring Boot Auto Configuration에 의해 설정 파일을 바탕으로 자동으로 빈이 생성된다.)
마찬가지로 JMeter를 활용한 부하 테스트 수행 결과이다.

결과적으로 추천 시스템의 임베딩 처리 시간인 약 1초의 블로킹 I/O를 해소하였다.
동시 사용자 수가 300명일 때, Error rate: 0%를 달성하여 대규모 트래픽에서의 서버 안정성을 개선하였다.
처리량(Throughput)은 약 40배 증가하였고, 클라이언트에서의 API 응답 시간 지연도 대폭 개선되었다.