프로젝트에서 RabbitMQ 도입 및 운영

wonjun_choi·2025년 6월 1일

졸업작품

목록 보기
5/6
post-thumbnail

1. 글 개요

이 글에서는 Nebula 프로젝트에서 RabbitMQ를 도입하고 실제 운영하면서 겪었던 문제와 해결 과정을 공유한다.
기술 선택 배경 → 문제 상황 → 해결 방법 → 향후 계획 순으로 논리적으로 정리했다.


2. 기술 선택 배경

Nebula 서비스는 사용자 요청(예: 채팅 요청, AI 모델 호출 등)을 동기적으로 처리하면 응답 지연이 커질 수 있었다.

  • 처음에는 FastAPI(또는 Spring Boot)에서 바로 AI 모델을 호출했으나
    • 요청이 몰리면 서버 응답이 느려지고, 사용자 경험이 저하되었다.
    • 동시 요청 처리 시 자원 고갈 이슈가 발생했다.

이 문제를 해결하기 위해 RabbitMQ를 도입했다.

  • 비동기 메시지 큐를 활용해 요청을 큐에 쌓고 별도 워커가 처리하도록 분리할 수 있다.
  • 서비스 간 의존도를 낮춰 각 컴포넌트를 느슨하게 결합(loose coupling)할 수 있다.
  • 워커를 수평 확장하면 처리량(throughput)을 쉽게 늘릴 수 있다.

3. 문제 상황 및 요구 사항

3.1 동기 처리 문제

  • 사용자 채팅 요청을 받을 때마다 바로 AI 모델을 호출하면
    • 네트워크 지연과 모델 응답 속도 때문에 응답 시간이 느려진다.
    • 트래픽이 급증하면 서버 과부하로 서비스 장애 위험이 커진다.

3.2 서비스 간 결합도

  • 기존 구조는 FastAPI ←→ AI 모델 호출 ←→ 결과 반환 구조였다.
    • 각 컴포넌트가 서로 밀접하게 연결되어 있어 유연한 확장이 불가능했다.
    • 배포나 테스트 환경 분리 시 복잡도가 증가했다.

3.3 내구성 및 장애 복구

  • 운영 환경에서는 메시지 손실을 최소화해야 한다.
    • 서버 재시작이나 네트워크 장애 시에도 메시지를 보존해야 한다.
    • 워커가 예기치 않게 죽어도 메시지를 재처리할 수 있어야 한다.

4. RabbitMQ 아키텍처 구성

RabbitMQ는 크게 Exchange → Queue → Binding → Consumer 구조로 이루어져 있다.

