[SpringBoot] Websocket을 활용한 실시간 채팅 구현 (3) - Redis

gyehwan·2023년 7월 10일
0

Websocket

목록 보기
3/3

이 전의 포스팅에서 STOMP를 활용해서 실시간 채팅의 고도화 작업을 했다.Websocket과 STOMP를 이용한 구현만으로도 채팅의 기본 기능은 충분히 구현 가능하지만 몇 가지 문제가 있다.

서버를 재시작할 때 마다 채팅방 정보 리셋

채팅방의 메인 저장소가 없으므로 서버의 메모리에 적재된 채팅방은 서버를 재시작할 때마다 초기화되는 이슈가 있다. DB를 이용하거나 다른 저장소를 이용하여 채팅방이 계속 유지되도록 해야한다. 이번 포스팅에서는 Redis를 이용해서 이 문제를 해결해보겠다.

채팅 서버가 여러 개이면 서버 간 채팅방 공유 불가

이전 포스팅에서 Websocket과 STOMP pub/sub을 이용하여 구현한 구조는 pub/sub이 발생한 서버 내에서만 메세지를 주고 받는 것이 가능하다. 즉 채팅방(topic)이 생성된 서버 안에서만 유효하므로 다른 서버로 접속한 클라이언트는 해당 채팅방을 볼 수 없다. 따라서 한 채팅방에 여러 서버에서 접근할 수 있도록 개선해야한다. 이를 해결하려면 공통으로 사용할 수 있는 pub/sub 시스템을 구축하고 모든 서버들이 해당 시스템을 통하여 pub/sub메세지를 주고받도록 변경해야 한다.
Redis는 pub/sub을 지원하고 있으며 공통 pub/sub 채널로 이용하기 알맞다. Redis를 활용하여 서로 다른 서버에 접속해 있는 클라이언트가 채팅방을 통해 다른 서버의 클라이언트와 메세지를 주고받을 수 있도록 구현해보자.

Redis pub/sub을 이용한 채팅 고도화

build.gradle

dependencies에 다음 라이브러리를 추가한다. local에서는 Redis 설치없이 간단하게 Embedded Redis를 사용하여 환경을 구축해보겠다.

	implementation 'org.springframework.boot:spring-boot-starter-data-redis'
	compileOnly group: 'it.ozimov', name: 'embedded-redis', version: '0.7.2'

Embedded Redis 서버 사용을 위한 설정

채팅 서버가 실행될 때 Embedded Redis 서버도 동시에 실행될 수 있도록 설정을 추가한다. local 환경에서만 실행되도록 @Profile("local")을 상단에 선언한다.

EmbeddedRedisconfig

package com.websocket.chat.config;
// import 생략...

/**
 * 로컬 환경일경우 내장 레디스가 실행됩니다.
 */
@Profile("local")
@Configuration
public class EmbeddedRedisConfig {

    @Value("${spring.redis.port}")
    private int redisPort;

    private RedisServer redisServer;

    @PostConstruct
    public void redisServer() {
        redisServer = new RedisServer(redisPort);
        redisServer.start();
    }

    @PreDestroy
    public void stopRedis() {
        if (redisServer != null) {
            redisServer.stop();
        }
    }
}

Redis 설정

Redis의 pub/sub 기능을 이용할 것이므로 MessageListener 설정을 추가한다. 애플리케이션에서 Redis 사용을 위해 redisTemplate 설정도 추가한다.

RedisConfig

@Configuration
public class RedisConfig {

