RabbitMQ / Pika 기본 개념

정환우·2025년 5월 23일
1

Python 공부

목록 보기
5/6

회사에서 쓸 일이 있어서, 겸사 겸사 개인 공부 정리

RabbitMQ

  • RabbitMQ 는 AMQP를 구현한 메시지 브로커

기본 구조는 아래와 같이 생김

RabbitMQ-Structure

1. Producer (생산자)

  • 메세지를 생산하여 Exchange 로 전송하는 주체
  • 어떤 Exchange 로 전송할 지, Routing Key등을 지정 가능

2. Exchange

  • 수신한 메세지를 어떤 Queue 로 보낼지 결정하는 역할

Exchange는 아래와 같이 4가지 타입이 존재한다.

Type

  1. Direct : 라우팅 키를 기준으로 정확히 일치하는 Queue 로 전달
  2. Fanout : 연결된 모든 Queue 로 메세지를 Broadcast, 즉 Queue 구분을 안한다고 생각하면 된다.
  3. Topic : Routing Key 의 패턴 매칭을 이용
  4. Headers : 헤더 기반으로 라우팅 결정

3. Routing Key

  • Exchange에서 Queue로 라우팅할 때 사용되는 구분자
  • 위에서 언급했듯이, 주로 DirectTopic 타입의 Exchange에서 사용.

4. Consumer (소비자)

  • Queue에서 메세지를 받아 처리하는 주체
  • 자동 or 수동으로 Message Ack 가능

Consumer가 Message를 소비하는 방식

  • rabbitmq에서는 Push Based, Pull Based 모두 가능
  • 일반적으로는 실시간 메세지 처리를 위해선 push based를 기본적으로 사용하고, 특정 상황에서만 pull based로 사용.
  • 방식과 상관없이 메세지를 제대로 처리한 경우 basic_ack()를 사용하여 메세지를 잘 처리했다는 확인 신호를 보냄.
    -auto_ack를 설정 시 소비자에게 메세지가 전달되는 즉시 자동으로 ACK처리하지만, 소비자가 메세지 처리 중 실패 하면 메세지 유실 가능성이 있음.
    • 수동으로 basic_ack()를 사용하고, 메세지 처리 실패시 basic.nack() 을 사용하여 메세지를 큐로 돌려보내거나, basic.reject()를 사용하여 메세지를 직접 폐기하는 것이 안전하다.
  • push based 는 basic_consume으로 콜백함수를 등록하면, queue에 메세지가 존재할 때 콜백 함수가 동작하는 방식
  • pull based 는 basic_get으로 메세지 하나를 직접 가져오는 방식

RabbitMQ Message Flow

그래서 전체 Flow는

  1. Producer가 메세지를 특정 Exchange에 전달
  2. Exchange에서는 바인딩된 Queue를 기반으로 메세지를 라우팅 (라우팅 과정은 Exchange Type에 따라 다르나, Bidning된 Queue만을 기반으로 처리하는 것은 공통적)
  3. Queue는 Message 저장. Consumer가 이를 받아서 처리함

이것들의 Flow를 그리면 다음과 같다.

  • ExchangeA에 routing_key=center 인 메세지를 보내면 QueueA에 저장됨
  • ExchangeB에 routing_key=error인 메세지를 보내면 QueueC에 저장됨
  • 일치하는 Exchangerouting key 가 없을 경우, 메세지는 증발함.

물론 이 전제조건은

  • Exchange Type 은 Direct 일 것. (그림에는 잘못 적었다.)
  • QueueA 는 ExchangeA에, QueueC는 ExchangeB에 바인딩되어야 할 것.

RabbitMQ-Flow

이 Flow를 Python 에서 pika 라이브러리를 이용한 코드로 나타내면 아래와 같다.

import pika

# RabbitMQ 서버 연결
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 1. Exchange 선언
channel.exchange_declare(exchange='ExchangeA', exchange_type='direct')
channel.exchange_declare(exchange='ExchangeB', exchange_type='direct')

# 2. Queue 선언
channel.queue_declare(queue='QueueA')
channel.queue_declare(queue='QueueB')
channel.queue_declare(queue='QueueC')

# 3. 바인딩: 각 Queue를 Exchange에 라우팅 키와 함께 연결
channel.queue_bind(exchange='ExchangeA', queue='QueueA', routing_key='center')
channel.queue_bind(exchange='ExchangeB', queue='QueueB', routing_key='error')

# 메세지 보내는 producer 예시
channel.basic_publish(
    exchange='ExchangeA',
    routing_key='center',
    body='Heung-Min-Son wins Europa'
)

channel.basic_publish(
    exchange='ExchangeB',
    routing_key='error',
    body='System error'
)

connection.close()

이런 식으로 연결 되고 진행되는 것이다.

왜 Exchange / Routing Key와 같은 구조를 사용하는 가?

여러 이유가 있을텐데, 대표적인 이유를 몇 가지 들어보자면

  • Consumer를 변경하지 않고도 Queue 나 Routing Logic 교체 가능
  • 시스템이 커지더라도 Exchange 나 Routing Key Logic 조정만 하면 된다.
  • 하나의 Message를 여러 Consumer 가 동시에 다르게 처리 가능 (Exchange에서 Routing Key가 일치하는 애들에게 모두 메세지를 전송하기 때문이다.)
  • 메세지 흐름을 Queue 단위로 병렬 처리하거나, 부하 분산 설정이 쉽다.

마지막 병렬 처리에 대한 내용은 아직 제대로 이해 못하였으나, 나머지는 납득이 가능하다.

쉽게 말하면 Producer - Queue - Consumer 간 결합도를 떨어트려서, 유연함과 확장성을 확보한 것이다.

Producer <-> Consumer는 직접 연결되어 있지 않으니 서로 몰라도 되고 등등, 장점이 많은 구조인 것.

Channel 과 Connection

위 코드를 보다보면 connection, channel과 같은 설명되지 않은 변수가 있어서 좀 당황스럽다.

  • Connection : Client(예를 들면 pika나 java client 등)와 RabbitMQ 서버 간의 TCP 연결.
  • Channel : connection 에 존재하는 논리적 메세징 세션. 메세지 송수신 처리 단위

Connection 하나 안에 여러 Channel이 존재하는 개념.

그렇다면 queue, exchange는 channel별로 존재하는 가?

  • Queue, Exchange는 특정 채널, 커넥션에 종속된 개념이 아니라 broker 전체에 공유되는 Global Resource이다.
  • ConnectionChannel은 클라이언트 별로 존재하는 개념인데, QueueExchange정보는 RabbitMQ Server에 저장되는 내용이기 때문에, 클라이언트와 상관없이 공유되는 개념인 것
  • 단, 동시에 여러 채널에서 리소스를 선언할 때는 중복 선언 오류를 피하기 위해 passive=True옵션을 사용하기도 함.
    - 이 옵션을 사용하면 Queue 가 존재할 경우 에러가 안나고, 존재하지 않을 경우 에러가 발생함.

그래서 아래처럼

# Channel 1
channel1.exchange_declare(exchange='logs', exchange_type='direct')
channel1.queue_declare(queue='error_logs')
channel1.queue_bind(exchange='logs', queue='error_logs', routing_key='error')

# Channel 2
channel2.basic_publish(exchange='logs', routing_key='error', body='Something broke!')

channel2에서 queue_declare, queue_bind를 안해도 publish할 수 있음.

0개의 댓글