[Spring] Redis PUB/SUB + WebSocket을 이용한 채팅 서버 구현하기 - 세부편

김강욱·2024년 5월 27일
0

Project-DoggyWalky

목록 보기
2/5
post-thumbnail

이번 포스팅에서는 지난 포스팅에 이어 Redis PUB/SUB과 WebSocket을 이용하여 채팅 서버 구현의 세부적인 과정에 대해 알아보도록 하겠습니다.

지난 포스팅 - [Spring] Redis PUB/SUB + WebSocket을 이용한 채팅 서버 구현하기 - 간단편


사용자와 채팅 서버는 웹소켓이라는 하나의 통로로 연결이 되어있고 사용자는 채팅 관련 다양한 토픽을 구독함으로써 채팅 서버와의 통신이 가능한 구조입니다.

사용자가 구독하는 토픽은 총 5가지가 있습니다.

  1. 채팅 메시지 전달용 토픽
  2. 채팅방 목록에서 마지막 메시지 수정용 토픽
  3. 채팅방 목록에서 새로운 채팅방 생성용 토픽
  4. 비즈니스 로직 처리 중 발생한 에러 전달용 토픽
  5. 사용자 인증 처리 중 발생한 에러 전달용 토픽

위의 5가지 토픽을 이용하여 채팅방 생성,채팅방 목록에서 각각 채팅방의 마지막 메시지 확인, 채팅방 안보이게 설정, 채팅방 나가기, 실시간 채팅 전송 및 읽음 여부 확인 등의 로직을 수행하였습니다.



😁 인증 관련 처리

채팅 기능을 구현하기에 앞서 채팅에 대한 사용자 인증 처리에 대해서 살펴보도록 하겠습니다.

HttpHandshakeInterceptorStompHandler를 통해 채팅 메시지에 대한 사용자의 인증을 처리하였습니다.HttpHandshakeInterceptorStompHandler에 대한 내용은 아래를 참고해주세요.

[Spring] WebSocket으로 채팅 구현하기 - 인터셉터 설정 및 인증 처리


✔️ HttpHandshakeInterceptor

@Slf4j
public class HttpHandshakeInterceptor implements HandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        String clientIp = request.getRemoteAddress().toString();
        attributes.put("clientIp", clientIp); // 클라이언트 IP 주소를 세션 속성에 저장
        return true; // 인증에 성공하면 true를 반환하여 연결을 계속 진행합니다.
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        // 핸드셰이크 후 처리
    }

}

HttpHandshakeInterceptor는 처음 WebSocket을 연결하기 전에 HTTP 통신 요청을 가로채서 로직을 작성할 수 있게 해줍니다.

beforeHandshake 메서드에서 해당 요청의 ip 주소를 꺼내어 세션 속성에 저장하고 있습니다. 이후 지나갈 StompHandler에서 사용자 인증에 대한 로직을 처리하기 위한 세션 속성을 세팅해주는 과정입니다.


✔️ StompHandler

@Slf4j
@RequiredArgsConstructor
@Component
public class StompHandler implements ChannelInterceptor {

    private final TokenProvider tokenProvider;

    private final RefreshTokenProvider refreshTokenProvider;

    private final RedisService redisService;

    private final HmacAndBase64 hmacAndBase64;

    private final ObjectMapper objectMapper;

    private SimpMessagingTemplate messagingTemplate;

    private final ApplicationContext applicationContext;

    private SimpMessagingTemplate getMessagingTemplate() {
        if (messagingTemplate == null) {
            messagingTemplate = applicationContext.getBean(SimpMessagingTemplate.class);
        }
        return messagingTemplate;
    }



    // Websocket을 통해 들어온 요청이 처리 되기전 실행된다.
    // preSend 메서드는 메시지가 채널을 통해 실제로 전송되기 전에 호출된다.
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        boolean isAuthenticated = validAuthenticate(accessor);
        if (StompCommand.DISCONNECT != accessor.getCommand() && !isAuthenticated) {
            String renewToken = null;
            try {
                renewToken = returnValidRefreshAuthenticate(accessor);
            } catch (Exception e) {
                log.info("StompHandler Crypt 에러 발생 {} :", e);
            } finally {
                ErrorResponse errorResponse = ErrorResponse.of(ErrorCode.UNAUTHORIZED.name());
                if (renewToken != null) {
                    try {
                        String jsonErrorResponse = objectMapper.writeValueAsString(errorResponse);
                        String destination = "/queue/authorization-error";

                        // 헤더 설정
                        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
                        headerAccessor.setSessionId(accessor.getSessionId());
                        headerAccessor.setLeaveMutable(true);
                        headerAccessor.setHeader(ConstantPool.AUTHORIZATION_HEADER, renewToken);

                        messagingTemplate.convertAndSendToUser(accessor.getSessionId(),destination,jsonErrorResponse,headerAccessor.getMessageHeaders());
                        return null;
                    } catch (Exception e) {
                        e.printStackTrace();
                        return null;
                    }
                } else {
                    try {
                        String jsonErrorResponse = objectMapper.writeValueAsString(errorResponse);
                        String sessionId = accessor.getSessionId();
                        String destination = "/queue/authorization-error";

                        messagingTemplate.convertAndSendToUser(sessionId, destination, jsonErrorResponse);
                        return null;
                    } catch (Exception e) {
                        e.printStackTrace();
                        return null;
                    }
                }
            }

        } else if (StompCommand.CONNECT == accessor.getCommand()) {
            log.info("CONNECT {}");
        } else if (StompCommand.SEND == accessor.getCommand()) {
            log.info("StompHandler Send 도착");
        } else if (StompCommand.SUBSCRIBE == accessor.getCommand()) {
            log.info("SUBSCRIBE");
        } else if (StompCommand.DISCONNECT == accessor.getCommand()) {
            log.info("DISCONNECT");
        }


