Message Queue 개념 정리

Ryu·2022년 5월 3일
0

2020-archive

목록 보기
4/7

2020에 작성한 노트를 옮겨 적은 것입니다.

Protocols

AMQP - Advanced Message Queuing Protocol

  • Reliability and interoperability
  • 굉장히 feature-rich하다
    • reliable queuing, topic-based pub/sub messaging, flexible routing, security
    • queue에 대한 access control이나 depth 설정 가능

MQTT - Message Queue Telemetry Transport

  • Pub/sub without queue
  • 굉장히 가볍다. 단순함이 장점. Resource-constrained devices나 bandwidth 낮은 경우 사용 -> IoT
  • 다음 messaging만 보장
    • fire-and-forget
    • at least once
    • exactly once

STOMP - Simple/Streaming Text Oriented Messaging Protocol

  • Text-based : header와 body frame로 구성 - Interoperability가 장점.
  • Queue, topic 개념없다.

    STOMP does not, however, deal in queues and topics—it uses a SEND semantic with a “destination” string. The broker must map onto something that it understands internally such as a topic, queue, or exchange. Consumers then SUBSCRIBE to those destinations. Since those destinations are not mandated in the specification, different brokers may support different flavours of destination. So, it’s not always straightforward to port code between brokers.

RabbitMQ에서의 Messaging Patterns

1. Simple messaging

니가 아는 그거다. Skip

2. Work Queues

  • Competing consumers pattern: 여러 워커들에게 task distribution
  • Time-consuming한 'task'들이 많을 때 서로 다른 worker consumer들에게 전달해주기 위해 사용

Message Acknowlegement

  • 오래 걸리는 task message를 보낸 후 consumer가 죽으면?
  • {noAck: true} 모드에서는 브로커가 메세지 보낸 직후 marks it for deletion
    • 이 때 워커가 죽으면, 메세지도 같이 사라짐.
  • {noAck: false}로, 특정 워커에서 브로거에게 '이 메세지 받았고 프로세스 다 했어' 라고 알림
    • 브로커는 ack 받은 경우 해당 메세지 delete
    • ack가 안오면 (워커가 죽던지, 채널이 죽던지 등) 브로커는 re-queue. 다른 워커에게 전달되겠지.
  • ACK를 까먹으면?
    • Common mistake임. 조심할 것!
    • Message가 Unacked 상태이므로 브로커가 메모리 릴리즈할 수 없어 → 메모리 엄청 먹는다
    • 주기적으로 잘 확인해봐라
    sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Message Durability

  • 워커만 죽은 경우 ack로 message가 유실되지 않도록 관리 가능. 그런데 RabbitMQ가 죽으면?
  • RabbitMQ가 죽더라도 데이터 남도록 하려면 두 가지 설정 필요
    • Queue를 durable하게 설정
      • channel.assertQueue('task_queue', {durable: true});
    • Message를 persistent하게 설정
      • channel.sendToQueue(queue, Buffer.from(msg), {persistent: true});
  • RabbitMQ의 메세지 persistency guarantee는 그렇게 강력하지 않다. 절대 유실되면 안되는 메세지는 Publisher confirms 이용할 것.

Fair Dispatch

  • 보통 Work queue를 사용하는 경우 task 자체가 시간이 오래걸리는 경우가 많으므로, 한번에 하나의 메세지만 수용하는 워커를 여러개 만드는 식으로 설계함.
  • channel.prefetch(1): RabbitMQ가 한번에 하나를 넘는 메세지 전달하지 않도록 설정.

3. Publish / Subscribe

  • 많은 consumer들에게 메세지를 한번에 전송
  • Publisher는 누가 받는지 상관안하고, 그냥 publish만 한다.
  • Subscriber는 내가 구독하는 queue에 대해 메세지를 받는다.

Exchange

  • Pub/sub을 구현하기 위해서 publisher들이 Exchange한테 메세지를 보내도록.
  • Exchange는 메세지를 보고 어떤 큐로 보낼지, 모두에게 보낼지, discard할 지 결정한다. 마치 라우터같은 느낌.
  • 네가지 타입이 있음
    • direct: binding-key에 따라 특정 큐에게 전달
    • topic: pattern으로 라우팅
    • headers: key-value를 통해서 라우팅
    • fanout: 모든 큐에게 broadcast
  • 속성
    • durable: 브로커 재시작하면 남아있음 ↔ transient
    • auto-delete: 마지막 큐 연결이 해제되면 삭제

Queue bindings

  • Exchange가 메세지를 어느 큐로 보낼 지 정하는 것: channel.queue_bind(exchange='logs', queue=result.method.queue)
  • Temporary queue
    • 만약 프로그램이 떠서, 모든 메세지를 출력하는 logger가 필요하다면, Rabbit에 연결할 때마다 temporary queue 생성할 수 있음
    • result = channel.queue_declare(queue='', exclusive=True)
      • queue name 지정안해줬기 때문에 result.method.queue에 random queue name 들어간다.
      • exclusive=true는 consumer 커넥션 종료 시 큐 지우도록 하는 것.

Routing

  • channel.bindQueue(queue, exchange, routingKey): 라우팅 키로 특정 큐에 바인딩한다

4. Topics

  • Pattern에 따라 메세지 받기
  • 위처럼, 라우팅만으로는 '모든 로그', 'critical error'로만 나눠서 큐잉이 가능. 하지만 어떤 종류의 로그인지 구분하고 싶다면?
  • critical error from 'cron' & all logs from 'kern' 이렇게 받고 싶다.

  • Topic Exchange 설정
    • routing_key에 . 으로 구분된 topic 넣는다 (stokc.usd.nyse, quick.orange.rabbit)
    • * : exactly one word
    • # : zero or more words
    • 위에서 만약 'orange' 처럼 바인딩 되지 않은 메세지 오면 → message lost

5. Remote Procedure Call (RPC)

  • Request/reply pattern

  • Remote computer에서 어떤 함수를 돌리고 싶을때.

  • 잘 모르고 쓰면 낭패보기 쉽다

    • 이 function call이 로컬인지, 리모트인지? 제대로 구분 가능해야.
    • 단순히 느린 RPC인지, RPC서버가 죽은건지?
    • blocking이기 때문에 왠만하면 async pipeline으로 대체해서 써라

  • 동작 방식

    1. 클라이언트가 시작되면, 익명의 callback queue 생성
    2. 클라이언트가 RPC request: reply_to는 콜백 큐, correlation_id: 모든 리퀘스트마다 유니크한 아이디 생성
    3. rpc_queue에 보내짐
    4. 서버가 리퀘스트 받으면 동작 수행 후 결과를 reply_to에 적힌 큐로 보냄
    5. 클라이언트가 callback queue에 메세지 기다리고 있다가 메시지가 오면 correlation_id확인. 이 응답에 대한 request 찾아서 해당 응답을 app에게 보냄.
  • 장점

    • RPC 서버가 느릴 때, 여러개 띄우면 된다.
    • RPC는 하나의 메세지만 주고 받을 수 있도록 하기 때문에, 클라이언트는 only one network round trip for a single RPC request이면 됨.

0개의 댓글