Redis pub/sub & SSE

정석·2024년 9월 27일

TIL

목록 보기
40/40
post-thumbnail

GitHub Link : 배달 서비스 플랫폼

알림 기능을 구현하는 여러 가지 방법이 있지만, SSE(Server-Sent Events)를 사용하여 개발하게 되었습니다. 이와 관련해 이번 구현에서 사용된 기술 및 구조에 대해 회고하는 글을 작성하고자 합니다.


💡 SSE

SSE(Server-Sent Events)는 HTML5 표준에 정의된 프로토콜로, 서버에서 클라이언트로 실시간 데이터를 스트리밍하는 기술입니다. SSE는 서버에서 클라이언트로 단방향 통신을 지원하며, 클라이언트는 서버와의 연결이 유지되는 동안 지속적으로 이벤트를 수신할 수 있습니다.

위 그림에서 보듯, 서버와 클라이언트는 연결을 유지하며 서버는 새로운 이벤트가 발생할 때마다 이를 클라이언트에 전송합니다. 기존 방식에서는 클라이언트가 서버에 주기적으로 요청을 보내야 하지만, SSE를 활용하면 실시간으로 알림이나 이벤트를 자동으로 수신할 수 있습니다.

따라서, 이번 프로젝트에서 Spring의 SseEmitter 클래스를 사용하여 실시간 알림 기능을 구현했습니다.

SseEmitter

SseEmitterSpring MVC에서 SSE를 구현할 수 있도록 제공하는 클래스입니다. 이를 통해 클라이언트가 실시간으로 서버의 이벤트를 비동기적으로 수신할 수 있습니다.

주요 메서드

메서드설명
send(Object data)서버에서 클라이언트로 데이터를 전송합니다.
complete()모든 이벤트 전송이 완료된 후, 연결을 정상적으로 종료합니다.
completeWithError(Throwable ex)에러가 발생했을 때 연결을 종료합니다.
onTimeout(Runnable callback)타임아웃이 발생했을 때 실행할 콜백을 설정합니다.
onCompletion(Runnable callback)모든 작업이 완료되었을 때 실행할 콜백을 설정합니다.

이처럼 SseEmitter는 서버와 클라이언트 간 실시간 통신을 간편하게 구현할 수 있게 해줍니다.


💡 Redis Pub/Sub

SSE를 사용해 클라이언트에게 실시간 알림을 전송하기 위해서는 누구에게 알림을 보낼지를 결정해야 합니다. 이때 Redis의 Pub/Sub 기능이 사용됩니다. Redis Pub/Sub는 발행/구독 패턴을 사용하여 서버 간 메시지를 빠르게 전달할 수 있습니다.

Pub/Sub 동작 과정

  1. 구독: 사용자는 특정 Redis 채널을 구독(subscribe)합니다.
  2. 발행: 서버는 특정 Redis 채널에 메시지를 발행(publish)합니다.
  3. 전달: 구독 중인 사용자에게 SseEmitter를 통해 메시지가 실시간으로 전달됩니다.

Pub/Sub을 사용한 이유

처음에는 Java 내부의 Map 자료구조를 사용하여 구현하려 했으나, 다음과 같은 문제들이 있었습니다

  1. 동시성 문제: 여러 스레드에서 동일한 Map을 사용하는 경우 동기화 문제가 발생할 수 있습니다.
  2. 다중 서버 환경에서 문제: Map은 서버 간에 공유되지 않으므로, 다중 서버에서 메시지를 처리할 수 없습니다.
  3. Stateless하지 않은 서버: 서버가 상태를 유지하므로 확장성과 유지보수성이 떨어집니다.

위 문제를 해결하기 위해 Redis를 도입하여, 서버 간 데이터 공유다중 서버 환경에서의 확장성을 확보할 수 있었습니다.

참고 영상

Redis Pub/Sub을 사용한 실시간 알림 시스템은 대규모 사용자에게 빠르게 메시지를 전달하는 데 적합합니다. 라인(LINE)과 같은 대규모 시스템에서도 이 기술을 사용하여 수많은 사용자에게 실시간 스트리밍 서비스를 제공하고 있습니다. 기술 관련 영상을 보던 중 참고한 영상입니다.