        return message;
    }

    public String resolveToken(String token) {
        if (StringUtils.hasText(token) && token.startsWith("Bearer")) {
            return token.substring(7);
        }
        return null;
    }

    public boolean validAuthenticate(StompHeaderAccessor accessor) {
        String token = accessor.getFirstNativeHeader(ConstantPool.AUTHORIZATION_HEADER);
        System.out.println("token :" +token);
        String jwt = resolveToken(token);

        if (StringUtils.hasText(jwt) && tokenProvider.validateToken(jwt)) {
            log.info("StompHandler JWT 토큰 인증 성공");
            return true;
        }
        log.info("StompHandler JWT 토큰 인증 실패");
        return false;
    }

    public String returnValidRefreshAuthenticate(StompHeaderAccessor accessor) throws UnsupportedEncodingException, NoSuchAlgorithmException, InvalidKeyException {
        String token = accessor.getFirstNativeHeader(ConstantPool.REFRESH_HEADER);
        String refreshToken = resolveToken(token);
        String clientIp = (String) accessor.getSessionAttributes().get("clientIp");

        log.info("StompHandler clientIp {} :", clientIp);


        if (StringUtils.hasText(refreshToken) && refreshTokenProvider.validateToken(refreshToken,clientIp)) {
            Authentication authentication = tokenProvider.getAuthentication(refreshToken);

            if (redisService.getRefreshToken("refresh:"+
                    hmacAndBase64.crypt(clientIp,"HmacSHA512")+"_"+authentication.getName()).equals(refreshToken)) {

                String renewToken = tokenProvider.createToken(authentication);
                return renewToken;
            }

        }
        return null;

    }

}

해당 StompHandlerWebSocket을 통해 들어온 요청(웹소켓 연결, 웹소켓 구독, 웹소켓 메시지 전송, 웹소켓 연결 끊기)에 대해서 인증을 처리하게 됩니다.

preSend 메서드에서는 StompHeaderAccessor를 통해 AuthorizationRefresh 토큰 및 이전 HttpHandshakeInterceptor에서 설정해뒀던 clientIp 속성을 꺼내 인증을 처리하고 있습니다.

또한 위에서는 따로 구현하지 않았지만 StompCommand를 통해 웹소켓 연결(CONNECT), 웹소켓 메시지 전송(SEND), 웹소켓 구독(SUBSCRIBE), 웹소켓 연결끊기(DISCONNECT)에 대한 분기처리를 할 수 있습니다.

Authorization 토큰이 유효하지 않지만 Refresh 토큰이 유효한 경우 401 에러의 응답 코드에 새로 발급한 Authorization 토큰을 헤더값에 넣어 응답을 전달하게 되고 AuthorizationRefresh 토큰 모두 유효하지 않은 경우 401 에러 응답을 전달하게 됩니다.

preSend 메서드에서 return null을 하면 해당 메시지는 채널로 전달되지 않고 메세지는 버려지게 됩니다.

사용자 인증에 실패한 에러 메시지는 /user/queue/authorization-error 토픽을 통해 클라이언트에게 전달하게 됩니다.

참고 :
Spring WebSocket STOMP에서는 특정 사용자에게 메시지를 전송할 때, 내부적으로 "/user/{sessionId}/" prefix가 추가됩니다. 이는 Spring의 STOMP 구현에서 사용자를 특정하여 메시지를 라우팅하기 위한 방식입니다.



😁 채팅 기능 구현

이제 채팅 기능에 대해 구현해보도록 하겠습니다.

채팅 관련 메시지 전송은 아래 그림과 같이 이루어집니다.

채팅 서버는 다중 서버이기 때문에 채팅을 하고 있는 두 사용자가 다른 채팅 서버를 통해 통신하게 되는 경우 연결되어 있는 웹소켓 통로도 달라지게 됩니다.

Redis PUB/SUB System을 활용하여 모든 채팅 서버 Redis의 채팅 메시지 토픽과 채팅방 관련 토픽을 구독하도록 구현하게 하였습니다.

이를 통해 사용자가 어느 채팅 서버와 웹소켓 연결이 되어있는지와는 상관없이 모든 웹소켓 통로에서 사용자가 구독하고 있는 토픽에 대해 메시지를 전송할 수 있게 되는 것입니다.


✏️ 채팅 ERD 구조

채팅 관련 테이블은 채팅, 채팅방, 채팅방 멤버십 세 가지가 있습니다.
그 중 채팅방 멤버십 테이블은 일종의 중간 테이블로 채팅방 보이기 유무 및 채팅방 나가기 여부를 회원마다 체크하는 역할을 합니다.

이제 채팅 관련 기능들을 세분화하여 보도록 하겠습니다.

✏️ 채팅 관련 클래스

ChatController

@RequiredArgsConstructor
@RestController
@Slf4j
@RequestMapping("/api")
public class ChatController {

    private final ChatService chatService;

    private final SimpMessageSendingOperations messagingTemplate;

    /**
     * websocket "/pub/chat/message"로 들어오는 메시징을 처리한다.
     */
    @MessageMapping("/chat/message")
    public void message(ChatMessage message) {
        log.info("메시지 도착 : {}", message.toString());
        // Websocket에 발행된 메시지를 redis로 발행(publish)
        try {
            chatService.sendChatMessage(message);
        } catch (ApplicationException e) {
            messagingTemplate.convertAndSend("/sub/error-message/"+message.getMemberId(), new ChatStatusResponse(e.getErrorCode()));
        }
    }

    /**
     * websocket "/pub/chat/modify-connection-status"로 들어오는 메시징을 처리한다.
     */
    @MessageMapping("/chat/modify-connection-status")
    public void modifyConnectionStatus(ConnectionStatusMessage message) {
        log.info("메시지 도착 : {}", message.toString());
        chatService.modifyConnectionStatus(message);
    }