    /**
     * redis pub/sub 메시지를 처리하는 listener 설정
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListener(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }

    /**
     * 어플리케이션에서 사용할 redisTemplate 설정
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
        return redisTemplate;
    }
}

환경 설정 수정 및 추가

application.yml

기본 프로파일을 local로 설정한다.

spring:
  profiles:
    active: local

application-local.yml

local 환경 설정 파일을 추가한다. Embedded Redis는 local에 실행되므로 아래와 같이 추가해준다.

spring:
  profiles:
    active: local
  redis:
    host: localhost
    port: 6379

application-alpha.yml

alpha 서버용 환경 설정 파일이다. 로컬에서만 테스트할 것이라면 필요없지만, 다른 환경의 테스트가 필요할 경우 세팅해준다. Redis 정보는 서버 환경에 설치된 Redis 정보로 대체한다.

spring:
  profiles:
    active: alpha
  redis:
    host: redis가 설치된 서버 호스트
    port: redis가 설치된 서버 포트

Redis 발행/구독 모델 구현을 위한 서비스 생성

Redis에는 공통 주제(Topic)에 대하여 구독자(subscriber)에게 메세지를 발행(publish)할 수 있는 기능이 있다. Spring에서는 redis topic에 대하여 구독 및 발행을 처리할 수 있도록 다음과 같이 방법을 제공하고 있다.

Redis 발행 서비스 구현

채팅방에 입장하여 메세지를 작성하면 해당 메세지를 Redis Topic에 발행하는 기능의 서비스이다. 이 서비스를 통해 메세지를 발행하면 대기하고 있던 Redis 구독 서비스가 메세지를 처리한다.

@RequiredArgsConstructor
@Service
public class RedisPublisher {
    private final RedisTemplate<String, Object> redisTemplate;

    public void publish(ChannelTopic topic, ChatMessage message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

Redis 구독 서비스 구현

Redis에 메세지 발행이 될 때까지 대기하였다가 메세지가 발행되면 해당 메세지를 읽어 처리하는 리스너이다. MessageListener를 상속받아 onMessage 메소드를 작성한다. Redis에 메세지가 발행되면 해당 메세지를 ChatMessage로 변환하고 messaging Template을 이용하여 채팅방의 모든 websocket 클라이언트들에게 메세지를 전달하도록 구현되어 있다.

@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {

    private final ObjectMapper objectMapper;
    private final RedisTemplate redisTemplate;
    private final SimpMessageSendingOperations messagingTemplate;

    /**
     * Redis에서 메시지가 발행(publish)되면 대기하고 있던 onMessage가 해당 메시지를 받아 처리한다.
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            // redis에서 발행된 데이터를 받아 deserialize
            String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
            // ChatMessage 객채로 맵핑
            ChatMessage roomMessage = objectMapper.readValue(publishMessage, ChatMessage.class);
            // Websocket 구독자에게 채팅 메시지 Send
            messagingTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), roomMessage);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
}

ChatController 수정

클라이언트가 채팅방 입장시 채팅방에서 대화가 가능하도록 리스너를 연동하는 enterChatRoom 메서드를 세팅한다. 채팅방에 발행된 메세지는 서로 다른 서버에 공유하기 위해 redis의 Topic으로 발행한다.

@RequiredArgsConstructor
@Controller
public class ChatController {

    private final RedisPublisher redisPublisher;
    private final ChatRoomRepository chatRoomRepository;

    /**
     * websocket "/pub/chat/message"로 들어오는 메시징을 처리한다.
     */
    @MessageMapping("/chat/message")
    public void message(ChatMessage message) {
        if (ChatMessage.MessageType.ENTER.equals(message.getType())) {
            chatRoomRepository.enterChatRoom(message.getRoomId());
            message.setMessage(message.getSender() + "님이 입장하셨습니다.");
        }
        // Websocket에 발행된 메시지를 redis로 발행한다(publish)
        redisPublisher.publish(chatRoomRepository.getTopic(message.getRoomId()), message);
    }
}

ChatRoomRepository 수정

채팅방 정보는 초기화 되지 않도록 생성 시 Redis Hash에 저장하도록 처리한다. 채팅방 정보를 조회할 때는 Redis Hash에 저장된 데이터를 불러오도록 메소드 내용을 수정한다. 채팅방 입장 시에는 채팅방 id로 Redis topic을 조회하여 pub/sub 메세지 리스너와 연동한다.

@RequiredArgsConstructor
@Repository
public class ChatRoomRepository {
    // 채팅방(topic)에 발행되는 메시지를 처리할 Listner
    private final RedisMessageListenerContainer redisMessageListener;
    // 구독 처리 서비스
    private final RedisSubscriber redisSubscriber;
    // Redis
    private static final String CHAT_ROOMS = "CHAT_ROOM";
    private final RedisTemplate<String, Object> redisTemplate;
    private HashOperations<String, String, ChatRoom> opsHashChatRoom;
    // 채팅방의 대화 메시지를 발행하기 위한 redis topic 정보. 서버별로 채팅방에 매치되는 topic정보를 Map에 넣어 roomId로 찾을수 있도록 한다.
    private Map<String, ChannelTopic> topics;

    @PostConstruct
    private void init() {
        opsHashChatRoom = redisTemplate.opsForHash();
        topics = new HashMap<>();
    }

    public List<ChatRoom> findAllRoom() {
        return opsHashChatRoom.values(CHAT_ROOMS);
    }

    public ChatRoom findRoomById(String id) {
        return opsHashChatRoom.get(CHAT_ROOMS, id);
    }