출처 : Redis Pub/Sub을 사용해 대규모 사용자에게 고속으로 설정 정보를 배포한 사례 - 2021


💻 구현 로직

알림 Controller

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1/admin")
public class NotificationController {

    private final NotificationService notificationService;

    @PostMapping("/notices")
    public ResponseEntity<String> sendNoticeToAll(@RequestBody NoticeCreateRequest request) {
        notificationService.sendNoticeToAll(request);
        return ResponseEntity.ok("전체 유저에게 메세지 전송 완료.");
    }
}
  • NotificationController관리자 권한으로 모든 사용자에게 메시지를 전송하는 API를 제공합니다.

알림 Service

@RequiredArgsConstructor
@Service
public class NotificationService {

    private final RedisPublisher redisPublisher;

    public void sendNoticeToAll(NoticeCreateRequest request) {
        redisPublisher.publish(request);
    }
}
  • NotificationService는 메시지를 Redis 채널에 발행(publish)하여 전체 사용자에게 알림을 보냅니다.

RedisPublisher

@RequiredArgsConstructor
@Service
public class RedisPublisher {

    private final RedisTemplate<String, Object> redisTemplate;
    private final ObjectMapper objectMapper;

    public void publish(NoticeCreateRequest request) {

        try {
            String message = objectMapper.writeValueAsString(request); // 직렬화
            redisTemplate.convertAndSend("notification-channel", message); // 발행

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • RedisPublisherRedis 채널에 메시지를 발행(publish)합니다. 여기서 NoticeCreateRequest 객체를 JSON 형식으로 직렬화하여 Redis에서 처리할 수 있도록 합니다.

RedisConfig

@Bean
public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(listenerAdapter, new PatternTopic("notification-channel"));
    return container;
}

@Bean
public MessageListenerAdapter listenerAdapter(RedisSubscriber redisSubscriber) {
    return new MessageListenerAdapter(redisSubscriber, "handleMessage");
}
  • RedisMessageListenerContainerRedis 채널을 구독하고, 새로운 메시지가 도착하면 이를 처리할 리스너를 등록합니다.

RedisSubscriber

@RequiredArgsConstructor
@Service
public class RedisSubscriber {

    private final ObjectMapper objectMapper;
    private final RedisEmitterRepository redisEmitterRepository;

    public void handleMessage(String message) {
        try {
            Notification notification = objectMapper.readValue(message, Notification.class);

            for (Long userId : redisEmitterRepository.getAllEmitter()) {
                SseEmitter emitter = redisEmitterRepository.getEmitter(userId);

                if (emitter != null) {
                    emitter.send(SseEmitter.event().name("notification").data(notification));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • handleMessage 메서드는 Redis에서 수신한 메시지를 처리합니다.
  • redisEmitterRepository.getAllEmitter()를 통해 등록된 모든 사용자에게 실시간으로 알림을 전송합니다.
  • 각 사용자의 Emitter를 조회한 후 emitter.send() 메서드를 통해 SSE를 이용해 클라이언트에 메시지를 전달합니다.

전체 유저 알림 전송 테스트

  • 다음과 같이 서버에서 알림을 보내면 notificaiton-channel 을 구독한 유저들에게 모든 알림이 전송됩니다.


💁 개선할 점

현재의 Redis 설정은 단일 노드 환경에서 동작하고 있기에 여러 문제가 존재합니다.

  1. Redis 인스턴스가 다운될 경우, 알림 시스템 전체가 중단됨
  2. 단일 노드이기에 트래픽이 증가함에 따라 성능 저하가 발생

따라서 이러한 문제를 해결하기 위해 다음 프로젝트에서는 Redis Clster 를 시도할 예정이고, 이를 통해 시스템의 확장성과 고가용성을 보장하고 장애 상황에서도 안정적인 알림 시스템이 동작하도록 개선할 것입니다.

0개의 댓글