핵심 코드만 추가하였고, 전체 코드는 깃허브를 통해 확인이 가능합니다.
앞서 설계한 서비스(RabbitMQ, MongoDB, Redis, MySQL)를 실행하기 위해 docker-compose를 작성
RabbitMQ는 STOMP 플러그인 활성화를 위한 명령어를 추가
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
ports:
- "5672:5672"
- "15672:15672"
- "61613:61613"
command: >
/bin/bash -c "rabbitmq-plugins enable --offline rabbitmq_management rabbitmq_stomp && rabbitmq-server"
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
mongodb:
image: mongo:latest
container_name: mongodb
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: 1234
MONGO_INITDB_DATABASE: rabbitmq
ports:
- "27017:27017"
networks:
- my_network
redis:
image: redis:alpine
container_name: redis
hostname: redis
ports:
- "6379:6379"
networks:
- my_network
mysql:
image: mysql:8.0
container_name: mysql
environment:
MYSQL_ROOT_PASSWORD: 1234
MYSQL_DATABASE: rabbitmq
ports:
- "3306:3306"
networks:
- my_network
networks:
my_network:
Spring과 각 서비스를 연동
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/rabbitmq?serverTimezone=Asia/Seoul&characterEncoding=UTF-8
username: root
password: 1234
jpa:
hibernate:
ddl-auto: create
data:
mongodb:
uri: mongodb://root:1234@localhost:27017/rabbitmq?authSource=admin&authMechanism=SCRAM-SHA-1
username: root
password: 1234
redis:
host: localhost
port: 6379
jwt:
secret: lsh2613 #jwt secret key
access:
expiration: 1800000
header: Authorization
refresh:
expiration: 259200000
header: Authorization-Refresh
rabbitmq:
host: "localhost"
port: 5672
virtual-host: "/"
username: "guest"
password: "guest"
relay:
port: 61613
system-login: guest
system-passcode: guest
client-login: guest
client-passcode: guest
chat:
queue:
name: "chat.queue"
exchange:
name: "chat.exchange"
routing:
key: "*.room."
logging:
level:
org.springframework.data.redis.connection: DEBUG
org.springframework.cache: DEBUG
@Configuration
@EnableRabbit
@RequiredArgsConstructor
public class RabbitConfig {
@Value("${rabbitmq.chat.queue.name}")
private String chatQueueName;
@Value("${rabbitmq.chat.exchange.name}")
private String chatExchangeName;
@Value("${rabbitmq.chat.routing.key}")
private String routingKey;
@Value("${rabbitmq.host}")
private String host;
@Value("${rabbitmq.port}")
private int port;
@Value("${rabbitmq.virtual-host}")
private String virtualHost;
@Value("${rabbitmq.username}")
private String username;
@Value("${rabbitmq.password}")
private String password;
// Queue 등록
@Bean
public Queue queue() {
return new Queue(chatQueueName, true);
}
// Exchange 등록
@Bean
public TopicExchange exchange() {
return new TopicExchange(chatExchangeName);
}
// Exchange와 Queue바인딩
@Bean
public Binding binding(Queue queue, TopicExchange exchange){
return BindingBuilder
.bind(queue)
.to(exchange)
.with(routingKey);
}
// RabbitMQ와의 메시지 통신을 담당하는 클래스
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setExchange(chatExchangeName);
return rabbitTemplate;
}
// RabbitMQ와의 연결을 관리하는 클래스
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setVirtualHost(virtualHost);
factory.setUsername(username);
factory.setPassword(password);
return factory;
}
}
- new TopicExchange(chatExchangeName) -> 설정한 이름으로 TopicExchange를 생성
- return BindingBuilder.bind(queue).to(exchange).with(routingKey) -> queue와 exchange를 routingKey(여기선 .room.)를 통해 바인딩
- rabbitTemplate.setExchange(chatExchangeName) -> 이 세팅을 통해 RabbitTemplate을 통해 메시지 전송 시 따로 전송할 exchangeName을 설정할 필요 없음
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Value("${rabbitmq.host}")
private String host;
@Value("${rabbitmq.relay.port}")
private int port;
@Value("${rabbitmq.relay.system-login}")
private String systemLogin;
@Value("${rabbitmq.relay.client-passcode}")
private String systemPasscode;
@Value("${rabbitmq.relay.client-login}")
private String clientLogin;
@Value("${rabbitmq.relay.client-passcode}")
private String clientPasscode;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// socketJs 클라이언트가 WebSocket 핸드셰이크를 하기 위해 연결할 endpoint를 지정할 수 있다.
registry.addEndpoint("/chat/inbox")
.setAllowedOriginPatterns("*"); // cors 허용을 위해 꼭 설정해주어야 함. setCredential() 설정시에 AllowedOrigin 과 같이 사용될 경우 오류가 날 수 있으므로 OriginPatterns 설정으로 사용하였음
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 메시지 브로커 설정
registry.setPathMatcher(new AntPathMatcher(".")); // url을 chat/room/3 -> chat.room.3으로 참조하기 위한 설정
registry.enableStompBrokerRelay("/queue", "/topic", "/exchange", "/amq/queue")
.setRelayHost(host)
.setRelayPort(port)
.setSystemLogin(systemLogin)
.setSystemPasscode(systemPasscode)
.setClientLogin(clientLogin)
.setClientPasscode(clientPasscode);
// 클라이언트로부터 메시지를 받을 api의 prefix를 설정함
// publish
registry.setApplicationDestinationPrefixes("/pub");
}
}
- registry.addEndpoint("/chat/inbox") -> ws://{서버ip}:{port}/chat/inbox를 통해 소켓 연결
- registry.enableStompBrokerRelay(...) -> stomp 외부 메시지 브로커 사용 허가
- registry.setApplicationDestinationPrefixes("/pub") -> 메시지 보낼 저장될 큐의 prefix
채팅방 입장 시 처리해야 되는 로직은 다음과 같다.
1. Redis에 채팅방 입장 유저 추가
2. 나의 마지막 접속 시간 이후에 생성된 메시지 존재 && 현재 채팅방에 입장해있는 유저가 존재한다면, 먼저 입장해있는 유저에게 '싱크 요청' 메시지 전송
먼저 이 프로젝트에서는 STOMP 헤더에 JWT, chat-room-id를 추가하여 유저를 식별하고, 접속하려는 채팅방 정보를 전달받았다.
public class JwtAuthenticationInterceptor implements ChannelInterceptor {
private final TokenUtil tokenUtil;
private final StompHeaderAccessorUtil stompHeaderAccessorUtil;
private final ChatRoomService chatRoomService;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getCommand();
if (command==StompCommand.CONNECT)
handleConnect(accessor);
if (command==StompCommand.DISCONNECT)
handleDisconnect(accessor);
return message;
}
private void handleConnect(StompHeaderAccessor accessor) {
String accessToken = tokenUtil.extractToken(accessor, TokenType.ACCESS_TOKEN);
Long memberId = tokenUtil.validateTokenAndGetMemberId(accessToken);
stompHeaderAccessorUtil.setMemberIdInSession(accessor, memberId);
Long chatRoomId = stompHeaderAccessorUtil.getChatRoomIdInHeader(accessor);
stompHeaderAccessorUtil.setChatRoomIdInSession(accessor, chatRoomId);
chatRoomService.enterChatRoom(memberId, chatRoomId);
}
private void handleDisconnect(StompHeaderAccessor accessor) {
Long memberId = stompHeaderAccessorUtil.removeMemberIdInSession(accessor);
Long chatRoomId = stompHeaderAccessorUtil.removeChatRoomIdInSession(accessor);
chatRoomService.exitChatRoom(memberId, chatRoomId);
}
}
public class ChatRoomService {
public void enterChatRoom(Long memberId, Long chatRoomId) {
Member member = entityFacade.getMember(memberId);
ChatRoom chatRoom = entityFacade.getChatRoom(chatRoomId);
redisChatUtil.enterChatRoom(chatRoomId, memberId);
readUnreadMessages(chatRoom, member.getId());
}
private void readUnreadMessages(ChatRoom chatRoom, Long memberId) {
ChatRoomMember chatRoomMember = chatRoomMemberRepository.findByChatRoomIdAndMemberId(chatRoom.getId(), memberId)
.orElseThrow(() -> new RuntimeException("채팅방 참가자를 찾을 수 없습니다"));
LocalDateTime lastEntryTime = chatRoomMember.getLastEntryTime();
boolean existsUnreadMessage = chatMessageRepository.existsByChatRoomIdAndCreatedAtAfter(chatRoom.getId(), lastEntryTime);
boolean existsOnlineChatRoomMember = redisChatUtil.getOnlineChatRoomMemberCnt(chatRoom.getId()) > 1; // 1은 본인
if (existsUnreadMessage && existsOnlineChatRoomMember)
sendChatSyncRequestMessage(chatRoom.getId());
}
@Transactional
public void exitChatRoom(Long memberId, Long chatRoomId) {
Member member = entityFacade.getMember(memberId);
ChatRoom chatRoom = entityFacade.getChatRoom(chatRoomId);
ChatRoomMember chatRoomMember = chatRoomMemberRepository.findByChatRoomIdAndMemberId(chatRoom.getId(), memberId)
.orElseThrow(() -> new RuntimeException("채팅방 참가자를 찾을 수 없습니다"));
chatRoomMember.updateLastEntryTime();
redisChatUtil.exitChatRoom(chatRoom.getId(), member.getId());
}
JwtAuthenticationInterceptor에서 유저를 인증하고 세션에 userId, chatRoomId를 저장해뒀기 때문에 message만 전달받아 그대로 전송해주기만 하면 된다.
중요한 점은, 메시지를 보낼 때도 '안 읽은 수'를 계산해줘야 한다.
이 계산은 메시지 조회 API와 다르게 '채팅방 인원' - '현재 해당 채팅방에 참가 중인 유저 수'로 계산된다. 따라서 조회 시점과, 전송 시점으로 구분되어 메소드명을 만들어놨다.
public class ChatMessageController {
private final ChatMessageService chatMessageService;
/**
* Destination Queue: /pub/chat.message를 통해 호출 후 처리 되는 로직
*/
@MessageMapping("chat.message")
public void sendMessage(StompHeaderAccessor accessor, ChatMessageReq message) {
chatMessageService.sendMessage(accessor, message);
}
public class ChatMessageService {
@Transactional
public void sendMessage(StompHeaderAccessor accessor, ChatMessageReq req) {
Long memberId = stompHeaderAccessorUtil.getMemberIdInSession(accessor);
Member member = entityFacade.getMember(memberId);
Long chatRoomId = stompHeaderAccessorUtil.getChatRoomIdInSession(accessor);
ChatRoom chatRoom = entityFacade.getChatRoom(chatRoomId);
ChatMessage chatMessage = saveChatMessage(req, chatRoom, member);
int unreadCnt = calculateUnreadCntAtPublish(chatRoom.getId());
sendMessage(member, chatMessage, unreadCnt, chatRoom);
}
}
채팅 조회는 성능상 모두 조회보단 무한스크롤로 적용해야 되지만, 편의상 생략했다.
'안 읽은 수'는 앞서 설명했던 것처럼 '채팅방 인원' - '현재 입장 중인 유저 수' - '현재 입장 중이지 않은 유저 중 마지막 입장 시간>메시지 생성시간을 만족하는 유저의 수'로 계산된다.
public class ChatMessageService {
@Transactional(readOnly = true)
public List<MessageRes> getChatMessages(Long chatRoomId) {
ChatRoom chatRoom = entityFacade.getChatRoom(chatRoomId);
List<ChatMessage> chatMessages = chatMessageRepository.findByChatRoomIdOrderByCreatedAtAsc(chatRoom.getId());
List<MessageRes> messageResList = chatMessages.stream()
.map(chatMessage -> {
/*
현재 접속 중인 채팅방 유저를 제외하고 나머지 채팅방 유저의 마지막 입장 시간을 가져옴
그 중에서 메시지 생성 시간보다 늦은 시간에 입장한 유저(해당 메시지를 읽지 않았다는 의미)의 수를 세어줌
unreadCnt = 채팅방 유저 수 - 현재 접속 중인 유저 수 - 메시지 생성 시간보다 늦은 시간에 입장한 유저 수
*/
int unreadCnt = calculateUnreadCntAtReadTime(chatRoom.getId(), chatMessage.getCreatedAt());
Member member = entityFacade.getMember(chatMessage.getMemberId());
return ChatMessageRes.createRes(member.getNickname(), chatMessage, unreadCnt);
})
.toList();
return messageResList;
}
private int calculateUnreadCntAtReadTime(Long chatRoomId, LocalDateTime messageCreatedAt) {
List<ChatRoomMember> chatRoomMembers = chatRoomMemberRepository.findAllByChatRoomId(chatRoomId);
Set<Long> onlineChatRoomMembers = redisChatUtil.getOnlineChatRoomMembers(chatRoomId);
List<LocalDateTime> lastEntryTimes = getLastEntryTimesExcludingOnlineMembers(chatRoomMembers, onlineChatRoomMembers);
int memberCntAfterMessageCreated = (int) lastEntryTimes.stream()
.filter(time -> time.isAfter(messageCreatedAt))
.count();
return chatRoomMembers.size() - onlineChatRoomMembers.size() - memberCntAfterMessageCreated;
}
private List<LocalDateTime> getLastEntryTimesExcludingOnlineMembers(List<ChatRoomMember> chatRoomMembers, Set<Long> onlineMemberIds) {
return chatRoomMembers.stream()
.filter(chatRoomMember -> !onlineMemberIds.contains(chatRoomMember.getMember().getId()))
.map(ChatRoomMember::getLastEntryTime)
.toList();
}
}
채팅 테스트를 위해 직접 STOMP.js를 활용하여 페이지를 제작했다. 하는 김에 간단한 API docs도 추가했다.
