메시지 브로커로, 메시지를 큐(queue)에 저장하고, 필요할 때 적절한 수신자에게 전달한다.

docker run -d --name rabbitmq -p5672:5672 -p 15672:15672 --restart=unless-stopped rabbitmq:management
다음 docker 명령어를 통해 rabbitmq를 도커 환경에서 쉽게 띄울 수 있다.

다음 localhost:15672 로 접속하게 되면, 다음과 같은 웹 화면을 확인할 수 있는데 기본 id/password 는 guest/guest 이다.

해당 그림에서 처럼 Order 서비스에서 익스체인지를 통해 각 큐로 메시지를 바인딩해주고, product,payment 서비스로 각 큐가 메시지를 전달하는 구조로 이루어져 있다.
주요한 점은 바인딩의 이름도 메시지 큐의 이름과 동일하게 설정해야 헷갈리지 않는다.
spring.application.name=order
message.exchange=market
message.queue.product=market.product
message.queue.payment=market.payment
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
이처럼 order 입장에서는 메시지를 publish 하기 때문에 exchange의 이름과 각 큐의 이름을 알고 있어야 한다.
@Configuration
public class OrderApplicationQueueConfig {
@Value("${message.exchange}")
private String exchange;
@Value("${message.queue.product}")
private String queueProduct;
@Value("${message.queue.payment}")
private String queuePayment;
@Bean
public TopicExchange exchange() {
return new TopicExchange(exchange); // 해당 이름의 exchange 가 생성됨.
}
@Bean
public Queue queueProduct() {
return new Queue(queueProduct); // 해당 이름의 Queue 가 생성 됨.
}
@Bean
public Queue queuePayment() {
return new Queue(queuePayment);
}
@Bean
public Binding bindingProduct() {
return BindingBuilder
.bind(queueProduct()) // 바인딩할 큐
.to(exchange()) // exchange
.with(queueProduct); // 라우팅 이름 (큐의 이름과 일치)
}
@Bean
public Binding bindingPayment() {
return BindingBuilder.bind(queuePayment()).to(exchange()).with(queuePayment);
}
}
Exchane, Queue, Binding 을 생성하는 작업을 해줘야 한다.
public void createOrder(String orderId) {
rabbitTemplate.convertAndSend(productQueue, orderId);
rabbitTemplate.convertAndSend(paymentQueue, orderId);
}
이런식으로 rabbitTemplate을 통해서 메시지를 전송할 수 있다.

그러면 웹 페이지를 통해 확인할 수 있다.

바인딩에 대한 정보나,

받은 메시지의 대한 정보도 확인할 수 있다.
또한, Delete, Purge 등 다양한 강력한 동작을 지원한다.
@Slf4j
@Component
public class ProductEndpoint {
@Value("${spring.application.name}")
private String appName;
@RabbitListener(queues = "${message.queue.product}")
public void receiveMessage(String orderId) {
log.info("receive orderId:{}, appName : {}", orderId, appName);
}
}
@RabbitListener 를 통해 해당 큐 이름과 동일한 큐에 적재된 메시지들을 전달받을 수 있다.

테스트를 위해 ProductApplication을 2개로 소비를 테스트해보면


이런식으로 자동으로 RR 방식으로 메시지의 전달을 분산시켜주는 것을 확인할 수 있다.
그렇기에 만약 각 서버가 로드밸런싱을 통해 여러개의 서버로 이루어지더라도 기존 RabbitMQ는 자동으로 로드밸런싱된 각 서버로 RR 방식으로 메시지를 전송한다는 것을 확인할 수 있다.

또한 Overview 탭에서 MQ가 메시지나 소켓을 처리할 수 있을지 상태를 확인할 수 있다.
분산 스트리밍 플랫폼으로, 주로 실시간 데잍 피드의 빅 데이터 처리르 목적으로 사용한다.
메시지 큐와유사하지만, 대용량 데이터 스트림을 저장하고 실시간으로 분석하거나 처리하는 데 중점을 둠.
역할
쉽게 생각해봤을 때 RabbitMQ의 하나의 큐에 모든 요청을 받아서 순서를 보장하거나, 싱글 스레드 기반의 Redis의 list를 Queue처럼 사용해서 순서를 보장해줄 수 있을 것 같다.
근데, 큐에서 순서를 보장했다 하더라도 서버가 여러 대 떠있는 상황이라면 각 서버에서 처리하는 속도가 다르기 때문에 최종적으로는 순서 보장이 안될 수도 있다.
그에 따라 낙관적 lock, 비관적 lock, 분산 lock 같은 lock개념을 적용할 수 있을 것 같다.
https://tecoble.techcourse.co.kr/post/2023-08-16-concurrency-managing/
다음 블로그에서 자세하게 설명하고 있다.
좌석 선택을 예시로 들었을 때 좌석을 선택하고 넘어가는 과정에서 예약 설정을 하고, 결제가 완료된 후 확정상태로 변경되는 방법이 있을 수 있고,
좌석을 중복해서 선택할 수 있으며, 결제가 완료되는 시점에 좌석 확정 상태가 되는 방법이 있다.
다음 게시글에 Kafka 실습에 대한 내용을 다루도록 하겠다.