
이 글에서는 Nebula 프로젝트에서 RabbitMQ를 도입하고 실제 운영하면서 겪었던 문제와 해결 과정을 공유한다.
기술 선택 배경 → 문제 상황 → 해결 방법 → 향후 계획 순으로 논리적으로 정리했다.
Nebula 서비스는 사용자 요청(예: 채팅 요청, AI 모델 호출 등)을 동기적으로 처리하면 응답 지연이 커질 수 있었다.
이 문제를 해결하기 위해 RabbitMQ를 도입했다.
RabbitMQ는 크게 Exchange → Queue → Binding → Consumer 구조로 이루어져 있다.
Exchange 선언
chat.req DIRECT durable=True → 재시작해도 Exchange 정보 유지 exchange = await channel.declare_exchange(
"chat.req",
ExchangeType.DIRECT,
durable=True
)
```
Queue 선언
이름: nebula.chat.request
옵션:
durable=True → 재시작해도 Queue 정보 유지exclusive=False → 여러 컨슈머가 사용할 수 있음auto_delete=False → 마지막 컨슈머 종료 시에도 자동 삭제되지 않음queue = await channel.declare_queue(
"nebula.chat.request",
durable=True,
exclusive=False,
auto_delete=False
)
Binding 설정
user_id)# 예: user_id가 "user123"인 경우
await queue.bind(exchange, routing_key="user123")
FastAPI에서 채팅 요청을 받아 RabbitMQ에 메시지를 퍼블리싱하는 코드를 살펴본다.
import asyncio
import uuid
import json
from datetime import datetime, timezone
from aio_pika import connect_robust, Message, DeliveryMode, ExchangeType
async def publish_chat_request(prompt_data, user_id):
# 1) RabbitMQ 연결 (robust=True → 자동 재연결 기능 사용)
connection = await connect_robust("amqp://guest:guest@localhost/")
channel = await connection.channel()
# 워커가 과도하게 메시지를 가져가지 않도록 설정 (prefetch_count=1)
await channel.set_qos(prefetch_count=1)
# 2) DIRECT 타입 Exchange 선언
exchange = await channel.declare_exchange(
"chat.req",
ExchangeType.DIRECT,
durable=True
)
# 3) 메시지 정보 설정
job_id = str(uuid.uuid4()) # 고유 작업 ID
body_bytes = json.dumps(prompt_data).encode()
message = Message(
body=body_bytes,
delivery_mode=DeliveryMode.PERSISTENT, # 메시지를 디스크에 저장
correlation_id=job_id, # 컨슈머에서 결과 매핑용
message_id=job_id,
timestamp=datetime.now(timezone.utc),
content_type="application/json"
)
# 4) 사용자 ID를 routing_key로 설정하고 메시지 퍼블리싱
await exchange.publish(message, routing_key=str(user_id))
print(f"Published job_id={job_id} to routing_key={user_id}")
# 5) 연결 종료
await connection.close()
connect_robust를 사용하면 네트워크 장애 발생 시 자동으로 재연결을 시도할 수 있다.delivery_mode=DeliveryMode.PERSISTENT로 설정해, RabbitMQ가 메시지를 디스크에 기록하도록 한다.prefetch_count=1 설정으로 워커가 한 번에 하나씩 메시지를 처리하도록 제한해, 컨슈머 과부하를 방지한다.컨슈머는 워커 프로세스로 독립 실행되며, Queue에 쌓인 메시지를 가져와 실제 작업을 수행한다.
import asyncio
import json
from aio_pika import connect_robust, ExchangeType, IncomingMessage
async def on_message(message: IncomingMessage):
async with message.process():
# 1) 메시지 바디 디코딩 및 파싱
payload = json.loads(message.body.decode())
job_id = message.correlation_id
user_id = message.routing_key
print(f"Received job_id={job_id}, user_id={user_id}, payload={payload}")
# 2) 실제 작업 수행 (예: AI 모델 호출, 임베딩 생성 등)
# result = await process_chat(payload)
# 3) 작업 결과를 다른 Queue나 DB에 저장하거나, WebSocket을 통해 클라이언트에 전달
# await publish_response(result, job_id, user_id)
async def consume_chat_requests(user_id):
# 1) RabbitMQ 연결 및 채널 설정
connection = await connect_robust("amqp://guest:guest@localhost/")
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
# 2) Exchange 선언 및 Queue 선언/바인딩
exchange = await channel.declare_exchange(
"chat.req",
ExchangeType.DIRECT,
durable=True
)
queue = await channel.declare_queue(
"nebula.chat.request",
durable=True
)
await queue.bind(exchange, routing_key=str(user_id))
# 3) 메시지 소비 시작 (no_ack=False → Ack/Nack 관리)
await queue.consume(on_message, no_ack=False)
print(f"[*] Waiting for messages for user_id={user_id}")
# 4) 무한 대기 상태 유지
await asyncio.Future()
# 워커 실행 예시
if __name__ == "__main__":
asyncio.run(consume_chat_requests(user_id="user123"))
message.process() 컨텍스트 매니저 사용 시, 예외가 없으면 자동으로 ACK를 보내고, 예외 발생 시 자동으로 NACK 처리한다.durable 옵션
durable=True로 설정해, RabbitMQ 서버가 재시작될 때도 설정 정보를 유지한다.메시지 내구성(Persistent)
delivery_mode=DeliveryMode.PERSISTENT를 지정해 메시지를 디스크에 기록하도록 한다.QOS 설정
channel.set_qos(prefetch_count=1) 설정으로, 워커가 동시에 처리할 메시지 수를 1개로 제한해 과부하를 방지한다.Ack/Nack 처리
no_ack=False 기본 설정으로, 워커가 메시지를 처리한 후 명시적으로 ACK를 보내도록 한다.Exchange에 메시지는 들어오나 Queue로 라우팅되지 않는 현상
원인
DIRECT 타입 Exchange에서는 정확한 routing_key만 매칭된다.#)를 바인딩 키로 사용했으나, DIRECT 타입에서는 동작하지 않았다.해결
Queue 바인딩 시 와일드카드 대신 실제 user_id를 routing_key로 사용하도록 수정
# 잘못된 예 (DIRECT + '#')
# await queue.bind(exchange, routing_key="#")
# 수정된 예
await queue.bind(exchange, routing_key="user123")
또는, Exchange 타입을 TOPIC으로 변경하고 와일드카드를 사용하도록 전환
exchange = await channel.declare_exchange(
"chat.req",
ExchangeType.TOPIC,
durable=True
)
await queue.bind(exchange, routing_key="chat.*")
await exchange.publish(message, routing_key=f"chat.{user_id}")
워커 과부하로 인한 메시지 누락/중복 처리
원인
해결
prefetch_count=1로 설정해 워커가 한 번에 하나의 메시지만 처리하도록 제한했다.컨슈머 연결 실패 시 예외 처리
원인
해결
connect_robust 옵션을 사용해 자동 재연결을 활성화했다.try/except 블록을 추가해 예외 발생 시 일정 시간 대기 후 재접속을 시도하도록 구현했다.데드레터 큐(DLQ) 구성
RabbitMQ 클러스터링 및 HA 모드 도입
컨슈머 오토스케일링 자동화
모니터링·알림 체계 구축