    /**
     * 채팅방 목록 조회
     */
    @GetMapping("/chat/rooms")
    public ResponseEntity<List<ChatRoomResponse>> getRoomList(Principal principal) {
        Long memberId = Long.parseLong(principal.getName());

        List<ChatRoomResponse> roomList = chatService.getRoomList(memberId);
        return new ResponseEntity<>(roomList,HttpStatus.OK);
    }

    /**
     * 채팅 목록 조회
     */
    // TODO: 채팅 읽음 로직 추가해야한다 또한 채팅방에 있을 시 클라이언트 화면 상에서 채팅 읽음 로직 어떻게 구성할지 생각
    // TODO: 상대가 채팅방을 보고 있을 시 채팅 치면 채팅 읽음으로 수정이 되어야함
    @GetMapping("/chat/{room-id}")
    public ResponseEntity<List<ChatMessageResponse>> getChatMessages(@PathVariable("room-id") Long roomId,
                                                                     Principal principal) {
        Long memberId = Long.parseLong(principal.getName());
        List<ChatMessageResponse> chatMessages = chatService.getChatMessages(roomId, memberId);
        return new ResponseEntity<>(chatMessages, HttpStatus.OK);
    }

    /**
     * 채팅 삭제
     */
    @DeleteMapping("/chat/{chat-id}")
    public ResponseEntity deleteChat(@PathVariable("chat-id") Long chatId,
                                     @RequestParam("opponentId") Long opponentId,
                                     Principal principal) {
        Long memberId = Long.parseLong(principal.getName());
        chatService.deleteChatMessage(memberId, opponentId,chatId);
        return new ResponseEntity(new SimpleChatMessageResponse(chatId),HttpStatus.NO_CONTENT);
    }

}

ChatRoomSubscriber

@Slf4j
@RequiredArgsConstructor
@Component
public class ChatRoomSubscriber {

    private final ObjectMapper objectMapper;

    private final ChatService chatService;

    private final SimpMessageSendingOperations messagingTemplate;

    /**
     * Redis에서 메시지가 발행(publish)되면 대기하고 있던 RedisSubscriber가 해당 메시지를 받아 처리함
     */
    public void sendMessage(String publishMessage) {
        ChatRoomMessage chatRoomMessage = null;
        try {
            //ChatMessage 객체로 매핑
            chatRoomMessage = objectMapper.readValue(publishMessage, ChatRoomMessage.class);
            System.out.println("chatRoom Message 도착");
            System.out.println(chatRoomMessage.toString());

            // 타입(QUIT, UNVISIBLE, CREATE)에 따른 처리
            if (chatRoomMessage.getType() == ChatRoomMessage.Type.CREATE) {
                // 채팅방 생성 로직(채팅 생성 로직 포함)
                ChatMessage message = chatService.createChatRoom(chatRoomMessage);

                // 채팅방을 구독한 클라이언트에게 메시지 발송(Redis의 토픽에 메시지 발행 후 작업)
                // 채팅 전송 로직
                messagingTemplate.convertAndSend("/sub/chat-room/renew/"+chatRoomMessage.getReceiverId(),message);
                messagingTemplate.convertAndSend("/sub/chat-room/renew/"+chatRoomMessage.getSenderId(),message);
            } else if (chatRoomMessage.getType() == ChatRoomMessage.Type.UNVISIBLE) {
                // 채팅방 안보이게 설정
                chatService.unvisibleChatRoom(chatRoomMessage);

                // 채팅방을 구독한 클라이언트에게 메시지 발송
                messagingTemplate.convertAndSend("/sub/chat-room/renew/"+chatRoomMessage.getSenderId(),new ChatStatusResponse(ResponseCode.UNVISIBLE_COMPLETED));
            } else if (chatRoomMessage.getType() == ChatRoomMessage.Type.QUIT) {
                // 채팅방 나가도록 설정
                ChatMessageResponse chatMessageResponse = chatService.quitChatRoom(chatRoomMessage);

                if (chatMessageResponse != null) {
                    // 채팅방 자체에 나가기 메시지 전송
                    messagingTemplate.convertAndSend("/sub/chat/room/"+chatRoomMessage.getChatRoomId(),chatMessageResponse);
                }

                // 채팅방을 구독한 클라이언트에게 메시지 발송
                messagingTemplate.convertAndSend("/sub/chat-room/renew/"+chatRoomMessage.getSenderId(),new ChatStatusResponse(ResponseCode.QUIT_COMPLETED));
                messagingTemplate.convertAndSend("/sub/chat-room/renew/"+chatRoomMessage.getReceiverId(),new ChatStatusResponse(ResponseCode.QUIT_COMPLETED));
            }


        } catch (ApplicationException e) {
            // TODO: 예외 발생 시 해당 구독자(클라이언트)에게 예외 메시지 보내기 구현
            log.error("Exception {}", e);
            messagingTemplate.convertAndSend("/sub/error-message/" +chatRoomMessage.getSenderId(), new ChatStatusResponse(e.getErrorCode()));
        } catch (Exception e) {
            log.error("Exception {}", e);
            messagingTemplate.convertAndSend("/sub/error-message/" +chatRoomMessage.getSenderId(), new ChatStatusResponse(ErrorCode.INTERNAL_SERVER_ERROR));
        }
    }
}

ChatMessageSubscriber

@Slf4j
@RequiredArgsConstructor
@Component
public class ChatMessageSubscriber {

    private final ObjectMapper objectMapper;
    private final SimpMessageSendingOperations messagingTemplate;


