Redis Pub/Sub을 사용하는 이유 [With Rand_Chat Projcet]

Agida·2024년 12월 5일

RanChat Project

목록 보기
8/8
post-thumbnail

위치기반 랜덤채팅 프로젝트인 Rand_Chat Project를 진행하면서 해결했던 사항이다.

  • 채팅 애플리케이션인 만큼 WebSocket 과 Sse를 사용해야 하는 상황이 많았다.
    • Websocket은 양방향 통신 , Sse는 Server-Sent-Event로 서버에서 - > 클라이언트로 흐르는 단방향 통신이다.



  • 프로젝트 초기 설계구조

  • 위에 보다시피 프로젝트 구조는 다중인스턴스로드밸런싱되고 있는 형태이다. 사람들을 매칭시켜주는 매칭서버가 여러 인스턴스 , 여러 채팅서버 , 회원관리 서버다중서버 환경 및 어느정도 MSA 방식으로 설계되어있다. (완벽한 MSA는 아니다.)



PROBLEM


  • 문제는 이런 로드밸런싱환경에서 웹소켓, Sse를 사용하였을때 어느 서버와 연결되었는지 모른다는 것이다.

    예를 들어 사용자1매칭을 위해 매칭대기열을 요청하고 , 매칭 완료 수신을 받기위한 Sse 연결을 하였다고 가정하였을 때 Sse 연결매칭서버1이 되었고 , 매칭알람을 보내는 순간 어느 인스턴스에서 Sse 커넥션이 일어났는지 모른다 . 마찬가지로 웹소켓양방향 통신을 하기위해 특정 인스턴스연결확립을 하였을 텐데 , 어느 인스턴스와 통신을 해야할지 모르는 상황이다.

Solving


  • 이러한 상황에서 RedisPub/Sub을 통해 해결 할 수 있었다. Pub/Sub을 Message Brocker로 이용하고 연결이 확립된 인스턴스를 특정하여 다중서버환경에서도 웹소켓이나 Sse 통신이 가능토록 했다.

출처(https://seungpnag.tistory.com/9)


  • 위와 같이 Pub/Sub 모델에서 모든 인스턴스가 SubScribe을 하고 , 해당 서버 , 채널 정보 등을 Pub/Sub 모델에 저장한 뒤 , Publish가 일어났을 때 Pub/Sub모델에 저장된 정보를 확인하여 특정 인스턴스메시지를 전달하는 방식이다.

Code


  • 아래 코드 및 설명은 프로젝트 진행간 매칭 대기열 및 , 매칭완료시 매칭된 2명의 유저에게 Sse를 보내는 로직이다.


    1. 먼저 Redis Pub/Sub을 사용하기 위해 RedisMessageListenerContainer를 등록 해 주었다 .
 @Bean
    public RedisMessageListenerContainer redisContainer(
            RedisConnectionFactory connectionFactory, SubsCriber subscriber) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 매칭 채널
        container.addMessageListener(subscriber, new PatternTopic(PubSubChannel.MATCHING_CHANNEL.toString()));
        container.addMessageListener(subscriber, new PatternTopic(PubSubChannel.MATCHING_ACCEPT_CHANNEL.toString()));


        return container;
    }
  • 이를 통해 매칭 채널을 모든 인스턴스가 SubScribe 하고있다.



2. 각 인스턴스별로 Sse커넥션을 관리하는 CuncurrentHashMap과 연결에 관한 정보제공, 등록,제거를 할 수 있는 SseConnectionRegistry 클래스를 만들어 주었다.

public class SseConnectionRegistry {
    // 클라이언트 ID (userId)와 SSE 연결을 관리하는 맵 CuncerrnetHashMap 동시성 제어 유리
    private static final ConcurrentHashMap<String, SseEmitter> sseEmitters = new ConcurrentHashMap<>();

    // 클라이언트의 SSE 연결을 등록
    public static void register(String userId,String channel, SseEmitter emitter) {
        sseEmitters.put(userId+":"+channel, emitter);
    }

    // 클라이언트의 SSE 연결을 조회
    public static SseEmitter getEmitter(String userId,String channel) {
        return sseEmitters.get(userId+":"+channel);
    }

    // 클라이언트의 SSE 연결을 제거
    public static void removeEmitter(String userId,String channel) {
        sseEmitters.remove(userId+":"+channel);
    }

