SSE 사용기 (2)

Caesars·2023년 10월 28일
0

Springboot

목록 보기
5/5

저번글에 이어 SSE 사용기를 작성했습니다. 여러 개의 인스턴스가 있을 경우 기존 로직대로면 에러가 날 수 있는데 Redis를 사용해서 이슈를 해결해보겠습니다.


문제 상황

이전 구현은 단일 인스턴스에서 정상 작동하지만 분산 환경에서 문제가 있습니다. 아래 다이어그램에 시나리오를 구현했습니다.

수십만의 커넥션을 유지해야 한다면 한 대의 인스턴스로 유지할 수 없고 여러 인스턴스가 필요합니다. 이 경우에 알림 요청을 서버에 보내면 로드 밸런서는 한 개의 인스턴스에만 라우팅 합니다. 다른 인스턴스는 요청을 받지 못하였기에 연결된 client에게 알림을 전달하지 못합니다. 따라서 User B는 알림을 받고 User A와 C는 알림을 수신할 수 없습니다.

Redis Pub/Sub를 사용한 분산 SSE 구현

통신 문제를 해결하기 위해 Redis Pub/Sub 기능을 사용할 수 있습니다. 이를 위해 수신자 인스턴스는 각 인스턴스가 구독자에게 업데이트에 대해 알수 있도록 메시지를 발행합니다. 다음 다이어그램을 살펴보겠습니다.

로드 밸런서가 라우팅한 인스턴스2는 레디스 서버에 특정 채널로 메시지를 발행합니다. 해당 채널을 구독중인 인스턴스는 레디스에서 발행한 메시지를 전달받고 연결된 클라이언트에게 알림을 전달합니다. 전 클라이언트가 알림을 전달 받습니다.

코드

전체 코드를 다 적지는 못하고 주요 부분만 작성했습니다.

RedisConfig

@Configuration
@RequiredArgsConstructor
public class RedisConfig {

    /**
     * RedisTemplate의 인터페이스
     * redisOperations를 통해 RedisConnection에서 넘겨준 byte 값을 객체 직렬화한다.
     */
    @Bean
    public RedisOperations<String, NoticeDto> eventRedisOperations(
            RedisConnectionFactory redisConnectionFactory, ObjectMapper objectMapper) {
        final Jackson2JsonRedisSerializer<NoticeDto> jsonRedisSerializer = new Jackson2JsonRedisSerializer<>(
                NoticeDto.class);
        jsonRedisSerializer.setObjectMapper(objectMapper);
        final RedisTemplate<String, NoticeDto> eventRedisTemplate = new RedisTemplate<>();
        eventRedisTemplate.setConnectionFactory(redisConnectionFactory);
        eventRedisTemplate.setKeySerializer(RedisSerializer.string());
        eventRedisTemplate.setValueSerializer(jsonRedisSerializer);
        eventRedisTemplate.setHashKeySerializer(RedisSerializer.string());
        eventRedisTemplate.setHashValueSerializer(jsonRedisSerializer);
        return eventRedisTemplate;
    }

    /**
     * RedisMessageListenerContainer는 Spring Data Redis에서 제공하는 클래스
     * 컨테이너는 메시지가 도착하면 등록된 MessageListener를 호출하여 메시지를 처리
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        final RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        return redisMessageListenerContainer;
    }

}

Redis를 사용하기 위해 설정 작업

RedisPublisher

@Service
@RequiredArgsConstructor
@Lazy
public class RedisPublisher {

    private final RedisOperations<String, NoticeDto> eventRedisOperations;

    public void publish(String topic, String message){
        eventRedisOperations.convertAndSend(topic, message);
    }
}

토픽으로 메시지 발행

@RequiredArgsConstructor
@Slf4j
@RestController
@RequestMapping()
public class SseApiController {


    // 전체 발송
    @GetMapping("/api/sse/broadCast/{chanel}")
    public void broadCast(@PathVariable String chanel) {
        log.info("broadCast {}", chanel);
        redisPublisher.publish(chanel, "");
    }

    public void broadCastToChanel(SseConnectionPool sseConnectionPool, String message) {
        log.info("broadCastToChanel {}");

        sseConnectionPool.broadCast();
    }

    @PostConstruct
    public void init(){

            SseConnectionPool sseConnectionPool = new SseConnectionPool();
            MessageListener messageListener = (message, pattern) -> {
                broadCastToChanel(sseConnectionPool, "message");
            };
            String chanel = "chanel";

            connectionPoolMap.put(chanel, sseConnectionPool);
            this.redisMessageListenerContainer.addMessageListener(messageListener, ChannelTopic.of(chanel));
        
    }
}

알림 이벤트가 들어오면 먼저 Redis Server에 알리고, 구독중인 인스턴스가 메시지를 받으면 해당 이벤트들을 연결된 클라이언트에 전파합니다.
MessageListenere들은 redisMessageListenerContainer에 삽입되어 관리되어 redis에 새로운 알림이 발생하면 구독한 모든 대상에게 onMessage가 자동으로 호출됩니다.

결론

이 게시물에서는 Spring SseEmitter와 Redis Pub/Sub를 백본으로 사용하여 분산 SSE를 생성하여 분산 환경에서 동작 방법에 대해 논의했습니다. 이렇게 하면 모든 인스턴스가 변경 사항에 대한 정보를 받고 SSE를 통해 연결된 클라이언트에 이벤트를 내보낼 수 있습니다.


참조

https://www.geekyhacker.com/distributed-sse-with-spring-sseemitter-and-redis-pub-sub/
https://www.linkedin.com/pulse/server-sent-events-sse-spring-boot-aliaksandr-liakh/
https://velog.io/@max9106/Spring-SSE-Server-Sent-Events%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%9C-%EC%8B%A4%EC%8B%9C%EA%B0%84-%EC%95%8C%EB%A6%BC
https://seungpnag.tistory.com/9
https://velog.io/@stella6767/Spring-Boot-SSE-LoadBalancing-%ED%99%98%EA%B2%BD%EC%97%90%EC%84%9C-Scale-Out
https://github.com/aliakh/demo-spring-sse/blob/master/README.md

profile
잊기전에 저장

0개의 댓글