    /**
     * Redis에서 메시지가 발행(publish)되면 대기하고 있던 RedisSubscriber가 해당 메시지를 받아 처리함
     */
    public void sendMessage(String publishMessage) {
        try {
            //RedisChatMessage 객체로 매핑
            RedisChatMessage redisChatMessage = objectMapper.readValue(publishMessage, RedisChatMessage.class);

            // 채팅방을 구독한 클라이언트에게 메시지 발송(Redis의 토픽에 메시지 발행 후 작업)
            messagingTemplate.convertAndSend("/sub/chat/room/"+redisChatMessage.getRoomId(), redisChatMessage.getChatMessageResponse());

            // 채팅 상대 및 본인의 채팅방 목록에게 renew 전달하기
            messagingTemplate.convertAndSend("/sub/chat-room/renew/"+redisChatMessage.getReceiverId(),redisChatMessage);
            messagingTemplate.convertAndSend("/sub/chat-room/renew/"+redisChatMessage.getChatMessageResponse().getMemberId(),redisChatMessage);
        } catch (Exception e) {
            log.error("Exception {}", e);
        }
    }
}

ChatService

@Slf4j
@RequiredArgsConstructor
@Service
@Transactional
public class ChatService {

    private final MemberRepository memberRepository;
    private final ChatRoomRepository chatRoomRepository;

    private final ChatRepository chatRepository;

    private final ChatRoomMembershipRepository chatRoomMembershipRepository;

    private final RedisTemplate redisTemplate;

    private final RedisService redisService;

    private final ChannelTopic chatMessageTopic;

    private final SimpMessageSendingOperations messagingTemplate;

    /**
     * 메시지 생성 및 메시지 전달
     */
    public void sendChatMessage(ChatMessage message) throws ApplicationException {

        // Todo: 예외 발생 시 어떻게 클라이언트에게 전달해줄 지 결정해야한다.
//        if (message.getType() == ChatMessage.Type.TALK) {
            // 메시지 저장
            Member member = memberRepository.findByMemberId(message.getMemberId()).orElseThrow(
                    () -> new ApplicationException(ErrorCode.USER_NOT_FOUND));
            ChatRoom room = chatRoomRepository.findById(message.getRoomId()).orElseThrow(
                    () -> new ApplicationException(ErrorCode.ROOM_NOT_FOUND));


            // 상대 회원 pk 찾기
            Member opponentMember = chatRoomMembershipRepository.findOpponentId(room.getId(), message.getMemberId())
                    .orElseThrow(() -> new ApplicationException(ErrorCode.USER_NOT_FOUND));

            // 상대방이 채팅방을 나갔는지 확인해야 한다
            ChatRoomMembership opponentMembership = chatRoomMembershipRepository.findValidChatRoom(room.getId(), opponentMember.getId(), member.getId()).orElseThrow(() -> new ApplicationException(ErrorCode.OPPONENT_LEFT_OUT));

            Chat chat = null;
            // 상대방이 현재 채팅방에 접속했는지 확인 후 접속시엔 읽음처리
            String connectedRoomId= redisService.getChatUserRoomId(opponentMember.getId());
            if (connectedRoomId!=null && connectedRoomId.equals(message.getRoomId().toString())) {
                System.out.println("상대방이 현재 채팅방에 접속 중이다");
                System.out.println(redisService.getChatUserRoomId(opponentMember.getId()));
                chat = Chat.createTalkMessage(member,room, message.getMessage(), true);
            } else {
                chat = Chat.createTalkMessage(member,room, message.getMessage(), false);
            }
            chatRepository.save(chat);

            // 채팅방 수정(마지막 메시지 UPDATE)
            room.modifyLastMessage(chat.getId());

            // 채팅방 멤버십에서 상대 유저의 visible을 true로 전환
            opponentMembership.changeVisible(true);


            // 레디스 구독자들에게 메시지 publish
            redisTemplate.convertAndSend(chatMessageTopic.getTopic(), new RedisChatMessage(message.getRoomId(),opponentMember.getId(),new ChatMessageResponse(chat)));
//        }

    }



    /**
     * 채팅 내역 불러오기
     */
    public List<ChatMessageResponse> getChatMessages(Long roomId,Long memberId) {
        // Todo: 예외 처리(웹소켓으로 전송할 필요 없이 ApplicationException 보내주기)
        List<ChatMessageResponse> chatList;
        try {
            // 채팅 읽기 업데이트
            updateReadWhenGetChatList(roomId, memberId);

            // 채팅 조회
            chatList= chatRepository.findChatList(roomId);
        } catch (Exception e) {
            System.out.println(e);
            throw new ApplicationException(ErrorCode.INTERNAL_SERVER_ERROR);
        }
        return chatList;
    }



    /**
     * 채팅방 목록 불러오기
     */
    public List<ChatRoomResponse> getRoomList(Long memberId) {
        return chatRoomMembershipRepository.findChatRoomList(memberId);
    }

    /**
     * 채팅방 생성 요청 시 채팅방/멤버십 생성
     */
    public ChatMessage createChatRoom(ChatRoomMessage roomMessage) throws ApplicationException,Exception {
        // Todo: 해당 채팅방이 존재하는지 DB에서 확인
        // Todo: 만약 채팅 서버에서 에러가 발생했을 때 어떻게 클라이언트에게 전달할지 고민해야한다
        if (chatRoomRepository.findChatRoom(roomMessage.getJobPostId(), roomMessage.getSenderId()).isEmpty()) {
            // 채팅방 생성
            ChatRoom room = new ChatRoom(roomMessage.getJobPostId());
            chatRoomRepository.save(room);

            // 채팅방 멤버십 생성
            // Todo: 예외 시 어떻게 처리할지 작성
            Member sender = memberRepository.findById(roomMessage.getSenderId()).orElseThrow(() -> new ApplicationException(ErrorCode.USER_NOT_FOUND));
            Member receiver = memberRepository.findById(roomMessage.getReceiverId()).orElseThrow(() -> new ApplicationException(ErrorCode.USER_NOT_FOUND));
            ChatRoomMembership senderRoomMembership = new ChatRoomMembership(sender,receiver, room);
            ChatRoomMembership receiverRoomMembership = new ChatRoomMembership(receiver,sender, room);
            chatRoomMembershipRepository.save(senderRoomMembership);
            chatRoomMembershipRepository.save(receiverRoomMembership);


            // 채팅 생성
            ChatMessage chatMessage = createEnterChatMessage(sender, room);


            return chatMessage;
        } else  {
            // Todo: 예외 처리
            System.out.println("채팅방 존재");
            throw new ApplicationException(ErrorCode.CHATROOM_EXISTS);
        }
    }