    // 모든 SSE 연결을 가져옴
    public static ConcurrentHashMap<String, SseEmitter> getAllEmitters() {
        return sseEmitters;
    }
}




3. 클라이언트가 Publish했을 때 즉 , Service단에서 Publish 하여 이벤트를 발생 , 구독자에게 메시지를 전파해주는 Publisher 클래스를 만들었다.

@Service
@RequiredArgsConstructor
public class Publisher {
    private final StringRedisTemplate redisTemplate;

    //매칭결과
    public void sendNotification(String userId, String nickname,String profileImg,String sex,String type,String distance,String channel) {
        String payload = "";
        //매칭성공 시
        if(type.equals(SSETYPE.MATCHINGCOMPLETE.toString())){
            if(sex.equals(MembersSex.MAN.toString())){
                sex = "남자";
            }
            else{
                sex = "여자";
            }

        }
        // 매칭 성공 or 실패 시
        if(type.equals(SSETYPE.MATCHINGCOMPLETE.toString()) || type.equals(SSETYPE.MATCHINGTIMEOUT.toString())){
            payload = String.format("{\"userId\":\"%s\",\"nickname\":\"%s\",\"profileImg\":\"%s\",\"sex\":\"%s\",\"type\":\"%s\",\"distance\":\"%s\",\"channel\":\"%s\"}", userId, nickname,profileImg,sex,type,distance,channel);

            redisTemplate.convertAndSend(PubSubChannel.MATCHING_CHANNEL.toString(), payload);
        }


    }
  • 위 는 적당한 값으로 포맷팅 한 후 매칭채널로 payload와 함께 브로드캐스트하는 로직이다 .
redisTemplate.convertAndSend(PubSubChannel.MATCHING_CHANNEL.toString(), payload);

이 메소드는 첫번째 인자를 구독하고 있는 인스턴스에게 payload와 함께 메시지를 전파하게 된다. 따라서 예상결과는 매칭서버1 , 매칭서버2 등 모든 서버에게 전파가 갈 것이다.




4. 실질적으로 Sse 커넥션 객체를 생성하고 , 메시지를 받은 서버에서 Sse커넥션이 있는 위치를 판별하기 위한 Sse 연결정보Redis에 저장하는 역할을 할 SseNotificationService를 만들어 주었다.

@Service
@Slf4j
public class SseNotificationService implements NotificationService {

    private final StringRedisTemplate redisTemplate;

    public SseNotificationService(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }


    // 특정 회원의 SSE 연결을 Redis에 저장
    public void registerConnection(String userId,String channel, String serverInstanceId,String key) {
        redisTemplate.opsForValue().set(key + userId+":"+channel, serverInstanceId);
    }

    // 특정 회원의 SSE 연결 정보 조회
    public String getServerInstanceForUser(String userId,String channel,String key) {
        return redisTemplate.opsForValue().get(key + userId+":"+channel);
    }

    // 연결 해제 시 삭제
    public void removeConnection(String userId,String channel,String key) {
        redisTemplate.delete(key + userId+":"+channel);
    }


    public SseEmitter connect(String userId,String channel,String key) {
        SseEmitter emitter = new SseEmitter(180 * 1000L); // 90초 타임아웃
        SseConnectionRegistry.register(userId,channel, emitter);
        registerConnection(userId, channel,getCurrentServerInstanceId(),key);
        emitter.onCompletion(() -> cleanup(userId,channel,key));
        emitter.onTimeout(() -> cleanup(userId,channel,key));
        emitter.onError(e -> cleanup(userId,channel,key));
        return emitter;
    }

    private void cleanup(String userId,String channel,String key) {
        SseConnectionRegistry.removeEmitter(userId,channel);
        removeConnection(userId,channel,key);
    }

    private String getCurrentServerInstanceId() {
        return System.getenv("INSTANCE_ID"); // 서버 인스턴스 ID 반환
    }
}
  • 위 로직을 요약하면 getCurrentServerInstanceId를 통해 현재 인스턴스의 서버ID를 반환받고 , 서버IDUserId, Channel명을 조합해 레디스에 저장하고 1번에서 만든 클래스의 SseConnectionRegistry.register를 통해 현재 인스턴스 CuncurrentHashMapSse커넥션 객체를 담는다.




5. Publish되었을때 여러 인스턴스 중 요청자의 Sse커넥션이 담긴 서버를 찾는 로직이 필요하며 , 해당 Sse객체를 통해 요청자에게 전송할 로직이 필요하다. 이를 위해 SubScriber 클래스를 만들었다.

@Slf4j
@Component
public class SubsCriber implements MessageListener {


