채팅 기능은 기본적으로
A 클라이언트 가 메시지를 보내면
서버가 메시지를 받고
B 클라이언트로 A 클라이언트로 부터 전달 받은 메시지를 전송하는 과정이라고 할 수 있다.
서버가 클라이언트로부터 메시지를 받는 것은 어렵지 않다. 지금까지 HTTP
를 이용하여 통신하였던 것이 서버가 클라이언트로부터 메시지를 받는 것이었기 때문이다.
문제는 서버에서 클라이언트로 어떻게 메시지를 보낼 수 있느냐 이다.
A 클라이언트로 부터 받은 메시지를 서버가 과연 어떻게 B에게 메시지를 전달할 수 있을까?
HTTP
를 이용하는 방법으로 가장 직관적으로 생각나는 구조이다.
채팅방으로 예를 들면
클라이언트 B가 내가 참여한 채팅방에 새로운 메시지가 있는지 주기적으로 서버에게 물어보는 방식이다.
이 방식은 직관적이지만
몇 가지 단점이 있다.
1) 대부분의 경우 Response
에 아무것도 없을 가능성이 높으나 지속적으로 클라이언트에서 Request
를 보내면서 서버의 비용을 올린다.
(스프링 서버에서 http 1.1 을 사용할 때 keep - alive 가 기본적으로 활성화 되어있기 때문에 모든 요청마다 TCP 연결을 수립한다고 말하기 부담스럽다. 이 점에 대해서는 알아봐야 한다)
2) 메시지 Latency: Request 의 간격이 1초라고 한다면 클라이언트는 최대 1초가 지난 시점에야 메시지가 있음을 알 수 있다.
클라이언트가 Request
를 보내도 서버가 응답에 대한 사용 가능한 값이 없다면 대기한다. 그러다가 대기하는 기간동안 데이터가 준비되지 못한다면 timeout 응답을 보내고 만약 대기하는 기간 동안 값이 준비되면 즉시 데이터를 클라이언트에게 전달한다.
이 역시 Polling 방식과 유사한 단점을 가진다.
1) 불필요한 Request 수 (Polling 보다는 적다, polling 보다는 서버의 부담이 적다)
2) 지속적으로 연결되어 있기 때문에 클라이언트가 많아지면 서버 부담이 늘어난다.
클라이언트와 서버가 양방향 통신이 아닌
서버에서 클라이언트로 단방향 통신으로
채팅 기능에는 부적합하다.
1)HTTP
를 통해 메시지를 보내고 그 메시지에 대한 응답으로 이벤트 처리를 하기에 서버 클라이언트 간 통신이 어려워 짐
기존의 단방향 프로토콜과 호환되어 양방향 통신을 제공하기 위한 프로토콜이다.
연결 수립시 클라이언트와 서버 모두 자유롭게 데이터를 보낼 수 있음
HTTP 를 이용하여 연결을 수립하여 연결이 된 이후에도 연결을 할 때 사용했던 80 포트와 443 포트를 사용한다. 따라서 추가로 방화벽을 열지 않아도 된다.(포트 관련 처리를 하지 않아도 된다)
한 번의 HTTP 요청과 한번의 HTTP 응답으로 이루어 짐
HandShake 가 끝나면 HTTP 프로토콜을 webSocket 프로토콜로 변환하여 통신
서버와 클라이언트의 핸드셰이크 요청과 응답은 Http 1.1 을 사용한다.
HTTP 의 단점 중 하나는 HTTP 는 매번 연결을 새로 열기 때문에 비용이 크다.
하지만 WebSocket Connection 을 맺어 두면 한번의 연결로 여러 데이터를 실시간을 전달할 수 있다.
따라서WebSocket
을 이용하여 간단한 채팅 기능을 구현하여 본다.
@Configuration
@EnableWebSocket
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketConfigurer {
private final WebSocketChatHandler webSocketHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketHandler, "/ws/chat").setAllowedOrigins("*");
}
}
TextWebSocketHandler
클래스를 상속받아 WebSocket 세션의 연결 혹은 연결 이후 처리 등을 수행할 수 있다.
사용자가 입력한 채팅을 가지고 가공하여 처리하는 메서드인
handlerTextMessage
도 제공하여 준다.
@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketChatHandler extends TextWebSocketHandler {
private final ChatService chatService;
private final ObjectMapper objectMapper;
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
chatService.sendMessageWithHandlerRoom(session,objectMapper.readValue(message.getPayload(),
ChatWebSocketDto.class));
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// TODO Auto-generated method stub
log.info("{} 연결됨", session.getId());
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
// TODO Auto-generated method stub
log.info("{} 연결 끊김", session.getId());
chatService.removeSession(session);
}
}
@Service
@RequiredArgsConstructor
@Transactional(readOnly = true)
@Slf4j
public class ChatService {
private final ChattingRoomRepository chattingRoomRepository;
private final MessageRepository messageRepository;
private final MemberRepository memberRepository;
private final Map<Long, Set<WebSocketSession>> chatRoomSessionMap = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper;
@Transactional
public void sendMessageWithHandlerRoom(WebSocketSession webSocketSession, ChatWebSocketDto chatWebSocketDto){
ChattingRoom chattingRoom = getChattingRoomByParticipant(chatWebSocketDto);
Member sender = memberRepository.findById(chatWebSocketDto.getSenderId()).orElseThrow(()->new NotFoundException("없음"));
if(!chatRoomSessionMap.containsKey(chattingRoom.getId())){
chatRoomSessionMap.put(chattingRoom.getId(),new HashSet<>());
}
Set<WebSocketSession> webSocketSessions = chatRoomSessionMap.get(chattingRoom.getId());
if(chatWebSocketDto.getMessageType().equals(MessageType.ENTER)){
webSocketSession.getAttributes().put("roomId",chattingRoom.getId());
webSocketSessions.add(webSocketSession);
chatRoomSessionMap.put(chattingRoom.getId(),webSocketSessions);
chatWebSocketDto.changeMessageToEnter(sender.getName());
}
sendMessage(chatWebSocketDto,webSocketSessions,chattingRoom,sender);
}
public void removeSession(WebSocketSession webSocketSession){
chatRoomSessionMap.get(webSocketSession.getAttributes().get("roomId")).remove(webSocketSession);
}
private Member findMemberById(Long memberId){
return memberRepository.findById(memberId).orElseThrow(()->new NotFoundException("유저 없음"));
}
//DB에서 chatting room 을 찾는다. 만약에 없다면 새로운 채팅방을 생성한다.
private ChattingRoom getChattingRoomByParticipant(ChatWebSocketDto chatWebSocketDto){
return chattingRoomRepository.findByParticipant(chatWebSocketDto.getReceiverId(),chatWebSocketDto.getSenderId()).orElseGet(
()-> {
ChattingRoom newChattingRoom = ChattingRoom.builder()
.sender(findMemberById(chatWebSocketDto.getSenderId()))
.receiver(findMemberById(chatWebSocketDto.getReceiverId()))
.build();
chattingRoomRepository.save(newChattingRoom);
return newChattingRoom;
}
);
}
public <T> void sendMessage(ChatWebSocketDto message,Set<WebSocketSession> webSocketSessions,ChattingRoom chattingRoom,Member sender) {
webSocketSessions.parallelStream().forEach(session -> {
try {
session.sendMessage(new TextMessage(objectMapper.writeValueAsString(message)));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
messageRepository.save(Message.builder()
.sender(sender)
.chattingRoom(chattingRoom)
.contents(message.getMessage())
.build());
}
}
각 부분에 대한 설명
private final Map<Long, Set<WebSocketSession>>
chatRoomSessionMap = new ConcurrentHashMap<>();
사용자의 연결된 session을 따로 담아서 관리할 필요가 있다.
이렇게 세션을 메모리에 담아 둠으로써 메시지를 보내거나 혹은 특정 사용자가 특정 방에 입장하였는지 등 다양한 분야로 응용이 가능하다.
지금은 메시지를 보내기 위해서 session 을 따로 담아서 관리한다.
Map 에 Session 을 담아서 관리를 하는데
map 의 key 에는 채팅방의 아이디 값을 key 값으로 두고
Value 에는 그 채팅방에 속한 Session을 Set 자료구조로 관리한다.
Map 에서 ConcurrentHashMap 을 구현체로 사용한 이유는 동시성 이슈를 방지하기 위해서이다.
if(chatWebSocketDto.getMessageType().equals(MessageType.ENTER)){
webSocketSession.getAttributes().put("roomId",chattingRoom.getId());
webSocketSessions.add(webSocketSession);
chatRoomSessionMap.put(chattingRoom.getId(),webSocketSessions);
chatWebSocketDto.changeMessageToEnter(sender.getName());
}
메시지의 타입이 ENTER 인 경우
session의 속성에 roomId 라는 key 에 대한 값으로 채팅방의 아이디값을 저장한다.
이를 통해 채팅방에 동일 채팅방에 속한 Session
들에 메시지를 보낼 수 있다.
public <T> void sendMessage(ChatWebSocketDto message,Set<WebSocketSession> webSocketSessions,ChattingRoom chattingRoom,Member sender) {
webSocketSessions.parallelStream().forEach(session -> {
try {
session.sendMessage(new TextMessage(objectMapper.writeValueAsString(message)));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
messageRepository.save(Message.builder()
.sender(sender)
.chattingRoom(chattingRoom)
.contents(message.getMessage())
.build());
}
메시지를 동일 채팅방에 속한 Session
들에게 직접 전송하는 코드이다.
속도 단축 및 병렬 처리를 위해 parallelStream 을 이용할 수 있다.
사용자들의 Session 관리를 직접 수행해야 메시지를 전송할 수 있으므로 복잡한 측면이 존재한다.
메시지를 보내는 등 어떤 동작을 수행할 때
WebSocket 만을 이용하면 따로 헤더나 메시지 본문 등 형식이 지정되어 있지 않다.
Stomp 프로토콜은
TransportLayer 에서 동작하는 WebSocket 과 달리
Application Layer 에서 동작하며
데이터 형식을 지정 하고 편리한 /pub, /sub 구조를 통해서 메시지를 전달 할 수 있다.
"SEND\n" +
"destination:/pub/message/${__threadNum}\n" +
"content-length:20\n" + // 메시지 본문의 정확한 바이트 길이
"\n" + // 헤더와 본문 사이의 빈 줄
"{\"message\":\"hihihp\"}\u0000"; // 실제 메시지 본문
위와 같이 메시지 형식이 지정되어 있다.
이렇게 형식이 지정되어 있기 때문에
header
에 acccessToken
을 저장하고
인증하는 방식을 통해서
스프링 시큐리티와 연계할 수도 있다.
최근 프로젝트 코드를 가져와 보면
messagingService.sendMessage("/sub/chat/room/"+roomId,
MessageResponseDto.ofPayLoad
(sender.getId(),chatDto,messageCreatedTime,readCount,roomId));
@Service
@RequiredArgsConstructor
public class MessagingService {
private final SimpMessageSendingOperations simpMessageSendingOperations;
public void sendMessage(final String destination, final MessageResponseDto messageResponseDto){
simpMessageSendingOperations.convertAndSend(destination,messageResponseDto);
}
}
/pub,/sub 구조를 통해서 Session 을 직접 관리하지 않아도 해당 주제를 구독한 client 세션으로 채팅 메시지를 보낼 수 있다.
웹소켓 만으로 구현한 것에 비해 무거울 수 있다.
블로그 좋아요!