    /**
     * 채팅방 안보이도록 설정
     */
    public void unvisibleChatRoom(ChatRoomMessage roomMessage) throws ApplicationException {

        // 실제 유효한 채팅방이 있는지 검증
        ChatRoom chatRoom = chatRoomRepository.findChatRoomById(roomMessage.getChatRoomId()).orElseThrow(() -> new ApplicationException(ErrorCode.ROOM_NOT_FOUND));

        // 해당 채팅방이 본인과 상대방이 속한 채팅방인지 검증 및 채팅방 멤버십 조회
        ChatRoomMembership memberShip = chatRoomMembershipRepository.findValidChatRoom(chatRoom.getId(), roomMessage.getSenderId(), roomMessage.getReceiverId())
                .orElseThrow(() -> new ApplicationException(ErrorCode.ROOM_MEMBERSHIP_NOT_FOUND));

        // 채팅방 멤버십 수정
        memberShip.changeVisible(false);
    }

    /**
     * 채팅방 나가기 설정
     */
    public ChatMessageResponse quitChatRoom(ChatRoomMessage roomMessage) throws ApplicationException {
        // Todo: 만약 상대방도 나가기가 되어 있을 시 해당 채팅방은 delete해줘야한다
        // 실제 유효한 채팅방이 있는지 검증
        ChatRoom chatRoom = chatRoomRepository.findChatRoomById(roomMessage.getChatRoomId()).orElseThrow(() -> new ApplicationException(ErrorCode.ROOM_NOT_FOUND));

        // 해당 채팅방이 본인과 상대방이 속한 채팅방인지 검증 및 채팅방 멤버십 조회
        ChatRoomMembership memberShip = chatRoomMembershipRepository.findValidChatRoom(chatRoom.getId(), roomMessage.getSenderId(), roomMessage.getReceiverId())
                .orElseThrow(() -> new ApplicationException(ErrorCode.ROOM_MEMBERSHIP_NOT_FOUND));

        // 상대방이 나갔는지부터 체크
        ChatRoomMembership opponentShip = chatRoomMembershipRepository.findChatRoom(chatRoom.getId(), roomMessage.getReceiverId(), roomMessage.getSenderId())
                .orElseThrow(() -> new ApplicationException(ErrorCode.ROOM_MEMBERSHIP_NOT_FOUND));

        // 상대방 나갔을 시 나도 나감 처리해주고 해당 채팅방 delete 하기
        if (opponentShip.getLeftAt() != null) {
            memberShip.getChatRoom().deleteChatRoom();
            // 내 채팅방 멤버십 나감 처리
            memberShip.quitChat();
            return null;
        } else {
            // 내 채팅방 멤버십 나감 처리
            memberShip.quitChat();

            // 상대방이 현재 채팅방에 접속했는지 확인 후 접속시엔 읽음처리
            Chat chat;
            String connectedRoomId = redisService.getChatUserRoomId(opponentShip.getId());
            if (connectedRoomId!=null && connectedRoomId.equals(roomMessage.getChatRoomId().toString())) {
                chat = Chat.quitTalkMessage(memberShip.getMember(),memberShip.getChatRoom(), true);
            } else {
                chat = Chat.quitTalkMessage(memberShip.getMember(),memberShip.getChatRoom(),false);
            }

            chatRepository.save(chat);

            // 마지막 채팅 내역 수정
            memberShip.getChatRoom().modifyLastMessage(chat.getId());
            ChatMessageResponse chatMessageResponse = new ChatMessageResponse(chat);
            return chatMessageResponse;
        }
    }


    /**
     * 문의 시(채팅방 생성) 입장 채팅 생성
     */
    public ChatMessage createEnterChatMessage(Member member, ChatRoom room) {
        Chat enterMessage = Chat.createEnterMessage(member, room);
        // 채팅 저장
        chatRepository.save(enterMessage);

        // 채팅방 마지막 메시지 설정
        room.modifyLastMessage(enterMessage.getId());
        return new ChatMessage(room.getId(), member.getId(),enterMessage.getContent(), ChatMessage.Type.ENTER);
    }

    public void updateReadWhenGetChatList(Long roomId, Long memberId) {
        chatRepository.updateChatRead(roomId, memberId);
    }


    /**
     * 채팅방 접속 유무 수정 로직
     */
    public void modifyConnectionStatus(ConnectionStatusMessage message) {
        if (message.getType() == ConnectionStatusMessage.Type.CONNECT) {
            // 채팅방 접속 정보를 REDIS에 저장
            redisService.setChatUser(message.getMemberId(), message.getChatRoomId());

            // 상대에게 해당 멤버가 채팅방에 접속했다고 알려주기(재랜더링 목적)
            System.out.println("채팅방 접속 유무에 대한 접속 정보 알려주기");
            System.out.println(message.getOpponentId());
            messagingTemplate.convertAndSend("/sub/chat/renew/"+message.getOpponentId(),new ChatStatusResponse(ResponseCode.OK));

        } else if (message.getType() == ConnectionStatusMessage.Type.DISCONNECT) {
            redisService.removeChatUser(message.getMemberId());
        }
    }

