MQ

가언·2024년 8월 7일

MQ(Message Queue)

MSA뿐 아니라 여러 시스템의 통신 방식

: 분리된 어플리케이션 (모듈)끼리 데이터를 주고 받는 (통신) 방식
ex. A service <-> B service

그럼 블로킹/논블로킹은 동기와 비동기와 뭐가 다른거지?

동기/비동기 + 블로킹/논블로킹 조합

먼저 동기와 비동기에 대해 알아보자!


동기적 통신방식

  • 요청을 보냈을 때 응답이 올때까지 기다리는 것, 요청과 응답의 순서가 보장된다.
  • 종류
    • REST API: http프로토콜 사용

비동기적 통신방식

  • 요청과 응답의 sync가 맞지 않아도 되기때문에 동시에 다른 일을 가능하다.
    • 완료 여부 신경쓰지 않는 것
    • 요청/전달을 보낸 후, 응답과 관계없이 다음 동작 실행
      = 작업이 병렬로 배치되어 실행된다.
      => 나중에 시작된 작업이 먼저 끝나는 경우도 가능!
  • 종류
    -
    • 메시지 큐 (Message Queue): 메시지 큐를 통해 모듈 간 비동기 메시지를 주고 받는 방식 ex) RabbitMQ, Kafka, ActiveMQ
    • Publish/Subscribe 모델: 한 모듈이 이벤트를 발행하고 다른 모듈이 이를 구독하여 비동기적으로 통신하는 방식 ex) Apache Kafka
    • 메시지 브로커(중개인) 방식(기술)

즉 동기와 비동기는 응답을 받을 때까지, 기다리냐 안기다리냐의 차이!
=> 이렇게 생각하면, 블로킹이랑 마치 비슷해보이는데 완료여부를 신경쓸지 말지의 차이이다.

동기비동기
A가 B의 리턴(값) 계속 확인! 신경!B의 작업 완료 여부는 노상관
콜백함수를 같이 주면서 끝나면 알아서 이거 실행해~

다음은 블로킹과 논블로킹~!

  • 블로킹과 논블로킹은 제어권을 넘겨주는지 아닌지에 대한 차이
  • 제어권을 넘겨주게 되면 다른일을 수행하지 못함.
블로킹논블로킹
A  -> B 제어권 넘김A가 제어권을 가진 상태로 B 호출!
A 멈춤 / B 실행A 계속 실행 / B 실행
A  <- B 제어권을 다시 넘김
    (ex. B 함수 완료 시)

이제 블로킹/논블로킹 , 동기/비동기의 조합의 차이를 그림으로 보자!


메시지 브로커?

  • 메시지 브로커가 왜 생긴걸까?
    또, 비동기여야만 하는 이유는?

  • 양쪽 모두에게 자유를 주어 받아가고 싶을 때 받아가고 주고 싶을때 줄 수 있도록!! 주는 친구뿐 아니라, 받아가는 친구의 자유도 높여줌. like kakaotalk
    즉, 서로의 자유도를 높여주기 위해 비동기 메시지 브로커를 사용한다!

예시) 주문 서비스

  • 주문 서비스에서 메시지 브로커에게 주문 정보를 던져주고, 주문 서비스는 비동기적으로 자기 할 일 계속 함.
  • 재고 서비스가 메시지 브로커에게 주문 정보를 비동기적으로 받아가서 재고 수량을 하나 뺀다.

종류

  • 카프카(Kafka)
    - 데이터 취합 => 분산 "스트리밍"
    • Redis(공유 데이터베이스)
      ex. 메신저, 유튜브/넷플릭스
  • Message Queue(RabbitMQ, ActiveMQ ...)

🧶 실습

  1. "Hello World!"

    • 보내는 쪽

      package com.example.a_module;
      import com.rabbitmq.client.ConnectionFactory;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.MessageProperties;
      public class Sender {
        private final static String QUEUE_NAME = "hello";
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                    String message = String.join(" ", argv);
                    channel.basicPublish("", "hello", null, message.getBytes());
                    System.out.println(" [x] Sent '" + message + "'");
            }
        }
      }
    • 받는 쪽

      package com.example.b_module;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      import com.rabbitmq.client.DeliverCallback;
      import java.nio.charset.StandardCharsets;
      public class Recv {
        private final static String QUEUE_NAME = "hello";
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
      
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
      
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
      }
      
  2. Work Queues

  • 라운드 로빈 방식으로 데이터를 받는다.
  • NewTask
//package com.example.a_module;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
    private static final String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            String message = String.join(" ", argv);

            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
  • Worker
//package com.example.b_module;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

public class Worker {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

결과 화면

  • 2개의 워커가 번갈아가면서 메세지를 받을 수 있다.

profile
@gari_guri

0개의 댓글