    private final NotificationService connectionService;


    public SubsCriber(NotificationService connectionService) {

        this.connectionService = connectionService;

    }

    // Redis 메시지 수신 처리
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try{
            String payload = new String(message.getBody());
            Map<String, String> data = null;

            data = parsePayload(payload);
            //필수 값
            String userId = data.get("userId");
            String channel = data.get("channel");

            String serverInstanceId = "";
            // 현재 서버에 연결된 클라이언트라면 메시지 전송

            if(channel.equals(PubSubChannel.MATCHING_CHANNEL.toString())){
                //매칭 채널
                serverInstanceId = connectionService.getServerInstanceForUser(userId,PubSubChannel.MATCHING_CHANNEL.toString(), RedisKey.SSE_MATCHING_CONNECTION_KEY);
            }
            else if (channel.equals(PubSubChannel.MATCHING_ACCEPT_CHANNEL.toString())){
                serverInstanceId = connectionService.getServerInstanceForUser(userId,PubSubChannel.MATCHING_ACCEPT_CHANNEL.toString(), RedisKey.SSE_MATCHING_ACCEPT_CONNECTION_KEY);
            }

            log.info("serverTest={}",serverInstanceId);
            if (isCurrentInstance(serverInstanceId)) {
                //분기
                if(channel.equals(PubSubChannel.MATCHING_CHANNEL.toString())){
                    log.info("t1");
                    //매칭채널
                    String nickname = data.get("nickname");
                    String profileImg = data.get("profileImg");
                    String sex = data.get("sex");
                    String type= data.get("type");
                    String distance = data.get("distance");
                    matchingResultSendToClient(userId, nickname,profileImg,sex,type,distance);
                }else if (channel.equals(PubSubChannel.MATCHING_ACCEPT_CHANNEL.toString())){
                    log.info("t2");
                    //매칭 수락 채널
                    matchingAcceptSendToClient(userId);
                }

            }
        }
        catch (Exception e){
            throw new RuntimeException(e);
        }

    }

    private Map<String, String> parsePayload(String payload) throws JsonProcessingException {
        // JSON 문자열을 Map으로 변환 (간단한 파서 사용)
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readValue(payload, new TypeReference<>() {});
    }

    private boolean isCurrentInstance(String serverInstanceId) {
        // 현재 서버 인스턴스와 비교 (로드밸런싱 환경에서 인스턴스 ID 비교)
        return getCurrentInstanceId().equals(serverInstanceId);
    }

    //매칭 결과 전송
    private void matchingResultSendToClient(String userId, String nickname,String profileImg,String sex,String type,String distance) {
        // SSE 연결된 클라이언트에게 메시지 전송
        SseEmitter emitter = SseConnectionRegistry.getEmitter(userId,PubSubChannel.MATCHING_CHANNEL.toString());
        if (emitter != null) {
            try {
                if(type.equals(SSETYPE.MATCHINGCOMPLETE.toString())){
                    ResMatchResultDTO resMatchResultDTO = new ResMatchResultDTO();
                    resMatchResultDTO.setNickname(nickname);
                    resMatchResultDTO.setProfileImg(profileImg);
                    resMatchResultDTO.setSex(sex);
                    resMatchResultDTO.setDistance(distance);

                    ResponseDTO<ResMatchResultDTO> responseDTO = new ResponseDTO(resMatchResultDTO);
                    emitter.send(SseEmitter.event().name(PubSubChannel.MATCHING_CHANNEL.toString()).data(responseDTO));
                    log.info("test result= {}",responseDTO);
                    log.info("emmiter={}",emitter);
                    log.info("emmiter str ={}",emitter.toString());
                    log.info("emmiter id={}",userId);
                }
                else if(type.equals(SSETYPE.MATCHINGTIMEOUT.toString())){
                    ResponseErr responseErr = new ResponseErr(ErrorCode.COMMON_SSE_MATCH_1MIN_TIME_OUT);
                    emitter.send(SseEmitter.event().name(PubSubChannel.MATCHING_CHANNEL.toString()).data(responseErr));
                    log.info("test result= {}",responseErr);
                    log.info("emmiter={}",emitter);
                    log.info("emmiter id={}",userId);
                }


            } catch (Exception e) {
                // 전송 실패 시 처리
                SseConnectionRegistry.removeEmitter(userId,PubSubChannel.MATCHING_CHANNEL.toString());
                log.info("test result= fail");
            }
        }
    }

  
    private String getCurrentInstanceId() {
        // 서버 인스턴스 ID 반환 (필요시 환경변수 또는 설정값 사용)
        log.info("serverId={}",System.getenv("INSTANCE_ID"));
        return System.getenv("INSTANCE_ID");
    }


}
  • 위 로직의 핵심만 요약하자면 , getCurrentInstanceId로 현재 인스턴스의 ID를 반환받고 4번에서 저장한 Redis에서 UserIdChannel에 해당하는 서버 인스턴스를 반환받는다. 이둘을 비교하여 , 같으면 로직을 수행한다.

    예를 들어 서버1 , 서버2 , 서버3 모두가 OnMessage 메소드를 실행하게 될 것이고(SubScribe중) userId가 1번 , 채널명이 매칭채널이라고 하면 , 매칭채널:1 에 해당하는 서버인스턴스Redis에서 가져온다.
    그 값이 서버3 이라고 하면 , 서버1 , 서버2Sse커넥션을 가져오는 로직은 수행 하지 않을것이고, 서버3만이 수행한다.




