이번 포스팅에서는 지난 포스팅에 이어 Redis PUB/SUB과 WebSocket을 이용하여 채팅 서버 구현의 세부적인 과정에 대해 알아보도록 하겠습니다.
지난 포스팅 - [Spring] Redis PUB/SUB + WebSocket을 이용한 채팅 서버 구현하기 - 간단편
사용자와 채팅 서버는 웹소켓이라는 하나의 통로로 연결이 되어있고 사용자는 채팅 관련 다양한 토픽을 구독함으로써 채팅 서버와의 통신이 가능한 구조입니다.
사용자가 구독하는 토픽은 총 5가지가 있습니다.
위의 5가지 토픽을 이용하여 채팅방 생성,채팅방 목록에서 각각 채팅방의 마지막 메시지 확인, 채팅방 안보이게 설정, 채팅방 나가기, 실시간 채팅 전송 및 읽음 여부 확인 등의 로직을 수행하였습니다.
채팅 기능을 구현하기에 앞서 채팅에 대한 사용자 인증 처리에 대해서 살펴보도록 하겠습니다.
HttpHandshakeInterceptor
와 StompHandler
를 통해 채팅 메시지에 대한 사용자의 인증을 처리하였습니다.HttpHandshakeInterceptor
와 StompHandler
에 대한 내용은 아래를 참고해주세요.
@Slf4j
public class HttpHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
String clientIp = request.getRemoteAddress().toString();
attributes.put("clientIp", clientIp); // 클라이언트 IP 주소를 세션 속성에 저장
return true; // 인증에 성공하면 true를 반환하여 연결을 계속 진행합니다.
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
// 핸드셰이크 후 처리
}
}
HttpHandshakeInterceptor
는 처음 WebSocket
을 연결하기 전에 HTTP
통신 요청을 가로채서 로직을 작성할 수 있게 해줍니다.
beforeHandshake
메서드에서 해당 요청의 ip 주소를 꺼내어 세션 속성에 저장하고 있습니다. 이후 지나갈 StompHandler
에서 사용자 인증에 대한 로직을 처리하기 위한 세션 속성을 세팅해주는 과정입니다.
@Slf4j
@RequiredArgsConstructor
@Component
public class StompHandler implements ChannelInterceptor {
private final TokenProvider tokenProvider;
private final RefreshTokenProvider refreshTokenProvider;
private final RedisService redisService;
private final HmacAndBase64 hmacAndBase64;
private final ObjectMapper objectMapper;
private SimpMessagingTemplate messagingTemplate;
private final ApplicationContext applicationContext;
private SimpMessagingTemplate getMessagingTemplate() {
if (messagingTemplate == null) {
messagingTemplate = applicationContext.getBean(SimpMessagingTemplate.class);
}
return messagingTemplate;
}
// Websocket을 통해 들어온 요청이 처리 되기전 실행된다.
// preSend 메서드는 메시지가 채널을 통해 실제로 전송되기 전에 호출된다.
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
boolean isAuthenticated = validAuthenticate(accessor);
if (StompCommand.DISCONNECT != accessor.getCommand() && !isAuthenticated) {
String renewToken = null;
try {
renewToken = returnValidRefreshAuthenticate(accessor);
} catch (Exception e) {
log.info("StompHandler Crypt 에러 발생 {} :", e);
} finally {
ErrorResponse errorResponse = ErrorResponse.of(ErrorCode.UNAUTHORIZED.name());
if (renewToken != null) {
try {
String jsonErrorResponse = objectMapper.writeValueAsString(errorResponse);
String destination = "/queue/authorization-error";
// 헤더 설정
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headerAccessor.setSessionId(accessor.getSessionId());
headerAccessor.setLeaveMutable(true);
headerAccessor.setHeader(ConstantPool.AUTHORIZATION_HEADER, renewToken);
messagingTemplate.convertAndSendToUser(accessor.getSessionId(),destination,jsonErrorResponse,headerAccessor.getMessageHeaders());
return null;
} catch (Exception e) {
e.printStackTrace();
return null;
}
} else {
try {
String jsonErrorResponse = objectMapper.writeValueAsString(errorResponse);
String sessionId = accessor.getSessionId();
String destination = "/queue/authorization-error";
messagingTemplate.convertAndSendToUser(sessionId, destination, jsonErrorResponse);
return null;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
} else if (StompCommand.CONNECT == accessor.getCommand()) {
log.info("CONNECT {}");
} else if (StompCommand.SEND == accessor.getCommand()) {
log.info("StompHandler Send 도착");
} else if (StompCommand.SUBSCRIBE == accessor.getCommand()) {
log.info("SUBSCRIBE");
} else if (StompCommand.DISCONNECT == accessor.getCommand()) {
log.info("DISCONNECT");
}
return message;
}
public String resolveToken(String token) {
if (StringUtils.hasText(token) && token.startsWith("Bearer")) {
return token.substring(7);
}
return null;
}
public boolean validAuthenticate(StompHeaderAccessor accessor) {
String token = accessor.getFirstNativeHeader(ConstantPool.AUTHORIZATION_HEADER);
System.out.println("token :" +token);
String jwt = resolveToken(token);
if (StringUtils.hasText(jwt) && tokenProvider.validateToken(jwt)) {
log.info("StompHandler JWT 토큰 인증 성공");
return true;
}
log.info("StompHandler JWT 토큰 인증 실패");
return false;
}
public String returnValidRefreshAuthenticate(StompHeaderAccessor accessor) throws UnsupportedEncodingException, NoSuchAlgorithmException, InvalidKeyException {
String token = accessor.getFirstNativeHeader(ConstantPool.REFRESH_HEADER);
String refreshToken = resolveToken(token);
String clientIp = (String) accessor.getSessionAttributes().get("clientIp");
log.info("StompHandler clientIp {} :", clientIp);
if (StringUtils.hasText(refreshToken) && refreshTokenProvider.validateToken(refreshToken,clientIp)) {
Authentication authentication = tokenProvider.getAuthentication(refreshToken);
if (redisService.getRefreshToken("refresh:"+
hmacAndBase64.crypt(clientIp,"HmacSHA512")+"_"+authentication.getName()).equals(refreshToken)) {
String renewToken = tokenProvider.createToken(authentication);
return renewToken;
}
}
return null;
}
}
해당 StompHandler
는 WebSocket
을 통해 들어온 요청(웹소켓 연결, 웹소켓 구독, 웹소켓 메시지 전송, 웹소켓 연결 끊기)에 대해서 인증을 처리하게 됩니다.
preSend
메서드에서는 StompHeaderAccessor
를 통해 Authorization
과 Refresh
토큰 및 이전 HttpHandshakeInterceptor
에서 설정해뒀던 clientIp
속성을 꺼내 인증을 처리하고 있습니다.
또한 위에서는 따로 구현하지 않았지만 StompCommand
를 통해 웹소켓 연결(CONNECT
), 웹소켓 메시지 전송(SEND
), 웹소켓 구독(SUBSCRIBE
), 웹소켓 연결끊기(DISCONNECT
)에 대한 분기처리를 할 수 있습니다.
Authorization
토큰이 유효하지 않지만 Refresh
토큰이 유효한 경우 401 에러의 응답 코드에 새로 발급한 Authorization
토큰을 헤더값에 넣어 응답을 전달하게 되고 Authorization
와 Refresh
토큰 모두 유효하지 않은 경우 401 에러 응답을 전달하게 됩니다.
preSend
메서드에서 return null
을 하면 해당 메시지는 채널로 전달되지 않고 메세지는 버려지게 됩니다.
사용자 인증에 실패한 에러 메시지는 /user/queue/authorization-error
토픽을 통해 클라이언트에게 전달하게 됩니다.
참고 :
Spring WebSocket STOMP에서는 특정 사용자에게 메시지를 전송할 때, 내부적으로 "/user/{sessionId}/" prefix가 추가됩니다. 이는 Spring의 STOMP 구현에서 사용자를 특정하여 메시지를 라우팅하기 위한 방식입니다.
이제 채팅 기능에 대해 구현해보도록 하겠습니다.
채팅 관련 메시지 전송은 아래 그림과 같이 이루어집니다.
채팅 서버는 다중 서버이기 때문에 채팅을 하고 있는 두 사용자가 다른 채팅 서버를 통해 통신하게 되는 경우 연결되어 있는 웹소켓 통로도 달라지게 됩니다.
Redis PUB/SUB System
을 활용하여 모든 채팅 서버 Redis
의 채팅 메시지 토픽과 채팅방 관련 토픽을 구독하도록 구현하게 하였습니다.
이를 통해 사용자가 어느 채팅 서버와 웹소켓 연결이 되어있는지와는 상관없이 모든 웹소켓 통로에서 사용자가 구독하고 있는 토픽에 대해 메시지를 전송할 수 있게 되는 것입니다.
채팅 관련 테이블은 채팅, 채팅방, 채팅방 멤버십 세 가지가 있습니다.
그 중 채팅방 멤버십 테이블은 일종의 중간 테이블로 채팅방 보이기 유무 및 채팅방 나가기 여부를 회원마다 체크하는 역할을 합니다.
이제 채팅 관련 기능들을 세분화하여 보도록 하겠습니다.
@RequiredArgsConstructor
@RestController
@Slf4j
@RequestMapping("/api")
public class ChatController {
private final ChatService chatService;
private final SimpMessageSendingOperations messagingTemplate;
/**
* websocket "/pub/chat/message"로 들어오는 메시징을 처리한다.
*/
@MessageMapping("/chat/message")
public void message(ChatMessage message) {
log.info("메시지 도착 : {}", message.toString());
// Websocket에 발행된 메시지를 redis로 발행(publish)
try {
chatService.sendChatMessage(message);
} catch (ApplicationException e) {
messagingTemplate.convertAndSend("/sub/error-message/"+message.getMemberId(), new ChatStatusResponse(e.getErrorCode()));
}
}
/**
* websocket "/pub/chat/modify-connection-status"로 들어오는 메시징을 처리한다.
*/
@MessageMapping("/chat/modify-connection-status")
public void modifyConnectionStatus(ConnectionStatusMessage message) {
log.info("메시지 도착 : {}", message.toString());
chatService.modifyConnectionStatus(message);
}
/**
* 채팅방 목록 조회
*/
@GetMapping("/chat/rooms")
public ResponseEntity<List<ChatRoomResponse>> getRoomList(Principal principal) {
Long memberId = Long.parseLong(principal.getName());
List<ChatRoomResponse> roomList = chatService.getRoomList(memberId);
return new ResponseEntity<>(roomList,HttpStatus.OK);
}
/**
* 채팅 목록 조회
*/
// TODO: 채팅 읽음 로직 추가해야한다 또한 채팅방에 있을 시 클라이언트 화면 상에서 채팅 읽음 로직 어떻게 구성할지 생각
// TODO: 상대가 채팅방을 보고 있을 시 채팅 치면 채팅 읽음으로 수정이 되어야함
@GetMapping("/chat/{room-id}")
public ResponseEntity<List<ChatMessageResponse>> getChatMessages(@PathVariable("room-id") Long roomId,
Principal principal) {
Long memberId = Long.parseLong(principal.getName());
List<ChatMessageResponse> chatMessages = chatService.getChatMessages(roomId, memberId);
return new ResponseEntity<>(chatMessages, HttpStatus.OK);
}
/**
* 채팅 삭제
*/
@DeleteMapping("/chat/{chat-id}")
public ResponseEntity deleteChat(@PathVariable("chat-id") Long chatId,
@RequestParam("opponentId") Long opponentId,
Principal principal) {
Long memberId = Long.parseLong(principal.getName());
chatService.deleteChatMessage(memberId, opponentId,chatId);
return new ResponseEntity(new SimpleChatMessageResponse(chatId),HttpStatus.NO_CONTENT);
}
}
@Slf4j
@RequiredArgsConstructor
@Component
public class ChatRoomSubscriber {
private final ObjectMapper objectMapper;
private final ChatService chatService;
private final SimpMessageSendingOperations messagingTemplate;
/**
* Redis에서 메시지가 발행(publish)되면 대기하고 있던 RedisSubscriber가 해당 메시지를 받아 처리함
*/
public void sendMessage(String publishMessage) {
ChatRoomMessage chatRoomMessage = null;
try {
//ChatMessage 객체로 매핑
chatRoomMessage = objectMapper.readValue(publishMessage, ChatRoomMessage.class);
System.out.println("chatRoom Message 도착");
System.out.println(chatRoomMessage.toString());
// 타입(QUIT, UNVISIBLE, CREATE)에 따른 처리
if (chatRoomMessage.getType() == ChatRoomMessage.Type.CREATE) {
// 채팅방 생성 로직(채팅 생성 로직 포함)
ChatMessage message = chatService.createChatRoom(chatRoomMessage);
// 채팅방을 구독한 클라이언트에게 메시지 발송(Redis의 토픽에 메시지 발행 후 작업)
// 채팅 전송 로직
messagingTemplate.convertAndSend("/sub/chat-room/renew/"+chatRoomMessage.getReceiverId(),message);
messagingTemplate.convertAndSend("/sub/chat-room/renew/"+chatRoomMessage.getSenderId(),message);
} else if (chatRoomMessage.getType() == ChatRoomMessage.Type.UNVISIBLE) {
// 채팅방 안보이게 설정
chatService.unvisibleChatRoom(chatRoomMessage);
// 채팅방을 구독한 클라이언트에게 메시지 발송
messagingTemplate.convertAndSend("/sub/chat-room/renew/"+chatRoomMessage.getSenderId(),new ChatStatusResponse(ResponseCode.UNVISIBLE_COMPLETED));
} else if (chatRoomMessage.getType() == ChatRoomMessage.Type.QUIT) {
// 채팅방 나가도록 설정
ChatMessageResponse chatMessageResponse = chatService.quitChatRoom(chatRoomMessage);
if (chatMessageResponse != null) {
// 채팅방 자체에 나가기 메시지 전송
messagingTemplate.convertAndSend("/sub/chat/room/"+chatRoomMessage.getChatRoomId(),chatMessageResponse);
}
// 채팅방을 구독한 클라이언트에게 메시지 발송
messagingTemplate.convertAndSend("/sub/chat-room/renew/"+chatRoomMessage.getSenderId(),new ChatStatusResponse(ResponseCode.QUIT_COMPLETED));
messagingTemplate.convertAndSend("/sub/chat-room/renew/"+chatRoomMessage.getReceiverId(),new ChatStatusResponse(ResponseCode.QUIT_COMPLETED));
}
} catch (ApplicationException e) {
// TODO: 예외 발생 시 해당 구독자(클라이언트)에게 예외 메시지 보내기 구현
log.error("Exception {}", e);
messagingTemplate.convertAndSend("/sub/error-message/" +chatRoomMessage.getSenderId(), new ChatStatusResponse(e.getErrorCode()));
} catch (Exception e) {
log.error("Exception {}", e);
messagingTemplate.convertAndSend("/sub/error-message/" +chatRoomMessage.getSenderId(), new ChatStatusResponse(ErrorCode.INTERNAL_SERVER_ERROR));
}
}
}
@Slf4j
@RequiredArgsConstructor
@Component
public class ChatMessageSubscriber {
private final ObjectMapper objectMapper;
private final SimpMessageSendingOperations messagingTemplate;
/**
* Redis에서 메시지가 발행(publish)되면 대기하고 있던 RedisSubscriber가 해당 메시지를 받아 처리함
*/
public void sendMessage(String publishMessage) {
try {
//RedisChatMessage 객체로 매핑
RedisChatMessage redisChatMessage = objectMapper.readValue(publishMessage, RedisChatMessage.class);
// 채팅방을 구독한 클라이언트에게 메시지 발송(Redis의 토픽에 메시지 발행 후 작업)
messagingTemplate.convertAndSend("/sub/chat/room/"+redisChatMessage.getRoomId(), redisChatMessage.getChatMessageResponse());
// 채팅 상대 및 본인의 채팅방 목록에게 renew 전달하기
messagingTemplate.convertAndSend("/sub/chat-room/renew/"+redisChatMessage.getReceiverId(),redisChatMessage);
messagingTemplate.convertAndSend("/sub/chat-room/renew/"+redisChatMessage.getChatMessageResponse().getMemberId(),redisChatMessage);
} catch (Exception e) {
log.error("Exception {}", e);
}
}
}
@Slf4j
@RequiredArgsConstructor
@Service
@Transactional
public class ChatService {
private final MemberRepository memberRepository;
private final ChatRoomRepository chatRoomRepository;
private final ChatRepository chatRepository;
private final ChatRoomMembershipRepository chatRoomMembershipRepository;
private final RedisTemplate redisTemplate;
private final RedisService redisService;
private final ChannelTopic chatMessageTopic;
private final SimpMessageSendingOperations messagingTemplate;
/**
* 메시지 생성 및 메시지 전달
*/
public void sendChatMessage(ChatMessage message) throws ApplicationException {
// Todo: 예외 발생 시 어떻게 클라이언트에게 전달해줄 지 결정해야한다.
// if (message.getType() == ChatMessage.Type.TALK) {
// 메시지 저장
Member member = memberRepository.findByMemberId(message.getMemberId()).orElseThrow(
() -> new ApplicationException(ErrorCode.USER_NOT_FOUND));
ChatRoom room = chatRoomRepository.findById(message.getRoomId()).orElseThrow(
() -> new ApplicationException(ErrorCode.ROOM_NOT_FOUND));
// 상대 회원 pk 찾기
Member opponentMember = chatRoomMembershipRepository.findOpponentId(room.getId(), message.getMemberId())
.orElseThrow(() -> new ApplicationException(ErrorCode.USER_NOT_FOUND));
// 상대방이 채팅방을 나갔는지 확인해야 한다
ChatRoomMembership opponentMembership = chatRoomMembershipRepository.findValidChatRoom(room.getId(), opponentMember.getId(), member.getId()).orElseThrow(() -> new ApplicationException(ErrorCode.OPPONENT_LEFT_OUT));
Chat chat = null;
// 상대방이 현재 채팅방에 접속했는지 확인 후 접속시엔 읽음처리
String connectedRoomId= redisService.getChatUserRoomId(opponentMember.getId());
if (connectedRoomId!=null && connectedRoomId.equals(message.getRoomId().toString())) {
System.out.println("상대방이 현재 채팅방에 접속 중이다");
System.out.println(redisService.getChatUserRoomId(opponentMember.getId()));
chat = Chat.createTalkMessage(member,room, message.getMessage(), true);
} else {
chat = Chat.createTalkMessage(member,room, message.getMessage(), false);
}
chatRepository.save(chat);
// 채팅방 수정(마지막 메시지 UPDATE)
room.modifyLastMessage(chat.getId());
// 채팅방 멤버십에서 상대 유저의 visible을 true로 전환
opponentMembership.changeVisible(true);
// 레디스 구독자들에게 메시지 publish
redisTemplate.convertAndSend(chatMessageTopic.getTopic(), new RedisChatMessage(message.getRoomId(),opponentMember.getId(),new ChatMessageResponse(chat)));
// }
}
/**
* 채팅 내역 불러오기
*/
public List<ChatMessageResponse> getChatMessages(Long roomId,Long memberId) {
// Todo: 예외 처리(웹소켓으로 전송할 필요 없이 ApplicationException 보내주기)
List<ChatMessageResponse> chatList;
try {
// 채팅 읽기 업데이트
updateReadWhenGetChatList(roomId, memberId);
// 채팅 조회
chatList= chatRepository.findChatList(roomId);
} catch (Exception e) {
System.out.println(e);
throw new ApplicationException(ErrorCode.INTERNAL_SERVER_ERROR);
}
return chatList;
}
/**
* 채팅방 목록 불러오기
*/
public List<ChatRoomResponse> getRoomList(Long memberId) {
return chatRoomMembershipRepository.findChatRoomList(memberId);
}
/**
* 채팅방 생성 요청 시 채팅방/멤버십 생성
*/
public ChatMessage createChatRoom(ChatRoomMessage roomMessage) throws ApplicationException,Exception {
// Todo: 해당 채팅방이 존재하는지 DB에서 확인
// Todo: 만약 채팅 서버에서 에러가 발생했을 때 어떻게 클라이언트에게 전달할지 고민해야한다
if (chatRoomRepository.findChatRoom(roomMessage.getJobPostId(), roomMessage.getSenderId()).isEmpty()) {
// 채팅방 생성
ChatRoom room = new ChatRoom(roomMessage.getJobPostId());
chatRoomRepository.save(room);
// 채팅방 멤버십 생성
// Todo: 예외 시 어떻게 처리할지 작성
Member sender = memberRepository.findById(roomMessage.getSenderId()).orElseThrow(() -> new ApplicationException(ErrorCode.USER_NOT_FOUND));
Member receiver = memberRepository.findById(roomMessage.getReceiverId()).orElseThrow(() -> new ApplicationException(ErrorCode.USER_NOT_FOUND));
ChatRoomMembership senderRoomMembership = new ChatRoomMembership(sender,receiver, room);
ChatRoomMembership receiverRoomMembership = new ChatRoomMembership(receiver,sender, room);
chatRoomMembershipRepository.save(senderRoomMembership);
chatRoomMembershipRepository.save(receiverRoomMembership);
// 채팅 생성
ChatMessage chatMessage = createEnterChatMessage(sender, room);
return chatMessage;
} else {
// Todo: 예외 처리
System.out.println("채팅방 존재");
throw new ApplicationException(ErrorCode.CHATROOM_EXISTS);
}
}
/**
* 채팅방 안보이도록 설정
*/
public void unvisibleChatRoom(ChatRoomMessage roomMessage) throws ApplicationException {
// 실제 유효한 채팅방이 있는지 검증
ChatRoom chatRoom = chatRoomRepository.findChatRoomById(roomMessage.getChatRoomId()).orElseThrow(() -> new ApplicationException(ErrorCode.ROOM_NOT_FOUND));
// 해당 채팅방이 본인과 상대방이 속한 채팅방인지 검증 및 채팅방 멤버십 조회
ChatRoomMembership memberShip = chatRoomMembershipRepository.findValidChatRoom(chatRoom.getId(), roomMessage.getSenderId(), roomMessage.getReceiverId())
.orElseThrow(() -> new ApplicationException(ErrorCode.ROOM_MEMBERSHIP_NOT_FOUND));
// 채팅방 멤버십 수정
memberShip.changeVisible(false);
}
/**
* 채팅방 나가기 설정
*/
public ChatMessageResponse quitChatRoom(ChatRoomMessage roomMessage) throws ApplicationException {
// Todo: 만약 상대방도 나가기가 되어 있을 시 해당 채팅방은 delete해줘야한다
// 실제 유효한 채팅방이 있는지 검증
ChatRoom chatRoom = chatRoomRepository.findChatRoomById(roomMessage.getChatRoomId()).orElseThrow(() -> new ApplicationException(ErrorCode.ROOM_NOT_FOUND));
// 해당 채팅방이 본인과 상대방이 속한 채팅방인지 검증 및 채팅방 멤버십 조회
ChatRoomMembership memberShip = chatRoomMembershipRepository.findValidChatRoom(chatRoom.getId(), roomMessage.getSenderId(), roomMessage.getReceiverId())
.orElseThrow(() -> new ApplicationException(ErrorCode.ROOM_MEMBERSHIP_NOT_FOUND));
// 상대방이 나갔는지부터 체크
ChatRoomMembership opponentShip = chatRoomMembershipRepository.findChatRoom(chatRoom.getId(), roomMessage.getReceiverId(), roomMessage.getSenderId())
.orElseThrow(() -> new ApplicationException(ErrorCode.ROOM_MEMBERSHIP_NOT_FOUND));
// 상대방 나갔을 시 나도 나감 처리해주고 해당 채팅방 delete 하기
if (opponentShip.getLeftAt() != null) {
memberShip.getChatRoom().deleteChatRoom();
// 내 채팅방 멤버십 나감 처리
memberShip.quitChat();
return null;
} else {
// 내 채팅방 멤버십 나감 처리
memberShip.quitChat();
// 상대방이 현재 채팅방에 접속했는지 확인 후 접속시엔 읽음처리
Chat chat;
String connectedRoomId = redisService.getChatUserRoomId(opponentShip.getId());
if (connectedRoomId!=null && connectedRoomId.equals(roomMessage.getChatRoomId().toString())) {
chat = Chat.quitTalkMessage(memberShip.getMember(),memberShip.getChatRoom(), true);
} else {
chat = Chat.quitTalkMessage(memberShip.getMember(),memberShip.getChatRoom(),false);
}
chatRepository.save(chat);
// 마지막 채팅 내역 수정
memberShip.getChatRoom().modifyLastMessage(chat.getId());
ChatMessageResponse chatMessageResponse = new ChatMessageResponse(chat);
return chatMessageResponse;
}
}
/**
* 문의 시(채팅방 생성) 입장 채팅 생성
*/
public ChatMessage createEnterChatMessage(Member member, ChatRoom room) {
Chat enterMessage = Chat.createEnterMessage(member, room);
// 채팅 저장
chatRepository.save(enterMessage);
// 채팅방 마지막 메시지 설정
room.modifyLastMessage(enterMessage.getId());
return new ChatMessage(room.getId(), member.getId(),enterMessage.getContent(), ChatMessage.Type.ENTER);
}
public void updateReadWhenGetChatList(Long roomId, Long memberId) {
chatRepository.updateChatRead(roomId, memberId);
}
/**
* 채팅방 접속 유무 수정 로직
*/
public void modifyConnectionStatus(ConnectionStatusMessage message) {
if (message.getType() == ConnectionStatusMessage.Type.CONNECT) {
// 채팅방 접속 정보를 REDIS에 저장
redisService.setChatUser(message.getMemberId(), message.getChatRoomId());
// 상대에게 해당 멤버가 채팅방에 접속했다고 알려주기(재랜더링 목적)
System.out.println("채팅방 접속 유무에 대한 접속 정보 알려주기");
System.out.println(message.getOpponentId());
messagingTemplate.convertAndSend("/sub/chat/renew/"+message.getOpponentId(),new ChatStatusResponse(ResponseCode.OK));
} else if (message.getType() == ConnectionStatusMessage.Type.DISCONNECT) {
redisService.removeChatUser(message.getMemberId());
}
}
/**
* 채팅 삭제
*/
public void deleteChatMessage(Long memberId, Long opponentId, Long chatId) {
Chat chat = chatRepository.findByChatId(chatId).orElseThrow(() -> new ApplicationException(ErrorCode.CHAT_NOT_FOUND));
if (chat.getDeleteYn()) {
throw new ApplicationException(ErrorCode.ALREADY_DELETED_CHAT);
} else if (chat.getMember().getId()!=memberId) {
throw new ApplicationException(ErrorCode.NOT_CHAT_OWNER);
}
chat.deleteChat();
// 삭제 된 이후 클라이언트에게 알려 메시지 삭제자는 채팅방 목록을, 상대방은 채팅방 목록과 채팅 목록 모두 갱신시켜줘야한다
// 메시지 삭제자 갱신
messagingTemplate.convertAndSend("/sub/chat-room/renew/"+memberId,new ChatStatusResponse(ResponseCode.OK));
// 상대방 갱신
messagingTemplate.convertAndSend("/sub/chat-room/renew/"+opponentId,new ChatStatusResponse(ResponseCode.OK));
messagingTemplate.convertAndSend("/sub/chat/renew/"+opponentId,new ChatStatusResponse(ResponseCode.OK));
}
}
public interface ChatRepository extends JpaRepository<Chat, Long> {
@Query("select new com.chatServer.chat.dto.response.ChatMessageResponse(c.id,c.member.id,c.createdAt,c.content,c.readYn,c.deleteYn) " +
"from Chat c where c.chatRoom.id = :roomId and c.chatRoom.deleteAt is null")
List<ChatMessageResponse> findChatList(@Param("roomId") Long roomId);
@Modifying
@Query("delete from Chat c where c.chatRoom.id IN :rooms")
void deleteChatByScheduling(@Param("rooms") List<Long> rooms);
@Modifying
@Query("update Chat c set c.readYn=true where c.chatRoom.id= :roomId and c.member.id <> :memberId")
void updateChatRead(@Param("roomId") Long roomId, @Param("memberId") Long memberId);
@Query("select c from Chat c join fetch c.member where c.id = :chatId")
Optional<Chat> findByChatId(@Param("chatId") Long chatId);
}
public interface ChatRoomRepository extends JpaRepository<ChatRoom, Long> {
@Query("select cr from ChatRoom cr join ChatRoomMembership crm on crm.member.id = :senderId where cr.jobPostId = :jobPostId and cr.deleteAt IS NULL")
Optional<ChatRoom> findChatRoom(@Param("jobPostId") Long jobPostId, @Param("senderId") Long senderId);
@Query("select cr from ChatRoom cr where cr.id = :roomId and cr.deleteAt is null")
Optional<ChatRoom> findChatRoomById(@Param("roomId") Long roomId);
@Query(value = "select cr.room_id from chatroom cr where cr.delete_at <= DATE_SUB(CONVERT_TZ(now(),'+00:00', '+09:00'), INTERVAL 6 MONTH)", nativeQuery = true)
List<Long> findChatRoomToDelete();
@Modifying
@Query("delete from ChatRoom cr where cr.id IN :rooms")
void deleteChatRoomsByScheduling(@Param("rooms") List<Long> rooms);
}
public interface ChatRoomMembershipRepository extends JpaRepository<ChatRoomMembership, Long> {
@Query("select new com.chatServer.chat.dto.response.ChatRoomResponse(crms.chatRoom.id,opponent.member.id, opponent.nickName, opponent.profileImage, c.content,c.deleteYn, c.readYn,c.member.id, crms2.leftAt, crms.chatRoom.jobPostId) from ChatRoomMembership crms " +
"join ChatRoomMembership crms2 on crms2.chatRoom.id=crms.chatRoom.id and crms2.opponent.id = :memberId " +
"join MemberProfileInfo opponent " +
"on crms.opponent.id = opponent.member.id and opponent.deletedYn = false " +
"join Chat c on c.id=crms.chatRoom.lastChatId " +
"where crms.member.id = :memberId and crms.isVisible = true and crms.leftAt is null and crms.chatRoom.deleteAt is null "+
"order by c.createdAt desc"
)
List<ChatRoomResponse> findChatRoomList(@Param("memberId") Long memberId);
@Query("select rm.opponent from ChatRoomMembership rm " +
"where rm.chatRoom.id = :roomId " +
"and rm.member.id = :memberId " +
"and rm.leftAt is null " +
"and rm.chatRoom.deleteAt is null")
Optional<Member> findOpponentId(@Param("roomId") Long roomId, @Param("memberId") Long memberId);
@Query("select rm from ChatRoomMembership rm " +
"where rm.chatRoom.id = :roomId " +
"and rm.member.id = :memberId " +
"and rm.opponent.id = :opponentId " +
"and rm.leftAt is null")
Optional<ChatRoomMembership> findValidChatRoom(@Param("roomId") Long roomId, @Param("memberId") Long memberId, @Param("opponentId") Long opponentId);
@Query("select rm from ChatRoomMembership rm " +
"where rm.chatRoom.id = :roomId " +
"and rm.member.id = :memberId " +
"and rm.opponent.id = :opponentId ")
Optional<ChatRoomMembership> findChatRoom(@Param("roomId") Long roomId, @Param("memberId") Long memberId, @Param("opponentId") Long opponentId);
@Modifying
@Query("delete from ChatRoomMembership rm where rm.chatRoom.id IN :rooms")
void deleteMemberShipByScheduling(@Param("rooms") List<Long> rooms);
}
@Entity
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Chat {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name="chat_id")
private Long id;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(updatable = false,name="member_id")
private Member member;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(updatable = false,name="room_id")
private ChatRoom chatRoom;
private String content;
@Column(name="read_yn")
private Boolean readYn;
@Column(name="delete_yn")
private Boolean deleteYn;
@Column(updatable = false, name="created_at")
private LocalDateTime createdAt;
@PrePersist
public void prePersist() {
this.createdAt = LocalDateTime.now();
}
public static Chat createEnterMessage(Member member,ChatRoom room) {
return Chat.builder()
.member(member)
.chatRoom(room)
.content("1:1 문의하기 입장하셨습니다.")
.readYn(false)
.deleteYn(false)
.build();
}
public static Chat createTalkMessage(Member member, ChatRoom room,String content,boolean readYn) {
return Chat.builder()
.member(member)
.chatRoom(room)
.content(content)
.readYn(readYn)
.deleteYn(false)
.build();
}
public static Chat quitTalkMessage(Member member, ChatRoom room, boolean readYn) {
return Chat.builder()
.member(member)
.chatRoom(room)
.content("채팅방을 나가셨습니다.")
.readYn(readYn)
.deleteYn(false)
.build();
}
public void deleteChat() {
this.deleteYn = true;
}
}
@Entity
@Getter
@NoArgsConstructor
@Table(name="chatroom")
public class ChatRoom {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name="room_id")
private Long id;
@Column(name="jobpost_id")
private Long jobPostId;
@Column(name="created_at")
private LocalDateTime createdAt;
@Column(name="last_chat_id")
private Long lastChatId;
@Column(name="delete_at")
private LocalDateTime deleteAt;
public ChatRoom(Long jobPostId) {
this.jobPostId = jobPostId;
}
@PrePersist
public void prePersist() {
this.createdAt = LocalDateTime.now();
this.deleteAt = null;
}
public void modifyLastMessage(Long lastChatId) {
this.lastChatId = lastChatId;
}
public void deleteChatRoom() {
this.deleteAt = LocalDateTime.now();
}
}
@Entity
@NoArgsConstructor
@Table(name="room_membership")
@Getter
public class ChatRoomMembership {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name="membership_id")
private Long id;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(updatable = false,name="member_id")
private Member member;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(updatable = false,name="opponent_id")
private Member opponent;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(updatable = false,name="room_id")
private ChatRoom chatRoom;
@Column(name="visible")
private Boolean isVisible;
@Column(name="left_at")
private LocalDateTime leftAt;
public ChatRoomMembership(Member member, Member opponent, ChatRoom chatRoom) {
this.member = member;
this.opponent = opponent;
this.chatRoom = chatRoom;
this.isVisible = true;
}
public void changeVisible(Boolean isVisible) {
this.isVisible = isVisible;
}
public void quitChat() {
this.leftAt = LocalDateTime.now();
}
}
그 외에 DTO들은 생략하도록 하겠습니다.
채팅방 생성은 채팅 서버가 아닌 메인 서버에서 요청을 받아 Redis PUB/SUB System
을 통해 채팅 서버로 전달하게 됩니다. 이전 포스팅에서 설명드렸던 내용이니 코드는 생략하도록 하겠습니다.
채팅 서버에서는 ChatRoomSubscriber를 통해 Redis PUB/SUB System
에서 chatRoom
으로 발행되는 메시지를 수신할 수 있습니다.
ChatRoomSubscriber에서는 발행된 메시지를 수신하여 메시지의 타입을 검사하게 됩니다. CREATE
타입일 경우 ChatService의 createChatRoom
메서드가 호출되어 관련 유효성 검사를 진행 후 채팅방과 채팅방 멤버십 엔티티를 생성하여 DB에 저장하고 createEnterChatMessage
메서드를 호출하여 채팅 엔티티를 생성하여 DB에 저장하고 채팅방의 마지막 메시지를 생성된 채팅으로 수정해주는 작업을 하게 됩니다.
이후 웹소켓을 통해 /sub/chat-room/renew/{user-id}
주소로 생성된 채팅을 보내게 됩니다. 채팅은 일대일 채팅이므로 발신자와 수신자 두명에게 메시지를 전달하게 됩니다.
클라이언트에서는 해당 메시지를 전달받아 재랜더링을 해주어 화면을 재구성하게 됩니다.
채팅방 안보이게 설정하는 요청 또한 메인 서버에서 요청을 받아 Redis PUB/SUB System
을 통해 채팅 서버로 전달하게 됩니다.
채팅 서버에서 ChatRoomSubscriber를 통해 Redis PUB/SUB System
에서 chatRoom
으로 발행되는 메시지를 수신할 수 있습니다.
ChatRoomSubscriber에서는 발행된 메시지를 수신하여 메시지의 타입을 검사하게 되는데 UNIVISIBLE
타입일 경우 ChatService의 unvisibleChatRoom
메서드를 호출하게 됩니다. 해당 메서드에서는 실제 유효한 채팅방이 있는지에 대한 검증하고 해당 채팅방이 본인과 상대방이 속한 채팅방인지 검증 및 채팅방 멤버십을 조회하고 해당 채팅방 멤버십 상태를 unvisible
로 수정합니다.
이후 웹소켓을 통해 /sub/chat-room/renew/{user-id}
주소(채팅방 안보이게 설정을 요청한 사용자에게만)로 완료 처리됐다는 메시지를 전송하게 됩니다.
클라이언트에서는 해당 메시지를 전달받아 재랜더링을 해주어 화면을 재구성하게 됩니다.
채팅방 안보이게 설정하는 요청 또한 메인 서버에서 요청을 받아 Redis PUB/SUB System
을 통해 채팅 서버로 전달하게 됩니다.
ChatRoomSubscriber에서는 발행된 메시지를 수신하여 메시지의 타입 검사를 하고 QUIT
타입일 경우 ChatService의 quitChatRoom
메서드를 호출하여 채팅방 나가기 로직을 수행합니다.
quitChatRoom
메서드에서는 실제 유효한 채팅방이 있는지, 해당 채팅방 본인과 상대방이 속한 채팅방인지 검증을 하고 채팅방 멤버십을 조회하여 상대방이 나간 상태인지에 대해 체크하게 됩니다.
상대방이 나갔을 시 본인도 나감 처리를 해주고 해당 채팅방을 삭제해주게 됩니다. 이후 본인의 채팅방 멤버십을 나감 처리하고 null
을 return하게 됩니다.
만약 상대방이 나가지 않았을 시에는 상대방이 현재 채팅방에 접속해있는지를 확인한 후 읽음 처리를 하게 됩니다. 마지막으로 채팅 내역을 수정하고 해당 채팅
을 return 하게 됩니다.
만약 quitChatRoom
메서드가 반환한 값이 null이 아닐 시 상대방이 나가지 않은 상태이기 때문에 상대방에게 내가 채팅방을 나갔다는 채팅을 전송하기 위해 /sub/chat/room/"+chatRoomMessage.getChatRoomId()
주소로 메시지를 전송합니다.
그리고 채팅방 목록을 갱신해주기 위해 두 사용자에게 /sub/chat-room/renew/{user-id}
주소로 채팅방 나가기 완료 메시지를 전송하게 됩니다.
클라이언트에서는 해당 메시지를 전달받아 재랜더링을 해주어 화면을 재구성하게 됩니다.
실시간 채팅은 클라이언트가 ws.send("/pub/chat/message",{"Authorization":getCookie("Authorization")},JSON.stringify(message))
코드를 통해서 전송하게 됩니다. ChatController의 message
메서드는 /pub/chat/message
주소로 오는 웹소켓 메시지를 받아서 처리하게 됩니다.
이후 ChatService의 sendChatMessage
를 호출하게
되고 내부에서 사용자와 채팅방에 대한 유효성 검사를 진행한 후 상대방이 채팅방을 체크하고 나갔을 시 ErrorCode.OPPONENT_LEFT_OUT
에러코드를 발생시켜 에러 메시지를 클라이언트에게 전달하게 됩니다.
상대방이 채팅방을 나가지 않았을 시 정상적으로 채팅을 생성하게 되는데, Redis
에서 채팅 접속 유무에 대한 정보를 확인한 후 상대방이 채팅방에 접속중일 시 생성하는 채팅 엔티티의 readYn
상태를 true
로, 접속중이 아닐 시 false
로 지정하여 저장해줍니다.
그리고 채팅방의 마지막 메시지를 수정해주고, 만약 상대가 채팅방 안보이게 설정한 상태일 시 보이도록 하기 위해 채팅방 멤버십에서 상대 유저의 unvisible
상태를 visible
로 수정해줍니다.
그리고 Redis PUB/SUB System
을 통해 생성한 채팅 정보를 발행하게 됩니다.
Redis PUB/SUB System
에 발행된 채팅 정보 메시지는 다른 채팅 서버들이 ChatMessageSubscriber의 sendMessage
에서 수신하여 처리하게 됩니다.
해당 채팅방을 구독한 사용자에게 채팅 메시지를 전달하고 채팅 상대 및 본인의 채팅방 목록을 갱신하기 위해 웹소켓 통신을 보냄으로써 마무리가 됩니다.
Redis에서 다음 형태로 chatUser를 관리합니다.
해당 정보들은 클라이언트가 채팅방을 클릭할 시 ws.send("/pub/chat/modify-connection-status",{"Authorization":getCookie("Authorization")},JSON.stringify(message))
코드가 호출되어 ChatController의 modifyConnectionStatus
메서드가 웹소켓 메시지를 처리하게 됩니다.
ChatService
의 modifyConnectionStatus
메서드는 아래와 같습니다.
/**
* 채팅방 접속 유무 수정 로직
*/
public void modifyConnectionStatus(ConnectionStatusMessage message) {
if (message.getType() == ConnectionStatusMessage.Type.CONNECT) {
// 채팅방 접속 정보를 REDIS에 저장
redisService.setChatUser(message.getMemberId(), message.getChatRoomId());
// 상대에게 해당 멤버가 채팅방에 접속했다고 알려주기(재랜더링 목적)
System.out.println("채팅방 접속 유무에 대한 접속 정보 알려주기");
System.out.println(message.getOpponentId());
messagingTemplate.convertAndSend("/sub/chat/renew/"+message.getOpponentId(),new ChatStatusResponse(ResponseCode.OK));
} else if (message.getType() == ConnectionStatusMessage.Type.DISCONNECT) {
redisService.removeChatUser(message.getMemberId());
}
}
modifyConnectionStatus
메서드에서 CONNECT
, DISCONNECT
의 타입에 따라 채팅방 접속 정보를 갱신하게 됩니다.
긴 내용 포스팅을 하였는데 읽으시느라 고생 많으셨습니다. 다음에는 더 유익한 내용으로 뵙겠습니다.
좋은 글 감사합니다:)