기본적인 메세지큐 방식은 이렇습니다.
Producer
가 메세지를 생산하고 Message Broker
에게 메세지를 보내고,
Consumer
는 Broker를 Listening을 하면서 해당 메세지를 소비합니다.
RabbitMQ는 AMQP 프로토콜을 구현한 오픈소스 메시지 브로커 소프트웨어입니다.
RabbitMQ에서는 Exchange
라는 개념이 등장하는데,
Producer가 어디에다가 정했는지를 나타냅니다. Exchange 종류에 따라서 Direct
, Topic
, Fanout
이런 식으로 종류가 나누어지게 됩니다.
메시지큐에 대한 자세한 내용은 다음 블로그를 참조하시면 됩니다.
AMQP란 Advanced Message Queueing Protocol
의 줄임말로 MQ의 오픈소스에 기반한 표준 프로토콜을 의미합니다. AMQP는 마지막 P(rotocol)에서 보는 것과 같이 프로토콜을 의미하기 때문에 이것을 사용한 가장 유명한 소프트웨어는 RabbitMQ라 볼 수 있습니다.
AMQP를 구성하는 요소는 Exchange, Queue, Binding 이 있습니다.
자세한 사항은 이 블로그 참고
https://brownbears.tistory.com/508
window 설치는 생각보다 귀찮아서, docker 컨테이너로 돌려서 실행해보았다.
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \
--restart=unless-stopped \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
rabbitmq:management
localhost:15672
RabbitMQ는 메세지큐로 Advanced Message Queuing Protocol(AMQP)
의 구현체입니다.
디팬더시 이름도 amqp
로 되어있습니다.
implementation 'org.springframework.boot:spring-boot-starter-amqp'
application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: root
password: root
rabbitMQ 디렉토리를 만들어 Consumer, Producer 를 만들어 냅니다.
Producer
@Component
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendTo(String message) {
this.rabbitTemplate.convertAndSend("CREATE_POST_QUEUE", message);
}
}
Consumer
@Component
@RequiredArgsConstructor
public class Consumer {
private final ObjectMapper objectMapper;
private final PostRepositroy postRepositroy;
@RabbitListener(queues = "CREATE_POST_QUEUE")
public void handler(String message) throws JsonProcessingException {
Post post = objectMapper.readValue(message, Post.class);
// 메시지 내용 큐에 들어감.
postRepositroy.save(post);
}
}
Controller
@RequiredArgsConstructor
@RestController
public class PostController {
private final Producer producer;
private final ObjectMapper objectMapper;
@PostMapping
public Post createPost(@RequestBody Post post) throws JsonProcessingException {
String jsonPost = objectMapper.writeValueAsString(post);
producer.sendTo(jsonPost);
return post;
}
...
}
mq를 Overview > 어드민에서도 확인
💥 객체 직렬화 전송 위해 컨버터를 만들지 않으면 오류가 남!
java.lang.IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: com.leesh.sender.dto.DeptDto
at org.springframework.amqp.support.converter.SimpleMessageConverter.createMessage(SimpleMessageConverter.java:164) ~[spring-amqp-2.4.3.jar:2.4.3]
에러발생. SimpleMessageConverter는 String과 byte[] 만 지원한다는것.
다음과 같은 Jackson2JsonMessageConvertor
를 빈으로 주입해야 한다.
Converter
@Configuration
public class Converter {
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}