6. Sse커넥션을 생성 요청하는 컨트롤러 및 Publish할 서비스를 만든다 . (상세로직 생략)

  • Sse 커넥션 연결 컨트롤러
 @GetMapping
    public SseEmitter matchingConnect(){
      String usrId=String.valueOf(SecurityContextGet.getUsrId());
        return sseNotificationService.connect(usrId,PubSubChannel.MATCHING_CHANNEL.toString(), RedisKey.SSE_MATCHING_CONNECTION_KEY);
    }

  • 서비스내 Publish메소드(매칭된 두명의 사용자에게 Sse 전송)
       publisher.sendNotification(firstUserId,secondeMemberInfo.getNickName(),secondeMemberInfo.getProfileImg(),secondeMemberInfo.getSex().toString(),
                SSETYPE.MATCHINGCOMPLETE.toString(),strDistance, PubSubChannel.MATCHING_CHANNEL.toString());
        publisher.sendNotification(secondUserId,firstMemberInfo.getNickName(),firstMemberInfo.getProfileImg(),firstMemberInfo.getSex().toString(),
                SSETYPE.MATCHINGCOMPLETE.toString(),strDistance, PubSubChannel.MATCHING_CHANNEL.toString());



정리


  • Redis Pub/Sub은 다음과 같은 특징을 가진다.


  1. 브로드캐스트 기반 메시징

    Redis Pub/Sub은 발행자-구독자(Publish/Subscribe) 패턴을 사용하여, 발행된 메시지를 특정 채널에 구독 중인 모든 클라이언트에게 전달

    메시지가 발행되면 해당 채널을 구독 중인 모든 리스너(구독자)가 메시지를 동시에 수신

  2. 비동기 통신

    메시지 발행과 수신은 비동기로 처리
    발행자는 메시지 전송 후 즉시 반환하며, 구독자는 메시지를 수신하는 순간 처리

  3. 구독 중심 모델

    구독자는 하나 이상의 특정 채널을 구독하거나, 패턴 기반으로 여러 채널을 구독할 수 있음.

    채널 패턴 예시:
    특정 채널: channel1
    패턴: channel* (모든 channel로 시작하는 채널 구독)

  4. 중앙 서버 역할

    Redis는 Pub/Sub 시스템에서 중앙 서버 역할을 하며, 발행자와 구독자 사이의 메시지 전달을 담당

    발행자는 Redis 서버로 메시지를 전송하고, Redis 서버는 이를 구독자에게 중계

  5. 실시간 메시징

    Pub/Sub은 주로 실시간 메시징 시나리오에 적합

  1. 메시지 저장 X

    Pub/Sub는 메시지를 저장하지 않음.

    메시지가 발행될 때 구독자가 연결되어 있지 않다면, 해당 메시지는 손실.

    RabbitMQ 또는 Kafka와 같은 메시지 큐 시스템과 달리 메시지 보존 기능이 없음.

  2. 구독자는 지속 연결 필요

    구독자는 Redis 서버와 지속적으로 연결되어 있어야 메시지를 수신할 수 있음.

    연결이 끊기면 구독이 취소되며, 끊긴 동안 발행된 메시지는 받을 수 없음.

profile
백엔드

0개의 댓글