회사에서 쓸 일이 있어서, 겸사 겸사 개인 공부 정리
기본 구조는 아래와 같이 생김
Exchange
로 전송하는 주체 Exchange
로 전송할 지, Routing Key
등을 지정 가능Exchange는 아래와 같이 4가지 타입이 존재한다.
Direct
: 라우팅 키를 기준으로 정확히 일치하는 Queue 로 전달Fanout
: 연결된 모든 Queue 로 메세지를 Broadcast, 즉 Queue 구분을 안한다고 생각하면 된다.Topic
: Routing Key
의 패턴 매칭을 이용Headers
: 헤더 기반으로 라우팅 결정Exchange
에서 Queue
로 라우팅할 때 사용되는 구분자Direct
나 Topic
타입의 Exchange에서 사용.basic_ack()
를 사용하여 메세지를 잘 처리했다는 확인 신호를 보냄.auto_ack
를 설정 시 소비자에게 메세지가 전달되는 즉시 자동으로 ACK처리하지만, 소비자가 메세지 처리 중 실패 하면 메세지 유실 가능성이 있음.basic_ack()
를 사용하고, 메세지 처리 실패시 basic.nack()
을 사용하여 메세지를 큐로 돌려보내거나, basic.reject()
를 사용하여 메세지를 직접 폐기하는 것이 안전하다.basic_consume
으로 콜백함수를 등록하면, queue에 메세지가 존재할 때 콜백 함수가 동작하는 방식basic_get
으로 메세지 하나를 직접 가져오는 방식그래서 전체 Flow는
이것들의 Flow를 그리면 다음과 같다.
routing_key=center
인 메세지를 보내면 QueueA
에 저장됨routing_key=error
인 메세지를 보내면 QueueC
에 저장됨Exchange
나 routing key
가 없을 경우, 메세지는 증발함.물론 이 전제조건은
이 Flow를 Python 에서 pika 라이브러리를 이용한 코드로 나타내면 아래와 같다.
import pika
# RabbitMQ 서버 연결
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 1. Exchange 선언
channel.exchange_declare(exchange='ExchangeA', exchange_type='direct')
channel.exchange_declare(exchange='ExchangeB', exchange_type='direct')
# 2. Queue 선언
channel.queue_declare(queue='QueueA')
channel.queue_declare(queue='QueueB')
channel.queue_declare(queue='QueueC')
# 3. 바인딩: 각 Queue를 Exchange에 라우팅 키와 함께 연결
channel.queue_bind(exchange='ExchangeA', queue='QueueA', routing_key='center')
channel.queue_bind(exchange='ExchangeB', queue='QueueB', routing_key='error')
# 메세지 보내는 producer 예시
channel.basic_publish(
exchange='ExchangeA',
routing_key='center',
body='Heung-Min-Son wins Europa'
)
channel.basic_publish(
exchange='ExchangeB',
routing_key='error',
body='System error'
)
connection.close()
이런 식으로 연결 되고 진행되는 것이다.
여러 이유가 있을텐데, 대표적인 이유를 몇 가지 들어보자면
마지막 병렬 처리에 대한 내용은 아직 제대로 이해 못하였으나, 나머지는 납득이 가능하다.
쉽게 말하면 Producer - Queue - Consumer 간 결합도를 떨어트려서, 유연함과 확장성을 확보한 것이다.
Producer <-> Consumer는 직접 연결되어 있지 않으니 서로 몰라도 되고 등등, 장점이 많은 구조인 것.
위 코드를 보다보면 connection
, channel
과 같은 설명되지 않은 변수가 있어서 좀 당황스럽다.
Connection
: Client(예를 들면 pika나 java client 등)와 RabbitMQ 서버 간의 TCP 연결.Channel
: connection
에 존재하는 논리적 메세징 세션. 메세지 송수신 처리 단위측 Connection
하나 안에 여러 Channel
이 존재하는 개념.
Queue
, Exchange
는 특정 채널, 커넥션에 종속된 개념이 아니라 broker 전체에 공유되는 Global Resource
이다.Connection
과 Channel
은 클라이언트 별로 존재하는 개념인데, Queue
와 Exchange
정보는 RabbitMQ Server에 저장되는 내용이기 때문에, 클라이언트와 상관없이 공유되는 개념인 것passive=True
옵션을 사용하기도 함.그래서 아래처럼
# Channel 1
channel1.exchange_declare(exchange='logs', exchange_type='direct')
channel1.queue_declare(queue='error_logs')
channel1.queue_bind(exchange='logs', queue='error_logs', routing_key='error')
# Channel 2
channel2.basic_publish(exchange='logs', routing_key='error', body='Something broke!')
channel2
에서 queue_declare
, queue_bind
를 안해도 publish할 수 있음.