    /**
     * 채팅 삭제
     */
    public void deleteChatMessage(Long memberId, Long opponentId, Long chatId) {
        Chat chat = chatRepository.findByChatId(chatId).orElseThrow(() -> new ApplicationException(ErrorCode.CHAT_NOT_FOUND));
        if (chat.getDeleteYn()) {
            throw new ApplicationException(ErrorCode.ALREADY_DELETED_CHAT);
        } else if (chat.getMember().getId()!=memberId) {
            throw new ApplicationException(ErrorCode.NOT_CHAT_OWNER);
        }
        chat.deleteChat();

        // 삭제 된 이후 클라이언트에게 알려 메시지 삭제자는 채팅방 목록을, 상대방은 채팅방 목록과 채팅 목록 모두 갱신시켜줘야한다
        // 메시지 삭제자 갱신
        messagingTemplate.convertAndSend("/sub/chat-room/renew/"+memberId,new ChatStatusResponse(ResponseCode.OK));


        // 상대방 갱신
        messagingTemplate.convertAndSend("/sub/chat-room/renew/"+opponentId,new ChatStatusResponse(ResponseCode.OK));
        messagingTemplate.convertAndSend("/sub/chat/renew/"+opponentId,new ChatStatusResponse(ResponseCode.OK));
    }

}

ChatRepository

public interface ChatRepository extends JpaRepository<Chat, Long> {

    @Query("select new com.chatServer.chat.dto.response.ChatMessageResponse(c.id,c.member.id,c.createdAt,c.content,c.readYn,c.deleteYn) " +
            "from Chat c where c.chatRoom.id = :roomId and c.chatRoom.deleteAt is null")
    List<ChatMessageResponse> findChatList(@Param("roomId") Long roomId);

    @Modifying
    @Query("delete from Chat c where c.chatRoom.id IN :rooms")
    void deleteChatByScheduling(@Param("rooms") List<Long> rooms);

    @Modifying
    @Query("update Chat c set c.readYn=true where c.chatRoom.id= :roomId and c.member.id <> :memberId")
    void updateChatRead(@Param("roomId") Long roomId, @Param("memberId") Long memberId);

    @Query("select c from Chat c join fetch c.member where c.id = :chatId")
    Optional<Chat> findByChatId(@Param("chatId") Long chatId);
}

ChatRoomRepository

public interface ChatRoomRepository extends JpaRepository<ChatRoom, Long> {

    @Query("select cr from ChatRoom cr join ChatRoomMembership crm on crm.member.id = :senderId where cr.jobPostId = :jobPostId and cr.deleteAt IS NULL")
    Optional<ChatRoom> findChatRoom(@Param("jobPostId") Long jobPostId, @Param("senderId") Long senderId);

    @Query("select cr from ChatRoom cr where cr.id = :roomId and cr.deleteAt is null")
    Optional<ChatRoom> findChatRoomById(@Param("roomId") Long roomId);

    @Query(value = "select cr.room_id from chatroom cr where cr.delete_at <= DATE_SUB(CONVERT_TZ(now(),'+00:00', '+09:00'), INTERVAL 6 MONTH)", nativeQuery = true)
    List<Long> findChatRoomToDelete();

    @Modifying
    @Query("delete from ChatRoom cr where cr.id IN :rooms")
    void deleteChatRoomsByScheduling(@Param("rooms") List<Long> rooms);
}

ChatRoomMembershipRepository

public interface ChatRoomMembershipRepository extends JpaRepository<ChatRoomMembership, Long> {

    @Query("select new com.chatServer.chat.dto.response.ChatRoomResponse(crms.chatRoom.id,opponent.member.id, opponent.nickName, opponent.profileImage, c.content,c.deleteYn, c.readYn,c.member.id, crms2.leftAt, crms.chatRoom.jobPostId) from ChatRoomMembership crms " +
            "join ChatRoomMembership crms2 on crms2.chatRoom.id=crms.chatRoom.id and crms2.opponent.id = :memberId " +
            "join MemberProfileInfo opponent " +
            "on crms.opponent.id = opponent.member.id and opponent.deletedYn = false " +
            "join Chat c on c.id=crms.chatRoom.lastChatId " +
            "where crms.member.id = :memberId and crms.isVisible = true and crms.leftAt is null and crms.chatRoom.deleteAt is null "+
            "order by c.createdAt desc"
    )
    List<ChatRoomResponse> findChatRoomList(@Param("memberId") Long memberId);


    @Query("select rm.opponent from ChatRoomMembership rm " +
            "where rm.chatRoom.id = :roomId " +
            "and rm.member.id = :memberId " +
            "and rm.leftAt is null " +
            "and rm.chatRoom.deleteAt is null")
    Optional<Member> findOpponentId(@Param("roomId") Long roomId, @Param("memberId") Long memberId);

    @Query("select rm from ChatRoomMembership rm " +
            "where rm.chatRoom.id = :roomId " +
            "and rm.member.id = :memberId " +
            "and rm.opponent.id = :opponentId " +
            "and rm.leftAt is null")
    Optional<ChatRoomMembership> findValidChatRoom(@Param("roomId") Long roomId, @Param("memberId") Long memberId, @Param("opponentId") Long opponentId);

    @Query("select rm from ChatRoomMembership rm " +
            "where rm.chatRoom.id = :roomId " +
            "and rm.member.id = :memberId " +
            "and rm.opponent.id = :opponentId ")
    Optional<ChatRoomMembership> findChatRoom(@Param("roomId") Long roomId, @Param("memberId") Long memberId, @Param("opponentId") Long opponentId);

    @Modifying
    @Query("delete from ChatRoomMembership rm where rm.chatRoom.id IN :rooms")
    void deleteMemberShipByScheduling(@Param("rooms") List<Long> rooms);
}

