[RabbitMQ] RabbitMQ + Mongodb Change stream

songmoana·2024년 2월 16일

프로젝트 설명 : 1초에 한 번씩 API 요청을 해야하는 프로젝트에서 과부하 이슈로 인해 이벤트 드리븐 (event-driven) 패턴을 활용하여 event가 있을 때만 데이터 요청. RabbitMQ publisher 서버 구현


Rabbit MQ setting

1. docker-compose 파일 작성

version: '3'
services:
  rabbitmq:
    container_name: rabbitmq_songmoana
    image: rabbitmq:management-alpine
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin
    ports:
      - "5672:5672" # rabbit mq port
      - "15672:15672" # UI port

2. web UI 접속 후 확인

  • docker-compose 파일에서 설정했던 user, pass 로 로그인

MongoDB setting

* 로컬 mongoDB 환경에서 실행 (Docker X)
$sudo vi /etc/mongod.conf
replication:                 # 주석해제
  oplogSizeMB: 2000          # primary node log 저장 사이즈 
  replSetName: "rs0"         # repl node 이름 설정
* mongoDB 설정 후 change_stream TEST (Repl 설정안하면 사용 불가!)
# change_stream 받아오는 부분

def get_change_stream(self):
    pipeline = [{"$project": {"_id": 1}}] # 필요한 필드만 받아오게 pipeline 설정
    self.change_stream=self.coll.watch(pipeline) # 설정한 collection 의 변경이 있을 경우 값 반환
    return self.change_stream

Connect Rabbit MQ & Publisher & Consumer

# rabbitmq connection

    def connect_rabbitmq(self):
        try:         
            credentials = pika.PlainCredentials(username=st.BROKERNAME, password=st.BROKERPASS)
            rabbit_connection = pika.BlockingConnection(pika.ConnectionParameters(host=st.BROKERHOST, heartbeat=st.HEARTBEAT, credentials=credentials)) 
			# heartbeat - pulisher와 rabbitMQ가 5분 간격으로 heartbeat를 주고 받는데 이 때 5분내에 heartbeat가 없을경우 연결이 끊어짐    

            self.channel = rabbit_connection.channel()
                        
            self.channel.exchange_declare(exchange="mongodb_change_stream", exchange_type="fanout") # fanout - 특정 queue가 아닌 모든 queue에 메시지 전송 
        except Exception as e:
            logger.error(e)

# rabbitMQ에 변경된 데이터를 받아와서 publish하는 부분

	def pulish_to_rabbitmq(self, chage):
        self.channel.basic_publish(
            exchange="mongodb_change_stream", # 교환기 이름
            routing_key="", # 라우팅 키를 지정
            body=json.dumps(chage) # 발행할 메시지 내용이 어떤 형식으로 전달될 건지 
        )

# consumer가 받아오는 부분

import pika

queue = "test_queue"

def receiver_main():
    credentials = pika.PlainCredentials(username="admin", password="admin")
    rabbit_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", heartbeat=300, credentials=credentials)) 
    
    channel = rabbit_connection.channel()
    channel.queue_declare(queue=queue, durable=True)
    channel.queue_bind("test", "mongodb_change_stream") # queue와 exchange를 binding !!
    
    def callback(ch, method, properties, body):
        print("Message is Arrived %r" % body)

    channel.basic_consume(
                        queue=queue, 
                        on_message_callback=callback,
                        auto_ack=True)
    try:
        print("Waiting for messages.")
        channel.start_consuming()
    except KeyboardInterrupt:
        print('Ctrl+C is Pressed.')        
        
if __name__ == '__main__':
    receiver_main()
1. Publisher에서 queue를 생성하지 않고 fanout exchange 하여 전송한다면, consumer 측에서 queue를 명시적으로 생성해줘야 함.
2. consumer측에서 queue와 exchange를 binding 하여 queue가 어떤 exchange로부터 메시지를 받을지 정의한다.
3. Broker가 전달해준 메시지를 Consumer가 받아오고 callback 함수로 print 하여 결과 확인

>> Result <<

* 1. Publisher, consumer 둘 다 연결은 되어있으나 publisher에서 전달한 메시지가 consumer에게 전달되지 않았을 때의 rabbitMQ 상태

* 이 때 consumer 코드가 실행되어 정상적으로 메시지를 전달받았을 경우 Ready 2였던 메시지가 0으로 변경된다. (아래 이미지 참고)
-> 시스템 내의 queue에 저장되어 있다가 전달됨. 


* 2. consumer가 정상적으로 구독한 결과

* 정상적으로  publish(노란색) & deliever(초록색) 된 것을 확인할 수 있음.
* 아래의 이미지는 consumer가 받아온 데이터 print.

profile
옹모아나 - 개발백과

0개의 댓글