4.1 Exchange·Queue·Binding 구성

  1. Exchange 선언

    • 이름: chat.req
    • 타입: DIRECT
    • 옵션:
      • durable=True → 재시작해도 Exchange 정보 유지
    exchange = await channel.declare_exchange(
        "chat.req",
        ExchangeType.DIRECT,
        durable=True
    )
    		```
    
  2. Queue 선언

    • 이름: nebula.chat.request

    • 옵션:

      • durable=True → 재시작해도 Queue 정보 유지
      • exclusive=False → 여러 컨슈머가 사용할 수 있음
      • auto_delete=False → 마지막 컨슈머 종료 시에도 자동 삭제되지 않음
    queue = await channel.declare_queue(
        "nebula.chat.request",
        durable=True,
        exclusive=False,
        auto_delete=False
    )
  3. Binding 설정

    • 바인딩 키(Binding Key): 사용자 ID(user_id)
    • DIRECT 타입은 정확히 일치하는 키만 라우팅하므로 실제 ID를 사용
    # 예: user_id가 "user123"인 경우
    await queue.bind(exchange, routing_key="user123")

5. 퍼블리셔(Publisher) 흐름

FastAPI에서 채팅 요청을 받아 RabbitMQ에 메시지를 퍼블리싱하는 코드를 살펴본다.

import asyncio
import uuid
import json
from datetime import datetime, timezone
from aio_pika import connect_robust, Message, DeliveryMode, ExchangeType

async def publish_chat_request(prompt_data, user_id):
    # 1) RabbitMQ 연결 (robust=True → 자동 재연결 기능 사용)
    connection = await connect_robust("amqp://guest:guest@localhost/")
    channel = await connection.channel()
    # 워커가 과도하게 메시지를 가져가지 않도록 설정 (prefetch_count=1)
    await channel.set_qos(prefetch_count=1)

    # 2) DIRECT 타입 Exchange 선언
    exchange = await channel.declare_exchange(
        "chat.req",
        ExchangeType.DIRECT,
        durable=True
    )

    # 3) 메시지 정보 설정
    job_id = str(uuid.uuid4())  # 고유 작업 ID
    body_bytes = json.dumps(prompt_data).encode()
    message = Message(
        body=body_bytes,
        delivery_mode=DeliveryMode.PERSISTENT,  # 메시지를 디스크에 저장
        correlation_id=job_id,                    # 컨슈머에서 결과 매핑용
        message_id=job_id,
        timestamp=datetime.now(timezone.utc),
        content_type="application/json"
    )

    # 4) 사용자 ID를 routing_key로 설정하고 메시지 퍼블리싱
    await exchange.publish(message, routing_key=str(user_id))
    print(f"Published job_id={job_id} to routing_key={user_id}")

    # 5) 연결 종료
    await connection.close()
  • connect_robust를 사용하면 네트워크 장애 발생 시 자동으로 재연결을 시도할 수 있다.
  • delivery_mode=DeliveryMode.PERSISTENT로 설정해, RabbitMQ가 메시지를 디스크에 기록하도록 한다.
  • prefetch_count=1 설정으로 워커가 한 번에 하나씩 메시지를 처리하도록 제한해, 컨슈머 과부하를 방지한다.

6. 컨슈머(Consumer) 흐름

컨슈머는 워커 프로세스로 독립 실행되며, Queue에 쌓인 메시지를 가져와 실제 작업을 수행한다.

import asyncio
import json
from aio_pika import connect_robust, ExchangeType, IncomingMessage

async def on_message(message: IncomingMessage):
    async with message.process():
        # 1) 메시지 바디 디코딩 및 파싱
        payload = json.loads(message.body.decode())
        job_id = message.correlation_id
        user_id = message.routing_key
        print(f"Received job_id={job_id}, user_id={user_id}, payload={payload}")

        # 2) 실제 작업 수행 (예: AI 모델 호출, 임베딩 생성 등)
        # result = await process_chat(payload)

        # 3) 작업 결과를 다른 Queue나 DB에 저장하거나, WebSocket을 통해 클라이언트에 전달
        # await publish_response(result, job_id, user_id)

async def consume_chat_requests(user_id):
    # 1) RabbitMQ 연결 및 채널 설정
    connection = await connect_robust("amqp://guest:guest@localhost/")
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    # 2) Exchange 선언 및 Queue 선언/바인딩
    exchange = await channel.declare_exchange(
        "chat.req",
        ExchangeType.DIRECT,
        durable=True
    )
    queue = await channel.declare_queue(
        "nebula.chat.request",
        durable=True
    )
    await queue.bind(exchange, routing_key=str(user_id))

    # 3) 메시지 소비 시작 (no_ack=False → Ack/Nack 관리)
    await queue.consume(on_message, no_ack=False)
    print(f"[*] Waiting for messages for user_id={user_id}")

    # 4) 무한 대기 상태 유지
    await asyncio.Future()

# 워커 실행 예시
if __name__ == "__main__":
    asyncio.run(consume_chat_requests(user_id="user123"))
  • message.process() 컨텍스트 매니저 사용 시, 예외가 없으면 자동으로 ACK를 보내고, 예외 발생 시 자동으로 NACK 처리한다.
  • 처리된 메시지는 ACK가 전송된 후 삭제되며, ACK를 보내지 못하면 다른 컨슈머가 메시지를 재처리할 수 있다.

7. 주요 설정 및 내구성 확보

  1. durable 옵션

    • Exchange와 Queue 모두 durable=True로 설정해, RabbitMQ 서버가 재시작될 때도 설정 정보를 유지한다.
  2. 메시지 내구성(Persistent)

    • 퍼블리셔에서 delivery_mode=DeliveryMode.PERSISTENT를 지정해 메시지를 디스크에 기록하도록 한다.
    • 이 설정이 없으면 서버 재시작 시 메시지가 손실될 수 있다.
  3. QOS 설정

    • channel.set_qos(prefetch_count=1) 설정으로, 워커가 동시에 처리할 메시지 수를 1개로 제한해 과부하를 방지한다.
    • 워커가 한 번에 너무 많은 메시지를 가져가면 처리 지연과 예외가 발생할 가능성이 높아진다.
  4. Ack/Nack 처리

    • no_ack=False 기본 설정으로, 워커가 메시지를 처리한 후 명시적으로 ACK를 보내도록 한다.
    • 예외가 발생하면 자동으로 NACK되어 메시지가 재처리 대상이 된다.

8. 개발 과정에서 겪은 문제와 해결 경험

  1. Exchange에 메시지는 들어오나 Queue로 라우팅되지 않는 현상

    • 원인

      • DIRECT 타입 Exchange에서는 정확한 routing_key만 매칭된다.
      • 와일드카드(#)를 바인딩 키로 사용했으나, DIRECT 타입에서는 동작하지 않았다.
    • 해결

      1. Queue 바인딩 시 와일드카드 대신 실제 user_idrouting_key로 사용하도록 수정

        # 잘못된 예 (DIRECT + '#')
        # await queue.bind(exchange, routing_key="#")
        
        # 수정된 예
        await queue.bind(exchange, routing_key="user123")
      2. 또는, Exchange 타입을 TOPIC으로 변경하고 와일드카드를 사용하도록 전환

        exchange = await channel.declare_exchange(
            "chat.req",
            ExchangeType.TOPIC,
            durable=True
        )
        await queue.bind(exchange, routing_key="chat.*")
        await exchange.publish(message, routing_key=f"chat.{user_id}")
      • 최종적으로는 DIRECT + 정확한 ID 바인딩 방식으로 문제를 해결했다.
  2. 워커 과부하로 인한 메시지 누락/중복 처리

    • 원인

      • 워커가 동시에 너무 많은 메시지를 prefetch하면서 일부 메시지가 제때 처리되지 못했다.
    • 해결

      • prefetch_count=1로 설정해 워커가 한 번에 하나의 메시지만 처리하도록 제한했다.
      • 메시지 처리 시간을 모니터링하고, 필요 시 워커 수를 늘려 수평 확장(스케일 아웃)을 적용했다.
  3. 컨슈머 연결 실패 시 예외 처리

    • 원인

      • 네트워크 지연이나 RabbitMQ 서버 재시작 시 워커 프로세스가 종료되었다.
    • 해결

      • connect_robust 옵션을 사용해 자동 재연결을 활성화했다.
      • 워커 코드에 try/except 블록을 추가해 예외 발생 시 일정 시간 대기 후 재접속을 시도하도록 구현했다.

9. 향후 계획 및 고려사항

  1. 데드레터 큐(DLQ) 구성

    • 메시지 처리 실패 시 DLQ로 이동시켜 누락된 메시지를 별도 채널에서 재처리할 수 있도록 할 계획이다.
    • DLQ를 분석해 자주 발생하는 오류 패턴을 찾아 근본 원인을 개선할 수 있다.
  2. RabbitMQ 클러스터링 및 HA 모드 도입

    • 현재 단일 RabbitMQ 인스턴스만 사용 중이므로 장애 발생 시 서비스 중단 위험이 있다.
    • 클러스터링을 통해 가용성을 높이고, 로드 밸런싱을 이용해 안정적인 큐 운영을 검토 중이다.
  3. 컨슈머 오토스케일링 자동화

    • AWS ECS나 Kubernetes 환경에서 워커 인스턴스를 메시지 큐 길이와 처리 지연 시간 지표를 기반으로 자동으로 늘이거나 줄이는 방안을 검토 중이다.
    • 적절한 스케일링 정책을 수립해 처리량 급증 상황에서도 안정적인 운영을 유지할 수 있도록 할 예정이다.
  4. 모니터링·알림 체계 구축

    • RabbitMQ 내장 관리 플러그인(Management Plugin)을 활용해 큐 상태, 소비 지연(latency) 등을 실시간 모니터링한다.
    • Prometheus와 Grafana를 연동해 시각화 대시보드를 구성하고, 이상 징후 발생 시 Slack이나 이메일로 알림이 오도록 설정할 계획이다.

0개의 댓글