ChatEntity

@Entity
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Chat {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name="chat_id")
    private Long id;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(updatable = false,name="member_id")
    private Member member;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(updatable = false,name="room_id")
    private ChatRoom chatRoom;

    private String content;

    @Column(name="read_yn")
    private Boolean readYn;

    @Column(name="delete_yn")
    private Boolean deleteYn;

    @Column(updatable = false, name="created_at")
    private LocalDateTime createdAt;

    @PrePersist
    public void prePersist() {
        this.createdAt = LocalDateTime.now();
    }

    public static Chat createEnterMessage(Member member,ChatRoom room) {
        return Chat.builder()
                .member(member)
                .chatRoom(room)
                .content("1:1 문의하기 입장하셨습니다.")
                .readYn(false)
                .deleteYn(false)
                .build();
    }

    public static Chat createTalkMessage(Member member, ChatRoom room,String content,boolean readYn) {
        return Chat.builder()
                .member(member)
                .chatRoom(room)
                .content(content)
                .readYn(readYn)
                .deleteYn(false)
                .build();
    }

    public static Chat quitTalkMessage(Member member, ChatRoom room, boolean readYn) {
        return Chat.builder()
                .member(member)
                .chatRoom(room)
                .content("채팅방을 나가셨습니다.")
                .readYn(readYn)
                .deleteYn(false)
                .build();
    }

    public void deleteChat() {
        this.deleteYn = true;
    }

}

ChatRoomEntity

@Entity
@Getter
@NoArgsConstructor
@Table(name="chatroom")
public class ChatRoom {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name="room_id")
    private Long id;

    @Column(name="jobpost_id")
    private Long jobPostId;

    @Column(name="created_at")
    private LocalDateTime createdAt;

    @Column(name="last_chat_id")
    private Long lastChatId;

    @Column(name="delete_at")
    private LocalDateTime deleteAt;

    public ChatRoom(Long jobPostId) {
        this.jobPostId = jobPostId;
    }

    @PrePersist
    public void prePersist() {
        this.createdAt = LocalDateTime.now();
        this.deleteAt = null;
    }

    public void modifyLastMessage(Long lastChatId) {
        this.lastChatId = lastChatId;
    }

    public void deleteChatRoom() {
        this.deleteAt = LocalDateTime.now();
    }

}

ChatRoomMembershipEntity

@Entity
@NoArgsConstructor
@Table(name="room_membership")
@Getter
public class ChatRoomMembership {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name="membership_id")
    private Long id;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(updatable = false,name="member_id")
    private Member member;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(updatable = false,name="opponent_id")
    private Member opponent;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(updatable = false,name="room_id")
    private ChatRoom chatRoom;

    @Column(name="visible")
    private Boolean isVisible;

    @Column(name="left_at")
    private LocalDateTime leftAt;

    public ChatRoomMembership(Member member, Member opponent, ChatRoom chatRoom) {
        this.member = member;
        this.opponent = opponent;
        this.chatRoom = chatRoom;
        this.isVisible = true;
    }

    public void changeVisible(Boolean isVisible) {
        this.isVisible = isVisible;
    }

    public void quitChat() {
        this.leftAt = LocalDateTime.now();
    }
}

그 외에 DTO들은 생략하도록 하겠습니다.

😁 채팅방 생성

채팅방 생성은 채팅 서버가 아닌 메인 서버에서 요청을 받아 Redis PUB/SUB System을 통해 채팅 서버로 전달하게 됩니다. 이전 포스팅에서 설명드렸던 내용이니 코드는 생략하도록 하겠습니다.

채팅 서버에서는 ChatRoomSubscriber를 통해 Redis PUB/SUB System에서 chatRoom으로 발행되는 메시지를 수신할 수 있습니다.

ChatRoomSubscriber에서는 발행된 메시지를 수신하여 메시지의 타입을 검사하게 됩니다. CREATE 타입일 경우 ChatServicecreateChatRoom 메서드가 호출되어 관련 유효성 검사를 진행 후 채팅방채팅방 멤버십 엔티티를 생성하여 DB에 저장하고 createEnterChatMessage 메서드를 호출하여 채팅 엔티티를 생성하여 DB에 저장하고 채팅방의 마지막 메시지를 생성된 채팅으로 수정해주는 작업을 하게 됩니다.

이후 웹소켓을 통해 /sub/chat-room/renew/{user-id} 주소로 생성된 채팅을 보내게 됩니다. 채팅은 일대일 채팅이므로 발신자와 수신자 두명에게 메시지를 전달하게 됩니다.

클라이언트에서는 해당 메시지를 전달받아 재랜더링을 해주어 화면을 재구성하게 됩니다.

😁 채팅방 안보이게 설정

채팅방 안보이게 설정하는 요청 또한 메인 서버에서 요청을 받아 Redis PUB/SUB System을 통해 채팅 서버로 전달하게 됩니다.

채팅 서버에서 ChatRoomSubscriber를 통해 Redis PUB/SUB System에서 chatRoom으로 발행되는 메시지를 수신할 수 있습니다.

ChatRoomSubscriber에서는 발행된 메시지를 수신하여 메시지의 타입을 검사하게 되는데 UNIVISIBLE 타입일 경우 ChatServiceunvisibleChatRoom 메서드를 호출하게 됩니다. 해당 메서드에서는 실제 유효한 채팅방이 있는지에 대한 검증하고 해당 채팅방이 본인과 상대방이 속한 채팅방인지 검증 및 채팅방 멤버십을 조회하고 해당 채팅방 멤버십 상태를 unvisible로 수정합니다.

이후 웹소켓을 통해 /sub/chat-room/renew/{user-id} 주소(채팅방 안보이게 설정을 요청한 사용자에게만)로 완료 처리됐다는 메시지를 전송하게 됩니다.

