1. RabbitMQ 설치 [다운로드]
2. RabbitMQ 도커로 실행
docker-compose-rabbitmq.yml
version: '3'
services:
rabbitmq:
image: rabbitmq:3-management-alpine
container_name: rabbitmq
volumes:
- ./rabbitmq/etc/:/etc/rabbitmq/
- ./rabbitmq/data/:/var/lib/rabbitmq/
- ./rabbitmq/logs/:/var/log/rabbitmq/
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_ERLANG_COOKIE: "RabbitMQ-My-Cookies"
RABBITMQ_DEFAULT_USER: "mbti"
RABBITMQ_DEFAULT_PASS: "1234"
docker-compose 실행 명령어
docker-compose -f docker-compose-rabbitmq.yml up -d
docker shell 접속 명령어
docker exec -it rabbitmq /bin/bash
docker 명령 실행 - management Plugin 활성화
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_management
명령 정리
사용자 생성하기 rabbitmqctl add_user mbti 1234 가상 호스트 만들기 (옵션) rabbitmqctl add_vhost myvhost 사용자에게 권한 할당하기 rabbitmqctl set_permissions -p myvhost mbti ".*" ".*" ".*" Management Plugin 활성화 rabbitmq-plugins enable rabbitmq_management 관리자 계정 추가 rabbitmqctl set_user_tags mbti administrator 계정 삭제 rabbitmqctl delete_user mbti
3. 코드 구현
기본 테스트 코드 구현 [참조사이트]
application.properties
#### RabbitMQ
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=mbti
spring.rabbitmq.password=1234
rabbitmq.queue.name=chat-queue
rabbitmq.exchange.name=chat-exchange
rabbitmq.routing.key=key
PostMan으로 테스트를 위한 선행 작업
응답 처리를 위해 에이전트 설치
Postman-Agent-win64-0.4.25-Setup.exeSpring Security에서 csrf 옵션을 사용 중지
http.csrf().disable();
테스트 결과
RabbitMQConfig
package com.example.jhta_3team_finalproject.config.chat;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@RequiredArgsConstructor
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
private String rabbitmqHost;
@Value("${spring.rabbitmq.port}")
private String rabbitmqPort;
@Value("${spring.rabbitmq.username}")
private String rabbitmqUserName;
@Value("${spring.rabbitmq.password}")
private String rabbitmqPassword;
@Value("${rabbitmq.queue.name}")
private String queueName;
@Value("${rabbitmq.exchange.name}")
private String exchangeName;
@Value("${rabbitmq.routing.key}")
private String routingKey;
// org.springframework.amqp.core.Queue
@Bean
public Queue queue() {
return new Queue(queueName);
}
/**
* 지정된 Exchange 이름으로 Direct Exchange Bean 을 생성
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(exchangeName);
}
/**
* 주어진 Queue 와 Exchange 을 Binding 하고 Routing Key 을 이용하여 Binding Bean 생성
* Exchange 에 Queue 을 등록한다고 이해하자
**/
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
/**
* RabbitMQ 연동을 위한 ConnectionFactory 빈을 생성하여 반환
**/
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitmqHost);
connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
connectionFactory.setUsername(rabbitmqUserName);
connectionFactory.setPassword(rabbitmqPassword);
return connectionFactory;
}
/**
* RabbitTemplate
* ConnectionFactory 로 연결 후 실제 작업을 위한 Template
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
/**
* 직렬화(메세지를 JSON 으로 변환하는 Message Converter)
*/
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
RabbitMQController
package com.example.jhta_3team_finalproject.controller;//package com.example.jhta_3team_finalproject.controller;
import com.example.jhta_3team_finalproject.domain.chat.ChatMessage;
import com.example.jhta_3team_finalproject.service.chat.RabbitMQService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
@Slf4j
@RequiredArgsConstructor
@Controller
public class RabbitMQController {
private final RabbitMQService rabbitMQService;
@RequestMapping("/chat/emergency")
public void sendMessage(ChatMessage chatMessage) throws Exception {
this.rabbitMQService.sendMessage(chatMessage);
}
}
RabbitMQService
package com.example.jhta_3team_finalproject.service.chat;//package com.example.jhta_3team_finalproject.service.chat;
import com.example.jhta_3team_finalproject.domain.User.User;
import com.example.jhta_3team_finalproject.domain.chat.ChatMessage;
import com.example.jhta_3team_finalproject.domain.chat.ChatRoom;
import com.example.jhta_3team_finalproject.handler.chat.SocketHandler;
import com.example.jhta_3team_finalproject.mybatis.mapper.chat.ChatMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* Queue 로 메세지를 발핼한 때에는 RabbitTemplate 의 ConvertAndSend 메소드를 사용하고
* Queue 에서 메세지를 구독할때는 @RabbitListener 을 사용
*
**/
@Slf4j
@RequiredArgsConstructor
@Service
public class RabbitMQService {
@Value("${rabbitmq.queue.name}")
private String queueName;
@Value("${rabbitmq.exchange.name}")
private String exchangeName;
@Value("${rabbitmq.routing.key}")
private String routingKey;
private final RabbitTemplate rabbitTemplate;
private final SocketHandler socketHandler;
private final ChatService chatService;
private final ChatSseService chatSseService;
private final ChatMapper dao;
/**
* 1. Queue 로 메세지를 발행
* 2. Producer 역할 -> Direct Exchange 전략
*/
public void sendMessage(ChatMessage chatMessage) throws Exception {
log.info("Send Message : {}", chatMessage.toString());
/**
* 2024-06-24, 모든 부서 채팅방의 방 번호를 구함
*/
List<ChatRoom> deptChatRoomList = dao.getDeptChatRoomNum();
deptChatRoomList.forEach(chatRoom -> {
chatMessage.setChatRoomNum(chatRoom.getChatRoomNum());
this.rabbitTemplate.convertAndSend(exchangeName, routingKey, chatMessage);
});
}
/**
* 1. Queue 에서 메세지를 구독
*/
@Transactional
@RabbitListener(queues = "${rabbitmq.queue.name}")
public void receiveMessage(ChatMessage chatMessage) throws Exception {
log.info("Received Message : {}", chatMessage.toString());
chatMessage = chatService.createMessage(chatMessage);
if(chatMessage != null) {
ChatRoom chatRoom = new ChatRoom();
chatRoom.setChatRoomNum(Long.valueOf(chatMessage.getChatRoomNum()));
chatRoom.setChatSessionId(chatMessage.getSenderId());
List<User> users = chatService.chatRoomParticipateList(chatRoom);
/**
* 2024-06-24, SSE 비동기 처리로 채팅방 목록 업데이트
*/
users.forEach(user ->
CompletableFuture.runAsync(() ->
chatSseService.chatRoomListRefresh(user, "chatRoomListRefresh"))
.exceptionally(throwable -> {
// 개발자 담당자한테 web hook 및 전달할 있게 처리하기.
log.error("Exception occurred: " + throwable.getMessage());
return null;
})
);
}
socketHandler.sendEmergencyMessage(chatMessage);
}
}
4. 시연 영상 [영상]