- 오픈소스의 메시지 브로커
- 홈페이지: https://www.rabbitmq.com/
- Routing key가 정확히 일치하는 Queue에 메시지 전송
- Unicast
- 예제: https://www.rabbitmq.com/tutorials/tutorial-four-python.html
#!/usr/bin/env python
# emit_log_direct.py: publish a single message to the 'direct_logs' direct
# exchange, using the severity given on the command line as the routing key.
import pika
import sys

broker = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(broker)
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# First argument is the severity (default 'info'); the remaining arguments
# are joined into the message body (default 'Hello World!').
cli_args = sys.argv[1:]
if cli_args:
    severity = cli_args[0]
else:
    severity = 'info'
message = ' '.join(cli_args[1:]) or 'Hello World!'

publish_args = {
    'exchange': 'direct_logs',
    'routing_key': severity,
    'body': message,
}
channel.basic_publish(**publish_args)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
#!/usr/bin/env python
# receive_logs_direct.py: bind a temporary exclusive queue to 'direct_logs'
# for every severity named on the command line and print what arrives.
import pika
import sys


def callback(ch, method, properties, body):
    """Print the routing key and raw body of a delivered message."""
    print(" [x] %r:%r" % (method.routing_key, body))


broker = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(broker)
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# An empty queue name asks the broker to generate one; read it back from
# the declare result.
declared = channel.queue_declare(queue='', exclusive=True)
queue_name = declared.method.queue

severities = sys.argv[1:]
if len(severities) == 0:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# One binding per requested severity.
for wanted in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=wanted)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
# 라우팅 키가 error인 메시지는 로그 파일에 쓰고, info, warning, error 메시지는 모두 화면에 출력
$ python receive_logs_direct.py error > logs_from_rabbit.log
$ python receive_logs_direct.py info warning error
# 프로듀서가 메시지를 발송
$ python emit_log_direct.py error "Run. Run. Or it will explode."
$ python emit_log_direct.py info "Run. Run. Or it will explode."
- Routing key 패턴이 일치하는 Queue에 메시지 전송
- Multicast
- 예제: https://www.rabbitmq.com/tutorials/tutorial-five-python.html
라우팅 키가 example.orange.rabbit 인 경우 메시지가 Q1, Q2에 모두 전달
라우팅 키가 example.orange.turtle 인 경우 메시지가 Q1에만 전달
라우팅 키가 lazy.grape.rabbit인 경우엔 메시지가 Q2에 한 번만 전달 (라우팅 패턴이 여러 개 일치하더라도 하나의 큐에는 메시지가 한 번만 전달)
#!/usr/bin/env python
# emit_log_topic.py: publish a single message to the 'topic_logs' topic
# exchange.
#
# Usage: emit_log_topic.py [routing_key] [message words...]
#   routing_key  defaults to 'anonymous.info' when no argument is given
#   message      defaults to 'Hello World!' when no message words are given
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# Treat the first argument as the routing key whenever one is supplied, the
# same way emit_log_direct.py treats its severity argument. Requiring more
# than one argument here would make a lone routing key be silently ignored
# and the message published under 'anonymous.info'.
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
#!/usr/bin/env python
# receive_logs_topic.py: bind a temporary exclusive queue to 'topic_logs'
# with every binding key named on the command line and print what arrives.
import pika
import sys


def callback(ch, method, properties, body):
    """Print the routing key and raw body of a delivered message."""
    print(" [x] %r:%r" % (method.routing_key, body))


broker = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(broker)
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# An empty queue name asks the broker to generate one; read it back from
# the declare result.
declared = channel.queue_declare(queue='', exclusive=True)
queue_name = declared.method.queue

binding_keys = sys.argv[1:]
if len(binding_keys) == 0:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

# One binding per requested key pattern.
for pattern in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=pattern)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
$ python receive_logs_topic.py "*.orange.*"
[*] Waiting for logs. To exit press CTRL+C
[x] 'example.orange.rabbit':b'msg1'
[x] 'example.orange.turtle':b'msg2'
$ python receive_logs_topic.py "*.*.rabbit" "lazy.#"
[*] Waiting for logs. To exit press CTRL+C
[x] 'example.orange.rabbit':b'msg1'
[x] 'lazy.grape.rabbit':b'msg3'
# 메시지 발생
$ python emit_log_topic.py "example.orange.rabbit" "msg1"
$ python emit_log_topic.py "example.orange.turtle" "msg2"
$ python emit_log_topic.py "lazy.grape.rabbit" "msg3"
- 해당 Exchange에 등록된 모든 Queue에 메시지 전송
- Broadcast
- 예제: https://www.rabbitmq.com/tutorials/tutorial-three-python.html
#!/usr/bin/env python
# emit_log.py: publish a single message to the 'logs' fanout exchange.
import pika
import sys

broker = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(broker)
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# All command-line words form the message; fall back to a default when
# none are given.
words = sys.argv[1:]
message = ' '.join(words)
if not message:
    message = "info: Hello World!"

# The routing key is left empty for this exchange.
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
#!/usr/bin/env python
# receive_logs.py: bind a temporary exclusive queue to the 'logs' fanout
# exchange and print every message delivered to it.
import pika


def callback(ch, method, properties, body):
    """Print the raw body of a delivered message."""
    print(" [x] %r" % body)


broker = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(broker)
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# An empty queue name asks the broker to generate one; read it back from
# the declare result.
declared = channel.queue_declare(queue='', exclusive=True)
queue_name = declared.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
# 모든 메시지를 로그파일에도 쓰고 화면에도 출력함
$ python receive_logs.py > logs_from_rabbit.log
$ python receive_logs.py
# 메시지 발송
$ python emit_log.py