이번에 진행하는 프로젝트에서 채팅을 구현할 일이 생겨서 채팅 연습용 프로젝트를 하나 만들어가면서 실시간으로 메세지를 주고받는 서버를 만들어보고 Postman을 통하여 테스트 해보겠습니다.
!!! 데이터베이스에 데이터 읽기/쓰기는 복잡해서 구현하지 않습니다. !!!
Redis의 Pub/Sub 기능을 제외하고는 데이터베이스의 기능을 사용하지 않습니다.
Application 안에서의 로직을 통해 데이터를 주고받는데에 초점을 뒀습니다.
진행 플로우
1. WebSocket 연결로 실시간 메세지 통신
2. 단일 서버에서의 채팅 구현 >>단일서버 레포 클릭<<
3. 분산 서버에서의 채팅 구현 >>분산서버 레포 클릭<<
개발환경
Spring 프로젝트 생성 URL: https://start.spring.io/
요구사항
- Stomp 사용 X
- 유저 인증
- 분산서버를 고려
- 구독을 동적으로 여러채널을 구독가능
들어가기에 앞서 왜 저런 요구사항이 나왔는지
Stomp 테스트툴을 찾기가 어려웠고 APIC 라는 툴을 통해 Stopm를 테스트 해볼수 있다고 했지만 현재 다운로드가 막혀서 테스트가 불가능하였습니다.
기존 프로젝트가 스케일 아웃의 형태로 분산되어있는 형태의 서버에서도 통신이 가능하게끔 설계하였습니다.
채팅리스트 화면에서 여러 채널을 한번에 구독하고 마지막 메세지를 기반으로 채팅방이 정렬되게끔 설계를 하였기때문에 다중구독 및 동적으로 구독을 하는 방향으로 설계하였습니다.
간단하게 일반적으로 HTTP프로콜의 메세지 전달방식은 Stateless의 형태이기 때문에 메세지를 주고받으면서 커넥션이 끊어지게 되는데(실제로는 한번의 메세지에 끊어지진 않지만 간단하게 설명하겠습니다.) WS(WebSocket)은 HTTP 통신 후 WS으로 업데이트를 하여서 커넥션이 끊어지지 않고 패킷을 계속해서 주고 받을수있는 Stateful형태의 프로토콜 입니다.
Remote DIctionary Server의 약자로 Key:Value 형태의 데이터 저장소이며 데이터가 메모리에 올려져 저장되어서 디스크에 저장되는 DB보다 IO작업이 빠르며 흔히 캐시DB로 자주 사용되게 됩니다.
Redis에서 제공하는 Pub/Sub 기능을 이용하여 분산서버에서의 채팅이 가능하게끔 할것인데 그래서 Pub/Sub이 뭐냐
Pub/Sub
Redis의 Pub/Sub(게시/구독)은 메시지 브로커 패턴 통해, 메시지를 게시하는 클라이언트와 해당 메시지를 수신하는 클라이언트 간의 통신을 지원하는데 이 기능을 사용하여 메시지를 비동기적으로 통신할수 있습니다.
흔히 신문구독을 떠올리면 편합니다.
신문사(Pub) -> 신문배달HUB(Broker) -> 신문구독자(Sub)
Handler 작성
@Component
@Log4j2
@RequiredArgsConstructor
public class WebSocketChatHandler extends TextWebSocketHandler {
/**
* 웹소켓 연결 성공시
* @param session
*/
@Override
public void afterConnectionEstablished(WebSocketSession session){
try {
session.sendMessage(
new TextMessage("웹소켓 연결 성공"));
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
WebSocketConfig 웹소켓 설정
@Configuration
@EnableWebSocket
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketConfigurer {
private final WebSocketChatHandler webSocketChatHandler;
/**
* 웹소켓 연결을 위한 설정
* 웹소켓 연결 EndPoint: ws://localhost:8080/chats
* 에 연결시 동작할 핸들러는 webSocketChatHandler
* @param registry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketChatHandler, "/chats")
.setAllowedOrigins("*");
}
}
이제 포스트맨을 통해 테스트 해보겠습니다. WebSocket Requst를 하나 생성합니다.
설정해둔 ws://localhost:8080/chats 으로 연결을 하고나면 웹소켓 연결 성공이라는 메세지를 받아본걸 확인이 가능합니다.
유저 인증 HandShakeInterceptor
@Component
@RequiredArgsConstructor
public class WebSocketAuthInterceptor implements HandshakeInterceptor {
/**
* 웹소켓 연결 전 인터셉터
* Authorization 헤더를 확인하여 유저를 인증한다.
* 유저 명이 채팅유저로 시작하지 않으면 401을 반환한다.
* 유저 명이 채팅유저로 시작해서 인증된 유저라면 session에 username을 저장한다.
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
HttpServletResponse servletResponse = ((ServletServerHttpResponse) response).getServletResponse();
String username = servletRequest.getHeader("Authorization");
if (username != null && username.startsWith("chatUser")) {
attributes.put("username", username);
return true;
} else {
servletResponse.setStatus(401);
return false;
}
}
return false;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception) {
}
}
저는 chatUser로 시작하지 않으면 권한없음으로 반려하였는데
프로젝트 요구사항에 따른 인증 로직을 넣으시면 될것 같습니다
예) JWT 토큰기반, 세션기반 등등
WebSocketAuthInterceptor을 WebSocketConfig에 주입
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketChatHandler, "/chats")
.addInterceptors(webSocketAuthInterceptor)
.setAllowedOrigins("*");
}
Postman 테스트
헤더에 Authorization이 Null이거나 chatUser로 시작 안할시 401 StatusCode를 반환받는것을 확인
이로써 웹소켓 연결과 유저인증 Interceptor를 작성하였고 곧바로 단일 채팅으로 넘어가보도록 하겠습니다.
메세지 프로토콜
/**
* 웹소켓 메시지 타입
*/
public enum WebSocketMessageType {
ENTER("ENTER"),
JOIN("JOIN"),
TALK("TALK"),
EXIT("EXIT"),
SUB("SUBSCRIBE"),
PUB("PUBLISH");
private final String type;
WebSocketMessageType(String type) {
this.type = type;
}
public String getTypeValue() {
return this.type;
}
}
/**
* 웹소켓 메시지 프로토콜
*/
@RequiredArgsConstructor
public class WebSocketMessage <T> {
private final WebSocketMessageType type;
private final T payload;
public static <T> WebSocketMessage<T> of(WebSocketMessageType type, T payload) {
return new WebSocketMessage<>(type, payload);
}
}
ChatDto
@Getter
@RequiredArgsConstructor
@AllArgsConstructor
public class ChatDto {
private final Long chatRoomId;
private final String username;
private String message;
public void setMessage(String message){
this.message = message;
}
}
ChatRoom(socket.io의 Room 개념)
@Log4j2
@Getter
@RequiredArgsConstructor
public class ChatRoom {
private final Map<String, WebSocketSession> ActiveUserMap = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper;
/**
* 채팅방 입장
* @param chatDto ChatDto
* @param session 웹소켓 세션
*/
public void enter(ChatDto chatDto, WebSocketSession session) {
String username = (String) session.getAttributes().get("username");
ActiveUserMap.put(username, session);
for(Map.Entry<String, WebSocketSession> entry : ActiveUserMap.entrySet()) {
try {
if (!entry.getKey().equals(username))
entry.getValue().sendMessage(getTextMessage(WebSocketMessageType.ENTER, chatDto));
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
/**
* 채팅방 퇴장
* @param chatDto ChatDto
*/
public void exit(String username, ChatDto chatDto) {
ActiveUserMap.remove(chatDto.getUsername());
for(Map.Entry<String, WebSocketSession> entry : ActiveUserMap.entrySet()) {
try {
if (!entry.getKey().equals(username))
entry.getValue().sendMessage(getTextMessage(WebSocketMessageType.EXIT, chatDto));
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
/**
* 메시지 전송
* @param chatDto ChatDto
*/
public void sendMessage(String username, ChatDto chatDto) {
for(Map.Entry<String, WebSocketSession> entry : ActiveUserMap.entrySet()) {
try {
if (!entry.getKey().equals(username))
entry.getValue().sendMessage(getTextMessage(WebSocketMessageType.TALK, chatDto));
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
/**
* 메시지 전송
* @param type 메시지 타입
* @param chatDto ChatDto
* @return TextMessage
*/
private TextMessage getTextMessage(WebSocketMessageType type, ChatDto chatDto) {
try {
return new TextMessage(
objectMapper.writeValueAsString(
new WebSocketMessage(type, chatDto)
));
}catch (JsonProcessingException e) {
log.error(e.getMessage());
throw new RuntimeException(e);
}
}
}
바뀐 WebSocketHandler
@Component
@Log4j2
@RequiredArgsConstructor
public class WebSocketChatHandler extends TextWebSocketHandler {
private final Map<Long, ChatRoom> chatRoomMap = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws JsonProcessingException {
String username = (String) session.getAttributes().get("username");
WebSocketMessage webSocketMessage = (WebSocketMessage) objectMapper.readValue(message.getPayload(), WebSocketMessage.class);
switch (webSocketMessage.getType().getValue()) {
case "ENTER" -> enterChatRoom(webSocketMessage.getPayload(), session);
case "TALK" -> sendMessage(username, webSocketMessage.getPayload());
case "EXIT" -> exitChatRoom(username, webSocketMessage.getPayload());
}
}
/**
* 메시지 전송
* @param chatDto ChatDto
*/
private void sendMessage(String username, ChatDto chatDto) {
log.info("send chatDto : " + chatDto.toString());
ChatRoom chatRoom = chatRoomMap.get(chatDto.getChatRoomId());
chatRoom.sendMessage(username, chatDto);
}
/**
* 채팅방 입장
* @param chatDto ChatDto
* @param session 웹소켓 세션
*/
private void enterChatRoom(ChatDto chatDto, WebSocketSession session) {
log.info("enter chatDto : " + chatDto.toString());
chatDto.setMessage(chatDto.getUsername() + "님이 입장하셨습니다.");
ChatRoom chatRoom = chatRoomMap.getOrDefault(chatDto.getChatRoomId(), new ChatRoom(objectMapper));
chatRoom.enter(chatDto, session);
chatRoomMap.put(chatDto.getChatRoomId(), chatRoom);
}
/**
* 채팅방 퇴장
* @param chatDto ChatDto
*/
private void exitChatRoom(String username, ChatDto chatDto) {
log.info("exit chatDto : " + chatDto.toString());
chatDto.setMessage(chatDto.getUsername() + "님이 퇴장하셨습니다.");
ChatRoom chatRoom = chatRoomMap.get(chatDto.getChatRoomId());
chatRoom.exit(username, chatDto);
if(chatRoom.getActiveUserMap().isEmpty()){
chatRoomMap.remove(chatDto.getChatRoomId());
}
}
}
(... gif 파일인데 조금 느립니다. 직접 타자를 치느라)
결과적으로 위 코드들을 조금 요약하자면
- ENTER 타입의 경우 특정 채팅방에 입장하게 되고 해당 유저의 Session을 저장하게 됩니다.
2.TALK 타입의 경우 해당 방에 참여중인 유저들의 Session을 가져와 반복문을 돌며 참여중인 유저들에게 채팅을 보냅니다.
3.EXIT 타입의 경우 커넥션을 끊고 참여중인 방의 Session을 지움으로써 더이상 메세지를 받지 않게됩니다.
Redis Pub/Sub을 이용한 채팅서버는 조금더 구현이 간단해지는데요
RedisConfig, RedisService, RedisMessageHandler가 추가되고 ChatRoom과 WebSocketHandler의 로직이 조금 변경되었습니다.
RedisConfig
@Configuration
public class RedisConfig {
/**
* 메시지를 수신하고 처리할 수 있게 해주는 컨테이너
* @param connectionFactory RedisConnectionFactory
* @return RedisMessageListenerContainer
*/
@Bean
public RedisMessageListenerContainer redisMessageListener(
RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
/**
* Redis에 대한 기본적인 연결과 통신을 담당하는 클래스
* @param redisConnectionFactory RedisConnectionFactory
* @return StringRedisTemplate
*/
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
return new StringRedisTemplate(redisConnectionFactory);
}
}
RedisServiceImpl
@Service
@RequiredArgsConstructor
public class RedisServiceImpl {
private final StringRedisTemplate stringRedisTemplate;
/**
* 메시지 발행
* @param channel 채널
* @param message 메시지
*/
public void publish(String channel, String message) {
stringRedisTemplate.convertAndSend(channel, message);
}
/**
* 메시지 구독
* @param channel 채널
* @param session WebSocketSession
*/
public void subscribe(String channel, WebSocketSession session) {
Objects.requireNonNull(stringRedisTemplate.getConnectionFactory())
.getConnection()
.subscribe(getMessageHandler(session), channel.getBytes());
}
/**
* 여러 채널 메시지 구독
* @param channel 채널
* @param session WebSocketSession
*/
public void subscribe(String[] channel, WebSocketSession session) {
for (String c : channel){
Objects.requireNonNull(stringRedisTemplate.getConnectionFactory())
.getConnection()
.subscribe(getMessageHandler(session), c.getBytes());
}
}
/**
* 메세지 핸들러 생성
* @param session WebSocketSession
*/
private RedisMessageHandler getMessageHandler(WebSocketSession session) {
return new RedisMessageHandler(session);
}
}
RedisMessageHandler
@Log4j2
@RequiredArgsConstructor
class RedisMessageHandler implements MessageListener {
private final WebSocketSession session;
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* Redis 메시지 수신
* @param message 메시지
* @param pattern 패턴
*/
@Override
public void onMessage(Message message, byte[] pattern) {
try {
WebSocketMessage webSocketMessage = objectMapper.readValue(message.getBody(), WebSocketMessage.class);
if(session.isOpen() && !webSocketMessage.getPayload().getUsername().equals(session.getAttributes().get("username"))){
session.sendMessage(new TextMessage(new String(message.getBody())));
}
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
변경된 ChatRoom
@Log4j2
@Getter
@Component
@RequiredArgsConstructor
public class ChatRoom {
private final ObjectMapper objectMapper = new ObjectMapper();
private final RedisServiceImpl redisService;
/**
* 채팅방 입장
* @param chatDto ChatDto
* @param session 웹소켓 세션
*/
public void enter(ChatDto chatDto, WebSocketSession session) {
String username = (String) session.getAttributes().get("username");
String channel = "chatRoom:"+chatDto.getChatRoomId();
redisService.subscribe(channel, session);
chatDto.setMessage(username + "님이 입장하셨습니다.");
redisService.publish(channel, getTextMessage(WebSocketMessageType.ENTER, chatDto));
}
/**
* 메시지 전송
* @param chatDto ChatDto
*/
public void sendMessage(ChatDto chatDto) {
String channel = "chatRoom:"+chatDto.getChatRoomId();
redisService.publish(channel, getTextMessage(WebSocketMessageType.TALK, chatDto));
}
/**
* 메시지 전송
* @param type 메시지 타입
* @param chatDto ChatDto
* @return String
*/
private String getTextMessage(WebSocketMessageType type, ChatDto chatDto) {
try {
return objectMapper.writeValueAsString(new WebSocketMessage(type, chatDto));
}catch (JsonProcessingException e) {
log.error(e.getMessage());
throw new RuntimeException(e);
}
}
}
변경된 WebSocketHandler
@Component
@Log4j2
@RequiredArgsConstructor
public class WebSocketChatHandler extends TextWebSocketHandler {
private final ChatRoom chatRoom;
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws JsonProcessingException {
String username = (String) session.getAttributes().get("username");
WebSocketMessage webSocketMessage = (WebSocketMessage) objectMapper.readValue(message.getPayload(), WebSocketMessage.class);
switch (webSocketMessage.getType().getValue()) {
case "ENTER" -> enterChatRoom(webSocketMessage.getPayload(), session);
case "TALK" -> sendMessage(username, webSocketMessage.getPayload());
}
}
/**
* 메시지 전송
* @param chatDto ChatDto
*/
private void sendMessage(String username, ChatDto chatDto) {
log.info("send chatDto : " + chatDto.toString());
chatRoom.sendMessage(chatDto);
}
/**
* 채팅방 입장
* @param chatDto ChatDto
* @param session 웹소켓 세션
*/
private void enterChatRoom(ChatDto chatDto, WebSocketSession session) {
log.info("enter chatDto : " + chatDto.toString());
chatRoom.enter(chatDto, session);
}
}
기존의 단일채팅서버에서는 서버내의 객체가 채팅에 참여중인 Session을 가지고 메세지를 전달해주었다면 분산서버에서는 하나의 Hub(Redis Broker)에 전달해서 구독하고 있는 구독자가 해당 메세지를 소비하는 형식으로 진행되었습니다.
연습용 레포이다 보니 클래스 네이밍이나 생성비용이 비싼 ObjectMapper를 객체에서 여러번 생성하게끔 하는 로직이 있는데 감안하고 봐주시면 좋을것 같습니다.
너무 코드만 늘어뜨려논것 같아서 조금 아쉽지만 코드 주석에 어느정도 설명을 넣었으니 궁금한거나 피드백은 댓글로 달아주시면 좋을것 같습니다.
단일서버 레포: >>단일서버 레포 클릭<<
분산서버 레포: >>분산서버 레포 클릭<<
단일서버는 스프링부트와 자바 버전이 맞다면 클론받아서 Postman으로 바로 테스트 해보실수 있습니다.
분산서버는 Redis가 컴퓨터에 깔려있거나 Docker를 이용해서 Redis를 실행시키면 클론받아서 Postman으로 바로 테스트 해보실수 있습니다.
안녕하세요! 스프링 웹 소켓에 대해서 공부 중인 학생입니다. 제가 아직 웹 소켓에 대한 지식이 깊지 않아서 질문드리고 싶은 부분이 있습니다. 분산 채팅 서버를 구성할 경우, 로드밸런서가 트래픽을 분산할 텐데, 만약에 로드밸런싱 방식이 라운드 로빈일 경우 클라이언트와 소켓 연결이 되지 않은 채팅 서버로 메시지가 전송될 수 있습니다. 그렇게 되면 채팅 서버에서는 웹 소켓 연결이 되지 않은 클라이언트에서 전송한 메시지를 처리하지 못하게 될 것 같습니다. 그렇다면 분산 채팅 서버에서 로드밸런싱 방식으로 IP Hash 방식만 채택할 수 있나요?
좋은 글 너무 잘봤습니다. 종호님의 글을 참고하여 제 블로그를 작성해보고싶은데 작성해도 괜찮을까요? 출처표기하겠습니다!