클라이언트에서는 해당 메시지를 전달받아 재랜더링을 해주어 화면을 재구성하게 됩니다.

😁 채팅방 나가기

채팅방 안보이게 설정하는 요청 또한 메인 서버에서 요청을 받아 Redis PUB/SUB System을 통해 채팅 서버로 전달하게 됩니다.

ChatRoomSubscriber에서는 발행된 메시지를 수신하여 메시지의 타입 검사를 하고 QUIT 타입일 경우 ChatServicequitChatRoom 메서드를 호출하여 채팅방 나가기 로직을 수행합니다.

quitChatRoom 메서드에서는 실제 유효한 채팅방이 있는지, 해당 채팅방 본인과 상대방이 속한 채팅방인지 검증을 하고 채팅방 멤버십을 조회하여 상대방이 나간 상태인지에 대해 체크하게 됩니다.

상대방이 나갔을 시 본인도 나감 처리를 해주고 해당 채팅방을 삭제해주게 됩니다. 이후 본인의 채팅방 멤버십을 나감 처리하고 null을 return하게 됩니다.

만약 상대방이 나가지 않았을 시에는 상대방이 현재 채팅방에 접속해있는지를 확인한 후 읽음 처리를 하게 됩니다. 마지막으로 채팅 내역을 수정하고 해당 채팅을 return 하게 됩니다.

만약 quitChatRoom 메서드가 반환한 값이 null이 아닐 시 상대방이 나가지 않은 상태이기 때문에 상대방에게 내가 채팅방을 나갔다는 채팅을 전송하기 위해 /sub/chat/room/"+chatRoomMessage.getChatRoomId() 주소로 메시지를 전송합니다.

그리고 채팅방 목록을 갱신해주기 위해 두 사용자에게 /sub/chat-room/renew/{user-id} 주소로 채팅방 나가기 완료 메시지를 전송하게 됩니다.

클라이언트에서는 해당 메시지를 전달받아 재랜더링을 해주어 화면을 재구성하게 됩니다.

😁 실시간 채팅 전송(상대방 읽음 여부 처리 포함)

실시간 채팅은 클라이언트가 ws.send("/pub/chat/message",{"Authorization":getCookie("Authorization")},JSON.stringify(message)) 코드를 통해서 전송하게 됩니다. ChatControllermessage 메서드는 /pub/chat/message 주소로 오는 웹소켓 메시지를 받아서 처리하게 됩니다.

이후 ChatServicesendChatMessage를 호출하게
되고 내부에서 사용자와 채팅방에 대한 유효성 검사를 진행한 후 상대방이 채팅방을 체크하고 나갔을 시 ErrorCode.OPPONENT_LEFT_OUT 에러코드를 발생시켜 에러 메시지를 클라이언트에게 전달하게 됩니다.

상대방이 채팅방을 나가지 않았을 시 정상적으로 채팅을 생성하게 되는데, Redis에서 채팅 접속 유무에 대한 정보를 확인한 후 상대방이 채팅방에 접속중일 시 생성하는 채팅 엔티티readYn 상태를 true로, 접속중이 아닐 시 false로 지정하여 저장해줍니다.

그리고 채팅방의 마지막 메시지를 수정해주고, 만약 상대가 채팅방 안보이게 설정한 상태일 시 보이도록 하기 위해 채팅방 멤버십에서 상대 유저의 unvisible 상태를 visible로 수정해줍니다.

그리고 Redis PUB/SUB System을 통해 생성한 채팅 정보를 발행하게 됩니다.

Redis PUB/SUB System에 발행된 채팅 정보 메시지는 다른 채팅 서버들이 ChatMessageSubscribersendMessage에서 수신하여 처리하게 됩니다.

해당 채팅방을 구독한 사용자에게 채팅 메시지를 전달하고 채팅 상대 및 본인의 채팅방 목록을 갱신하기 위해 웹소켓 통신을 보냄으로써 마무리가 됩니다.


😁 참고

Redis에서 다음 형태로 chatUser를 관리합니다.

해당 정보들은 클라이언트가 채팅방을 클릭할 시 ws.send("/pub/chat/modify-connection-status",{"Authorization":getCookie("Authorization")},JSON.stringify(message)) 코드가 호출되어 ChatControllermodifyConnectionStatus 메서드가 웹소켓 메시지를 처리하게 됩니다.

ChatServicemodifyConnectionStatus 메서드는 아래와 같습니다.

/**
     * 채팅방 접속 유무 수정 로직
     */
    public void modifyConnectionStatus(ConnectionStatusMessage message) {
        if (message.getType() == ConnectionStatusMessage.Type.CONNECT) {
            // 채팅방 접속 정보를 REDIS에 저장
            redisService.setChatUser(message.getMemberId(), message.getChatRoomId());

            // 상대에게 해당 멤버가 채팅방에 접속했다고 알려주기(재랜더링 목적)
            System.out.println("채팅방 접속 유무에 대한 접속 정보 알려주기");
            System.out.println(message.getOpponentId());
            messagingTemplate.convertAndSend("/sub/chat/renew/"+message.getOpponentId(),new ChatStatusResponse(ResponseCode.OK));

        } else if (message.getType() == ConnectionStatusMessage.Type.DISCONNECT) {
            redisService.removeChatUser(message.getMemberId());
        }
    }

modifyConnectionStatus 메서드에서 CONNECT, DISCONNECT의 타입에 따라 채팅방 접속 정보를 갱신하게 됩니다.


긴 내용 포스팅을 하였는데 읽으시느라 고생 많으셨습니다. 다음에는 더 유익한 내용으로 뵙겠습니다.

profile
TO BE DEVELOPER

1개의 댓글

comment-user-thumbnail
2025년 1월 13일

좋은 글 감사합니다:)

답글 달기

관련 채용 정보