메세지 큐는 응용 소프트웨어 간의 비동기 통신을 위한 메시지 지향 미들웨어를 구현한 시스템으로 producer로부터 들어오는 메세지(요청)를 큐에 담고 consumer는 큐에 담긴 메세지를 꺼내 처리하는 방식의 구조로 구현되어 있다.
기존 동기적인 방식의 메세지 교환을 미들웨어의 비용을 추가하여 비동기식으로 처리하게 될 때 얻게 되는 장점은 다음과 같다.
모든 요청은 메세지큐에 저장되고 처리가 가능한 요청만을 메시지큐에서 꺼내와 처리하고 있어 서버의 부담을 줄일 수 있다.
일부가 실패해도 전체에 영향을 주지 않는 탄력성을 제공하고 실패할 경우 재실행이 가능하다.
메세지를 전달하는 과정에서 보관을 하고 있어 통신을 하는 클라이언트 간 네트워크 연결이 유지되어야 하는 TCP와 다르게 연결을 유지할 필요가 없어지는 만큼 어플리케이션 간 결합도를 느슨하게 할 수 있다.
메세지 라우팅을 통해 하나의 메세지를 여러 수신자에게 배포가 가능하다.
애플리케이션과 분리되어 있기 때문에 확장성이 용이하다.
이러한 이유로 서버와 클라이언트간의 메세지 교환이 많이 이뤄지는 대용량 데이터 처리를 위한 배치 작업과 채팅 서비스에서 사용된다.
AMQP는 메세지큐를 구현한 프로토콜로 3개의 컴포넌트 개념을 통해 라우팅을 한다.
exchange type의 라우팅 전략은 publisher에서 송신한 메세지 헤더에 포함되는 가상 주소인 Routing Key를 기준으로 어떤 큐로 라우팅 할 지 결정하는 전략이다.
version: '3'
services:
rabbitmq:
image: rabbitmq:3-management-alpine #콘솔 사용이 가능한 이미지
container_name: rabbitmq-stream
volumes:
#자동으로 웹콘솔 플러그인이 설치되도록 엔트리 포인트 스크립트를 수정했다.
- ./docker-entrypoint.sh:/docker-entrypoint.sh
- ./rabbitmq/etc/:/etc/rabbitmq/
- ./rabbitmq/data/:/var/lib/rabbitmq/
- ./rabbitmq/logs/:/var/log/rabbitmq/
ports:
- "5672:5672" #AMQP 메세지 전송 포트
- "15672:15672" #콘솔 웹 접속 포트
environment:
RABBITMQ_ERLANG_COOKIE: "RabbitMQ-My-Cookies"
RABBITMQ_DEFAULT_USER: "root"
RABBITMQ_DEFAULT_PASS: "1234"
@Configuration
public class MessageBrokerConfiguration {
@Bean
public ConnectionFactory getConnectionFactory(){
ConnectionFactory connectionFactory = new CachingConnectionFactory(RABBITMQ_HOST, RABBITMQ_PORT);
((CachingConnectionFactory) connectionFactory).setUsername(USERNAME);
((CachingConnectionFactory) connectionFactory).setPassword(PASSWORD);
return connectionFactory;
}
@Bean
public TopicExchange getExchange(){
return new TopicExchange(EXCHANGE_NAME);
}
@Bean
public Queue queue(){
return new Queue(QUEUE_NAME, true);
}
@Bean
public Binding getBinding(Queue queue, TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
}
메세지큐에서 저장된 메세지를 가져오는 방식은 rabbitTemplate을 통해 가져오는 방식과 아노테이션을 사용하는 방식 2가지가 존재하는데 아노테이션의 경우 전달된 메세지를 사용할 메소드 위에 @RabbitListener을 달면 된다.
특정 큐에 담긴 메세지를 수신하고 싶은 경우 아노테이션의 queue 파라미터에 연결된 메세지 브로커의 큐 이름을 지정하거나 listenerContainerFactory에 원하는 큐와 관련된 connectionFactory과 수신되는 메세지를 변환하기 위해 필요한 MessageConverter를 등록해주는 설정을 한 후 아노테이션의 containerFactory에 금방 설정한 빈의 이름을 입력하는 것을 통해 원하는 메세지큐에서만 수신이 가능하도록 커스터마이징이 가능하다.
@Configuration
public class MessageBrokerConfiguration {
...기존 rabbitmq 설정...
@Bean("SampleContainerFactory")
SimpleRabbitListenerContainerFactory getSampleContainerFactory(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(converter);
return factory;
}
@Bean
public Jackson2JsonMessageConverter getMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
@Component
public class Receiver{
private static final Logger logger = LoggerFactory.getLogger(Receiver.class);
@RabbitListener(containerFactory = "SampleContainerFactory", queues="test-queue-1")
public void onReceiveMessage(SampleMessage message){
logger.info("Receiver received message: {}", message);
}
}
객체 생성을 대신해주는 디자인 패턴으로 클라이언트는 생성하는 객체에 대한 종속 코드가 없어 서로 간의 종속성을 낮추고 결합도를 느슨하게 할 수 있다.
위의 configuration 코드에서도 RabbitMQ와의 커넥션을 위한 객체 역시 개발자가 직접 생성하는 것이 아닌 생성에 필요한 설정 객체만 넘겨주고 팩토리 빈에 생성 작업을 위임하고 있는 것을 확인할 수 있다.