    /**
     * 채팅방 생성 : 서버간 채팅방 공유를 위해 redis hash에 저장한다.
     */
    public ChatRoom createChatRoom(String name) {
        ChatRoom chatRoom = ChatRoom.create(name);
        opsHashChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
        return chatRoom;
    }

    /**
     * 채팅방 입장 : redis에 topic을 만들고 pub/sub 통신을 하기 위해 리스너를 설정한다.
     */
    public void enterChatRoom(String roomId) {
        ChannelTopic topic = topics.get(roomId);
        if (topic == null) {
            topic = new ChannelTopic(roomId);
            redisMessageListener.addMessageListener(redisSubscriber, topic);
            topics.put(roomId, topic);
        }
    }

    public ChannelTopic getTopic(String roomId) {
        return topics.get(roomId);
    }
}

ChatRoom Serialize

Redis에 저장되는 객체들은 Serialize 가능해야하므로 Serializable을 참조하도록 선언하고 serialVersionUID를 세팅해준다.

Serialization

@Getter
@Setter
public class ChatRoom implements Serializable {

    private static final long serialVersionUID = 6494678977089006639L;

    private String roomId;
    private String name;

    public static ChatRoom create(String name) {
        ChatRoom chatRoom = new ChatRoom();
        chatRoom.roomId = UUID.randomUUID().toString();
        chatRoom.name = name;
        return chatRoom;
    }
}

🚨 트러블 슈팅


Can't start redis server. Check logs for details
찾아보니.. Mac M1에서 Embedded Redis를 실행할 수 없다고 한다
Redis가 M1의 ARM 프로세서 아키텍처에서 실행되는 것을 지원하지 않는다고 한다.

해결 방법

RedisServer(File executable, int port) 생성자를 통해 문제를 해결해보자.
Redis의 소스코드를 받아 컴파일한다.

// 소스 코드 다운로드
$ wget https://download.redis.io/releases/redis-6.2.5.tar.gz

// 압축 해제
$ tar xzf redis-6.2.5.tar.gz 

// redis-6.2.5 디렉토리 이동
$ cd redis-6.2.5

// 파일 링크 및 설치 등
$ make

//redis-server 바이너리 파일 실행
$ src/redis-server

EmbeddedRedisConfig 수정 - 분기하는 코드 추가

계속 안됩니다.. 버전도 M1용으로 바꿨는데..

해결했습니다 !! ! ! ! !!
최종 코드는 아래와 같습니다.

@Profile("local")
@Configuration
public class EmbeddedRedisConfig {

    @Value("${spring.data.redis.port}")
    private int redisPort;

    private RedisServer redisServer;

    @PostConstruct
    public void startRedis() throws Exception {
        int port = isRedisRunning() ? findAvailablePort() : redisPort;
        if (isArmArchitecture()) {
            redisServer = new RedisServer(Objects.requireNonNull(getRedisServerExecutable()), port);
        } else {
            redisServer = RedisServer.builder()
                    .port(port)
                    .setting("maxmemory 128M")
                    .build();
        }
        redisServer.start();
    }

    @PreDestroy
    public void stopRedis() {
        redisServer.stop();
    }
    public int findAvailablePort() throws Exception {
        for (int port = 10000; port <= 65535; port++) {
            Process process = executeGrepProcessCommand(port);
            if (!isRunning(process)) {
                return port;
            }
        }

        throw new Exception();
    }

    private boolean isRedisRunning() throws Exception {
        return isRunning(executeGrepProcessCommand(redisPort));
    }

    private Process executeGrepProcessCommand(int redisPort) throws IOException {
        String command = String.format("netstat -nat | grep LISTEN|grep %d", redisPort);
        String[] shell = {"/bin/sh", "-c", command};

        return Runtime.getRuntime().exec(shell);
    }

    private boolean isRunning(Process process) throws Exception {
        String line;
        StringBuilder pidInfo = new StringBuilder();
        BufferedReader bf = new BufferedReader(new InputStreamReader(System.in)); //선언

        try (BufferedReader input = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            while ((line = input.readLine()) != null) {
                pidInfo.append(line);
            }
        } catch (Exception e) {
            throw new Exception();
        }
        return StringUtils.hasText(pidInfo.toString());
    }

    private File getRedisServerExecutable() throws Exception {
        try {
            return new ClassPathResource("binary/redis/redis-server-6.0.10-mac-arm64").getFile();
        } catch (Exception e) {
            throw new Exception();
        }
    }

    private boolean isArmArchitecture() {
        return System.getProperty("os.arch").contains("aarch64");
    }
    
}

다중 서버 채팅 테스트는 다음 포스팅에서 진행하겠습니다.

0개의 댓글

관련 채용 정보