[Project] SSE(Servcer-Sent-Events)로 실시간 알림 기능 구현하기 !

현주·2023년 6월 24일
13

카풀 서비스 프로젝트 개발 중에,

  • 카풀 게시물에 정상적으로 신청 완료가 되었을 때
  • 내가 쓴 게시물에 신청 요청이 왔을 때 등

위와 같은 상황에서 알림 기능이 필요하였고, 로그인한 상태일 때 실시간으로 알림을 받을 수 있도록 구현하고 싶었다.

실시간 웹 애플리케이션을 구현할 경우 사용되는 대표적인 방법으로 polling / websocket / SSE 가 있는데

✔️ polling (client pull)

  • 클라이언트가 일정한 주기로 서버에 업데이트 요청을 보내는 방식
  • 지속적인 HTTP 요청이 발생하므로 리소스 낭비가 발생

✔️ webSocket (server push)

  • 실시간 양방향 통신을 위한 스펙
  • 서버와 브라우저가 지속적으로 연결된 TCP 라인을 통해 실시간으로 데이터를 주고받을 수 있도록 하는 HTML5 사양
  • 연결을 유지하며 클라이언트-서버 간 양방향 통신이 가능
  • 주로 채팅, 게임, 주식 차트 등에 사용

✔️ SSE (server push)

  • 이벤트가 [ 서버 → 클라이언트 ] 방향으로만 흐르는 단방향 통신
  • 클라이언트가 주기적으로 HTTP 요청을 보낼 필요가 없이 HTTP 연결을 통해 서버에서 클라이언트로 테이터 전달 가능

우리 프로젝트에서 실시간 알림을 구현하기에

polling은 지속적인 요청을 보내야하기에 리소스 낭비가 심할 것 같았고,

웹 소켓(webSocket)처럼 양방향 통신은 필요 없었기 때문에

웹 소켓에 비해 가볍고 서버에서 클라이언트의 방향으로의 소통을 지원하는 SSE 방식을 선택하였다 !


✏️ SSE ( Server-Sent-Events )

  • 서버의 데이터를 실시간, 지속적으로 클라이언트에게 보내는 기술

  • 클라이언트에서 처음 HTTP 연결을 맺고 나면, 서버는 클라이언트로 지속적인 데이터 전송이 가능

    💡 일반적인 HTTP 요청은 [요청-응답]의 과정을 거치고 연결을 종료하는데,
    SSE 방식은 한번 연결하면 클라이언트로 데이터 계속 보낼 수 있음 !

✔ SSE의 특징

  • 실시간 업데이트
    ➜ 서버에서 클라이언트로 실시간으로 데이터 전송 가능
    Ex. 실시간 알림, 주식 시장 정보 등

  • 단방향 통신 ( 서버 -> 클라이언트 )
    ➜ 클라이언트는 서버에 요청을 보내고, 서버는 이벤트를 푸시하는 방식으로 동작

  • HTTP 프로토콜 사용
    ➜ 기존의 HTTP 프로토콜을 사용하므로 별도의 프로토콜이나 라이브러리가 필요 X
    ( 이로써 브라우저와 서버 간의 통신을 간편하게 구현 가능 )

  • 간편한 구현
    ➜ HTTP 프로토콜을 사용하므로 별도의 라이브러리나 프레임워크 없이도 구현 가능
    ( 개발 및 유지보수가 간편해짐 )

  • 접속에 문제가 있으면 자동으로 재연결 시도

But, ( 단점 )
SSE는 지속적인 연결을 유지해야 하는데, 이는 서버 리소스와 클라이언트의 네트워크 연결을 소비하게 되고,
특히 많은 수의 클라이언트가 동시에 연결을 유지하면 서버의 처리 부하가 증가할 수 있음
네트워크 연결이 불안정한 경우에는 연결이 종료되고 재연결이 필요하므로 추가적인 네트워크 오버헤드가 발생할 수 있음 !

💬 Spring framework 4.2 부터 SSE 통신을 지원하는 SseEmitter 클래스가 생겼다.
Spring Framework 5 부터 WebFlux를 이용해서도 SSE 통신을 할 수 있지만,
우리 프로젝트에서는 SseEmitter를 사용하여 구현하려고 한다 !


✔ SseEmitter를 사용한 이유

1. Server-Sent Events (SSE) 프로토콜 지원

  • SseEmitter는 Spring에서 SSE 프로토콜을 지원하기 위한 클래스이므로
    이를 통해 실시간으로 업데이트되는 데이터나 알림과 같은 이벤트를 클라이언트에게 전달 가능

2. 비동기 통신

  • SseEmitter는 비동기적으로 이벤트 전송이 가능
    ( 즉, 서버에서 이벤트가 발생하면 해당 이벤트를 즉시 클라이언트에게 전송 가능 )

  • 실시간성이 중요한 알림 기능에서 유용하며, 클라이언트에게 지연 없이 즉각적인 업데이트 제공 가능

3. 클라이언트의 재시도 및 연결 관리

  • SseEmitter는 클라이언트의 연결 상태를 관리하고, 클라이언트와의 연결이 끊어지는 경우에도 재시도 등의 처리를 지원

  • 클라이언트가 알림을 구독한 후 연결이 끊어지면, 클라이언트는 다시 연결을 시도하고 이전에 미수신한 이벤트를 잃지 않도록 처리 가능

4. 확장성

  • SseEmitter는 여러 클라이언트와 동시에 통신이 가능
    ( 즉, 동일한 알림을 여러 클라이언트에게 전송 가능 )

  • 다수의 클라이언트에게 알림을 전달해야하는 경우에 유용


✔️ 알림 Entity

  • Notify
    @Entity
    @Getter
    @Setter
    public class Notify extends Auditable {
       @Id
       @GeneratedValue(strategy = GenerationType.IDENTITY)
       @Column(name = "notification_id")
       private Long id;private String content;
       /*@Embedded
       private NotificationContent content;*///@Embedded
       //private RelatedURL url;
       private String url;
       @Column(nullable = false)
       private Boolean isRead;@Enumerated(EnumType.STRING)
       @Column(nullable = false)
       private NotificationType notificationType;@ManyToOne
       @JoinColumn(name = "member_id")
       @OnDelete(action = OnDeleteAction.CASCADE)
       private Member receiver;@Builder
       public Notify(Member receiver, NotificationType notificationType, String content, String url, Boolean isRead) {
          this.receiver = receiver;
          this.notificationType = notificationType;
          this.content = content;
          this.url = url;
          this.isRead = isRead;
       }public Notify() {
       }public enum NotificationType{
            YATA, REVIEW, CHAT
       }
    }

✔️ 알림 Controller

  • NotifyController
    @RestController
    @RequestMapping("/api/v1/notify")
    public class NotifyController {
       private final NotifyService notifyService;public NotifyController(NotifyService notifyService) {
          this.notifyService = notifyService;
       }@GetMapping(value = "/subscribe", produces = "text/event-stream")
       public SseEmitter subscribe(@AuthenticationPrincipal User principal,
                                   @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
          return notifyService.subscribe(principal.getUsername(), lastEventId);
       }
    }

➜ 클라이언트가 알림을 구독하는 기능을 수행하는 Controller

➜ SSE 통신을 위해서는 porduces로 반환할 데이터 타입을 text/event-stream로 해줘야함

➜ 유저 정보와 Last-Event-ID를 헤더로 받고 있음

💡 Last-Event-ID 헤더는 항상 담겨있는 것은 아니고,
네트워크 연결 오류 등의 이유로 연결이 끊어졌을 때 클라이언트에 도달하지 못한 알림이 있을 때
이를 이용하여 유실된 데이터를 다시 보내줄 수 있다.


✔️ 알림 Repository

  • NotifyRepositoryImpl
    ➜ 알림(Notify) 객체를 저장하고 관리하는 역할
    Ex. 알림의 생성, 조회, 수정, 삭제 등

    public interface NotifyRepository extends JpaRepository<Notify, Long> {
    }
  • EmitterRepositoryImpl
    ➜ EmitterRepository 인터페이스를 구현한 클래스
    ( 자유로운 확장을 위해 EmitterRepository 인터페이스를 구현한 Impl 파일을 사용하여 인터페이스와 구현체를 분리하였습니다. )

    @Repository
    public class EmitterRepositoryImpl implements EmitterRepository{
       private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
       private final Map<String, Object> eventCache = new ConcurrentHashMap<>();@Override
       public SseEmitter save(String emitterId, SseEmitter sseEmitter) { // emitter를 저장
          emitters.put(emitterId, sseEmitter);
          return sseEmitter;
       }@Override
       public void saveEventCache(String eventCacheId, Object event) { // 이벤트를 저장
          eventCache.put(eventCacheId, event);
       }@Override
       public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId) { // 해당 회원과 관련된 모든 이벤트를 찾음
          return emitters.entrySet().stream()
                  .filter(entry -> entry.getKey().startsWith(memberId))
                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
       }@Override
       public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
          return eventCache.entrySet().stream()
                  .filter(entry -> entry.getKey().startsWith(memberId))
                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
       }@Override
       public void deleteById(String id) { // emitter를 지움
          emitters.remove(id);
       }@Override
       public void deleteAllEmitterStartWithId(String memberId) { // 해당 회원과 관련된 모든 emitter를 지움
          emitters.forEach(
                  (key, emitter) -> {
                     if (key.startsWith(memberId)) {
                        emitters.remove(key);
                     }
                  }
          );
       }@Override
       public void deleteAllEventCacheStartWithId(String memberId) { // 해당 회원과 관련된 모든 이벤트를 지움
          eventCache.forEach(
                  (key, emitter) -> {
                     if (key.startsWith(memberId)) {
                        eventCache.remove(key);
                     }
                  }
          );
       }
    }

➜ SSE(Server-Sent Events) 연결을 관리하는 SseEmitter 객체와 이벤트 캐시를 맵 형채로 저장하고 관리하는 역할

✔️ 이벤트 캐시
➜ 클라이언트가 연결을 잃어도 이벤트 유실을 방지하기 위해 임시로 저장되는 데이터

위 코드에서 emitters와 eventCache는 맵 형태의 데이터 구조를 나타냄

  • emitters
    • String 타입의 키와 SseEmitter 타입의 값으로 이루어져 있음
    • 사용자별로 생성된 SseEmitter 객체를 저장하는 역할
    • 클라이언트가 구독(subscribe)을 요청하면 해당 사용자의 식별자를 키로 사용하여 맵에 저장되고,
      이후 알림을 전송할 때 해당 사용자의 SseEmitter를 조회하기 위해 사용됨
  • eventCache
    • String 타입의 키와 Object 타입의 값으로 이루어져 있음
    • 이벤트 캐시를 저장하는 역할
    • 알림을 받을 사용자의 식별자를 키로 사용하여 해당 사용자에게 전송되지 못한 이벤트를 캐시로 저장하고,
      캐시된 이벤트는 사용자가 구독할 때 클라이언트로 전송되어 이벤트의 유실을 방지함으로써 알림의 신뢰성을 확보하기 위해 사용

      Ex. 인터넷 연결 불안정으로 클라이언트-서버 간의 연결이 끊어졌을 때 / 다른 알람으로 인해 클라이언트의 구독 요청이 지연되었을 때 등

✔️ ConcurrentHashMap

  • 스레드 안전(thread-safe)한 맵
  • 동시에 여러 스레드가 접근하더라도 안전하게 데이터를 조작할 수 있도록 보장
  • 이 맵을 사용하여 동시성 문제를 해결하고, 맵에 데이터 저장 / 조회가 가능
    [ ConcurrentHashMap 참고 ]

❗ 일반적으로 JPA를 사용하는 Repository는 JpaRepository 또는 CrudRepository 인터페이스를 상속받아 구현하는데,
이 인터페이스들은 이미 일반적인 CRUD 작업에 필요한 메서드들을 기본적으로 제공하므로, 개발자가 직접 메서드를 구현할 필요가 없음.

But, 위의 EmitterRepository 인터페이스 / EmitterRepositoryImpl 클래스의 경우는
JPA를 사용하지 않고, 단순히 메모리 내의 맵(Map)을 이용하여 데이터를 직접 관리하고 있기 때문에
직접 CRUD 기능들을 구현해야함 !

💡 SseEmitter 객체를 Map 형식으로 저장하는 이유
1. 여러 개의 SseEmitter 객체 관리
➜ emitterRepository는 수신자별로 여러 개의 SseEmitter 객체를 관리하는데,
각 수신자마다 다양한 클라이언트에서의 연결을 지원하기 위해 Map 형식으로 저장
⠀⠀
2. 식별자를 활용한 검색 및 제거
➜ Map은 키-값 쌍으로 데이터를 저장하는 자료구조임
SseEmitter를 Map의 값으로 저장하면, 특정 수신자에 대한 SseEmitter를 식별자(key)를 통해 쉽게 검색하고 접근이 가능
➜ 이를 통해 특정 수신자의 모든 SseEmitter를 관리하거나 필요한 경우 삭제할 수 있음
⠀⠀
3. 동시성 관리
➜ SseEmitter는 HTTP 연결을 유지하는 객체로, 다수의 클라이언트와의 실시간 통신을 위해 사용됨
➜ 여러 클라이언트와의 동시성 처리를 위해 멀티스레드 환경에서 안전하게 동작할 수 있도록 Map 형식으로 저장하고 관리
⠀⠀
4. 이벤트 캐싱
➜ Map을 사용하여 수신자별로 이벤트를 캐시할 수 있음
➜ 클라이언트가 이벤트를 수신하지 못한 경우에도 이벤트를 임시로 저장하고, 다시 연결되었을 때 해당 이벤트를 전송할 수 있음

👉 수신자별로 다중 연결과 동시성 처리를 지원하고, 식별자 기반의 검색과 제거를 효율적으로 수행하기 위한 목적


✔️ 알림 Service

  • NotifyService
    @Service
    public class NotifyService {
        private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
        // SSE 연결 지속 시간 설정private final EmitterRepository emitterRepository;
        private final NotifyRepository notifyRepository;public NotifyService(EmitterRepository emitterRepository, NotifyRepository notifyRepository) {
            this.emitterRepository = emitterRepository;
            this.notifyRepository = notifyRepository;
        }// [1] subscribe()
        public SseEmitter subscribe(String username, String lastEventId) { // (1-1)
            String emitterId = makeTimeIncludeId(username); // (1-2)
            SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT)); // (1-3)
            // (1-4)
            emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
            emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));// (1-5) 503 에러를 방지하기 위한 더미 이벤트 전송
            String eventId = makeTimeIncludeId(username);
            sendNotification(emitter, eventId, emitterId, "EventStream Created. [userEmail=" + username + "]");// (1-6) 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
            if (hasLostData(lastEventId)) {
                sendLostData(lastEventId, username, emitterId, emitter);
            }return emitter; // (1-7)
        }private String makeTimeIncludeId(String email) { // (3)
            return email + "_" + System.currentTimeMillis();
        }private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) { // (4)
            try {
                emitter.send(SseEmitter.event()
                        .id(eventId)
                        .name("sse")
                        .data(data)
                );
            } catch (IOException exception) {
                emitterRepository.deleteById(emitterId);
            }
        }private boolean hasLostData(String lastEventId) { // (5)
            return !lastEventId.isEmpty();
        }private void sendLostData(String lastEventId, String userEmail, String emitterId, SseEmitter emitter) { // (6)
            Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(userEmail));
            eventCaches.entrySet().stream()
                    .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                    .forEach(entry -> sendNotification(emitter, entry.getKey(), emitterId, entry.getValue()));
        }// [2] send()
        //@Override
        public void send(Member receiver, Notify.NotificationType notificationType, String content, String url) {
            Notify notification = notifyRepository.save(createNotification(receiver, notificationType, content, url)); // (2-1)String receiverEmail = receiver.getEmail(); // (2-2)
            String eventId = receiverEmail + "_" + System.currentTimeMillis(); // (2-3)
            Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByMemberId(receiverEmail); // (2-4)
            emitters.forEach( // (2-5)
                    (key, emitter) -> {
                        emitterRepository.saveEventCache(key, notification);
                        sendNotification(emitter, eventId, key, NotifyDto.Response.createResponse(notification));
                    }
            );
        }private Notify createNotification(Member receiver, Notify.NotificationType notificationType, String content, String url) { // (7)
            return Notify.builder()
                    .receiver(receiver)
                    .notificationType(notificationType)
                    .content(content)
                    .url(url)
                    .isRead(false)
                    .build();
        }
    }

⭐ [1] subscribe() 메서드

  • 클라이언트가 알림을 구독하는 기능을 수행

    ✔️ 알림 구독
    ➜ Spring에서 제공하는 SseEmitter를 생성해서 이를 저장시켜둔 다음
    필요할 때 마다 해당 구독자가 생성한 SseEmitter를 불러와 이벤트에 대한 응답을 전송하는 것

  • Controller로 GET 요청이 오면 이 메서드가 호출되어 클라이언트에게 Server-Sent Events (SSE) 스트림을 반환

  • 클라이언트에게 더미 이벤트를 전송하여 연결이 생성되었음을 알림

  • 만약 클라이언트가 이전에 수신하지 못한 이벤트가 있는 경우, 해당 이벤트를 전송하여 유실을 예방

  • 생성된 SseEmitter를 반환하여 클라이언트와의 SSE 스트림 통신을 유지

    [ 💡 동작 과정 ]

    1-1. Controller에서 @AuthenticationPrincipal를 통해 가져온 사용자의 priincipal(수신자의 식별 정보)과 @RequestHeader로 가져온 "Last-Event-ID" 값(마지막 이벤트 식별자)을 파라미터로 받음

    1-2. emitterID 생성
    makeTimeIncludeId() 메서드로 username을 포함하여 SseEmitter를 식별하기 위한 고유 아이디 생성

    1-3. SseEmitter 객체 생성 및 저장
    emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));로 새로운 SseEmitter 객체를 생성하고, emitterId를 키로 사용해 emitterRepository에 저장
    ➜ 이렇게 생성된 SseEmitter는 클라이언트에게 이벤트를 전송하는 역할 수행

    1-4. onCompletion()onTimeout() 메서드를 사용하여
    ➜ SseEmitter가 완료되거나 타임아웃될 때 해당 SseEmitter를 emitterRepository에서 삭제하도록 설정

    1-5. 더미 이벤트 전송
    sendNotification() 메서드를 호출하여 클라이언트에게 연결이 생성되었음을 알리는 더미 이벤트 전송
    이를 통해 클라이언트-서버 간의 연결이 유지되도록 함

    1-6. 미수신한 이벤트 전송
    hasLostData(lastEventId) 메서드로 클라이언트가 미수신한 이벤트가 있는지 확인
    ➜ 만약 클라이언트가 마지막으로 수신한 이벤트 ID인 lastEventId가 존재한다면,
    이전에 발생한 이벤트 중에서 해당 이벤트 이후의 이벤트들을 캐시에서 가져와 클라이언트에게 전송
    ➜ 이를 통해 클라이언트가 놓친 이벤트를 보상하여 테이터 유실 예방

    1-7. 생성된 SseEmitter 객체를 반환하여 클라이언트에게 전달
    ➜ 클라이언트는 이를 통해 서버로부터 알림 이벤트를 수신 / 처리 가능

    ✔️ 더미 이벤트
    ➜ 클라이언트가 연결을 생성하고 SSE 스트림을 받을 때, 서버 측에서 초기 응답으로 보내는 이벤트

💡 emitterId, eventId 모두 같은 메서드에 같은 값을 넣어 만드는 Id 이므로 같은 값이지만,
무엇을 나타내는지 명확히 하기 위해 새로운 변수로 지정한 것 !

  • emitterId
    ➜ SseEmitter 를 구분 / 관리 하기 위한 식별자
    ➜emitterRepository에 저장되어 특정 클라이언트의 연결을 관리하는 데 사용됨
  • eventId
    ➜ 개별 알림 이벤트를 식별하기 위한 고유한 값
    ➜ 각 알림 이벤트는 고유한 eventId를 가지고 있고, 클라이언트에게 전송될 때 이벤트의 식별을 위해 사용됨

⭐ [2] send() 메서드

  • 알림을 생성하고 지정된 수신자에게 알림을 전송하는 기능 수행

  • 알림 수신자, 알림 유형, 내용, URL 등의 정보를 인자로 받음

  • 알림을 수신하는 모든 수신자에게 알림을 전송하기 위해 emitterRepository에서 해당 수신자의 모든 SseEmitter를 가져와 알림을 전송하고, 동시에 emitterRepository에 이벤트 캐시를 저장

    [ 💡 동작 과정 ]

    2-1. 알림 객체 생성 및 저장
    createNotification() 메서드로 send() 메서드의 매개변수로 받은 정보들을 포함한 알림 객체를 builder를 사용해 생성 후 notifyRepository에 저장
    Ex. 수신자, 알림 유형, 내용, url, 읽음 여부 가 저장됨

    2-2. 수신자의 이메일을 receiverEmail 변수에 저장
    ➜ 이 이메일은 SseEmitter 객체에서 관리되는 emitterRepository에서 사용됨

    2-3. 이벤트 ID 생성
    ➜ 이 아이디는 SseEmitter 로 전송되는 이벤트의 고유 식별자로 사용됨

    2-4. 수신자에 연결된 모든 SseEmitter 객체를 emitters 변수에 가져옴
    ➜ 수신자가 여러 클라이언트와 연결된 경우를 대비하여 다중 연결을 지원하기 위한 작업

    2-5. emitters를 순환하며 각 SseEmitter 객체에 알림 전송
    emitterRepository.saveEventCache(key, notification)를 호출하여 해당 SseEmitter 객체에 이벤트 캐시를 저장
    람다식 내부에서 (key, emitter) -> { ... }와 같은 형태로 매개변수를 정의하면, forEach() 메서드는 맵의 각 항목을 가져와서 해당 키를 key 변수에, 해당 값을 emitter 변수에 자동으로 할당

    sendNotification() 메서드를 호출하여 알림을 SseEmitter 객체로 전송
    ( 이 메서드는 알림을 SseEmitter 객체로 전송하는 역할 수행 )

    위 코드에서 data로 전해주는 NotifyDto.Response.createResponse(notification)
    알림 객체(notification)를 기반으로 알림 데이터를 적절히 가공하여 DTO로 생성하는 역할
    ➜ 이렇게 생성된 DTO 객체는 sendNotification 메서드에 전달되어 SSE 연결(emitter)을 통해 클라이언트에게 알림 데이터를 전송

💡 subscribe()send() 의 차이

  • subscribe() 메서드는 클라이언트와의 SSE 스트림 통신을 유지하면서 연결을 생성하고 유지
  • send() 메서드는 알림을 생성하고 해당 알림을 수신하는 모든 클라이언트에게 전송

    👉 subscribe() 메서드는 클라이언트의 요청에 응답하여 SSE 스트림을 제공하고, send() 메서드는 서버에서 알림을 생성하여 클라이언트에게 전송하는 역할

(3) makeTimeIncludeId()

  • EmitterId, eventId를 생성하는 부분

    💡 왜 구분자 뒤에 시각이 있을까?
    ➜ Id 값만 사용한다면 데이터가 언제 보내졌는지, 유실되었는지 알 수 없음.
    따라서 System.currentTimeMillis()를 붙여두면 데이터가 유실된 시점을 파악할 수 있으므로 저장된 key값 비교를 통해 유실된 데이터만 재전송 할 수 있게 된다.

    Ex. ID가 2546인 회원의 이벤트들 중에 가장 마지막으로 발생한 이벤트를 뒤에 붙은 시각으로 구분 !

  • subscribe() 메서드의 (1-2), (1-5) 번에서 사용됨


(4) sendNotification()

  • SseEmitter 객체를 사용하여 Server-Sent-Events (SSE) 를 클라이언트에게 전송하는 역할

    [ 💡 동작 과정 ]

    4-1. 더미 이벤트를 전송하는 부분

    4-2. 만약 클라이언트와 연결이 끊어져 해당 예외를 캐치하면, 연결이 끊긴 SseEmitter 객체를 제거하여 정리
    ( 연결이 끊어진 클라이언트와의 연결 정보를 삭제하고 메모리를 해제하기 위한 작업 )


(5) hasLostData()

  • lastEventId가 비어있지 않은지 확인하여 클라이언트가 이전 이벤트 이후에 새로운 이벤트를 놓치지 않았는지 확인

    • lastEventId가 비어있지 않다 == Controller의 헤더를 통해 lastEventId가 들어왔다 == 손실된 이벤트가 있다 == true 리턴
      ➜ 즉, 클라이언트가 이전 이벤트 이후에 새로운 이벤트를 받지 않았으므로, 이후에 발생한 이벤트들이 손실되었다는 의미
    • lastEventid가 비어있다 == Controller의 헤더를 통해 lastEventId가 들어오지 않았다 == 손실된 이벤트가 없다 == false 리턴
      ➜ 즉, 클라이언트가 이전에 받은 이벤트 이후에 새로운 이벤트들을 놓치지 않았다는 의미
  • subscribe() 메서드의 (1-6) 번에서 사용됨


(6) sendLostData()

  • 수신자에게 전송되지 못한 이벤트 데이터를 캐시에서 가져와 클라이언트에게 전송하는 과정을 수행

  • 파라미터로 클라이언트가 마지막으로 수신한 이벤트 Id, 수신자 이메일, SseEmitter 객체의 식별자, SseEmitter 객체 를 받음

  • subscribe() 메서드의 (1-6) 번에서 사용됨

    [ 💡 동작 과정 ]

    6-1. 수신자의 이메일을 기준으로 캐시된 이벤트 데이터를 가져옴
    ( 이벤트 캐시는 수신자에게 전송되지 못한 이벤트를 임시로 저장하는 용도로 사용됨 )

    6-2. eventCaches 맵 객체의 엔트리들을 스트림으로 변환해 순환하면서
    ( 엔트리는 키와 값의 쌍으로 구성 )

    6-3. lastEventId가 엔트리의 키보다 작을 때만 필터링
    클라이언트가 수신하지 못한 이벤트 데이터를 필터링하여
    ( 클라이언트가 마지막으로 수신한 이벤트 이후에 전송되지 못한 이벤트 데이터가 있냐 )

    6-4. 필터링된 각 엔트리 대해 sendNotification() 메서드를 호출하여 SseEmitter를 통해 해당 데이터(알림)을 클라이언트한테 전송
    (필터링된 엔트리의 키(entry.getKey()), emitterId, 값(entry.getValue())sendNotification() 메서드의 인자로 전달)

    💡 (6-3)의 필터링 과정(lastEventId.compareTo(entry.getKey()) < 0)에서 < 0이 있는 이유
    lastEventId.compareTo(entry.getKey())lastEventIdentry.getKey()를 비교하여 순서를 나타내는 정수 값을 반환하는데, 무엇이 더 크냐에 따라 정수값의 부호가 달라진다.

    • 음수 값일 경우 : lastEventId < entry.getKey()
    • 0일 경우 : lastEventId == entry.getKey()
    • 양수 값일 경우 : lastEventId > entry.getKey()

      👉 여기서는 lastEventId가 엔트리 키보다 작아야 하는 경우이기 때문에 음수값을 필터링하기 위해 < 0 이 있어야 함

(7) createNotification()

  • 파라미터로 받은 값들로 알림 객체를 builder를 이용해 생성하는 부분

  • send() 메서드의 (2-1) 번에서 사용됨


✔️ 알림 Dto

public class NotifyDto {
    @AllArgsConstructor
    @Builder
    @NoArgsConstructor
    @Getter
    @Setter
    public static class Response {
        String id;
        String name;
        String content;
        String type;
        String createdAt;
        public static Response createResponse(Notify notify) {
            return Response.builder()
                    .content(notify.getContent())
                    .id(notify.getId().toString())
                    .name(notify.getReceiver().getName())
                    .createdAt(notify.getCreatedAt().toString())
                    .build();
        }
    }
}

➜ 알림 Service 코드의 send() 메서드에서 SSE를 클라이언트에게 전송할 때, 이벤트의 데이터로 전송할 Dto

✔️ 데이터 전송 시 DTO를 사용하는 이점

1. 필요한 데이터만 포함
➜ DTO를 통해 필요한 데이터만 전송함으로써 네트워크 트래픽을 줄일 수 있음

2. 데이터 변환
➜ DTO는 데이터를 필요한 형식으로 변환하여 전송 가능
Ex. 알림 객체(notification)에는 모든 필드가 포함되어 있지만, DTO로 데이터를 캡슐화하여 필요한 필드만을 선택하여 전송 가능

3. 유연한 확장성
➜ DTO는 데이터의 구조를 독립적으로 정의하므로, 데이터의 구조 변경에 따른 영향을 최소화 가능
즉, 클라이언트와 서버 간의 데이터 형식이 변경되어도 DTO의 구조만 수정하면 되므로 유지보수가 용이

👉 데이터 전송 과정에서 데이터의 일관성을 유지하고 데이터 효율성유연성을 확보하기 위해 사용


여기까지 Notify 기능을 완성했고,

이 기능의 실행 결과는 아래 포스팅 중 [Project] Spring AOP로 로그 / 알림 기능 구현하기 ! 에서 확인할 수 있다 !

📌 Spring AOP(Aspect Oriented Programming)에 대한 자세한 개념들은 아래 포스팅을 참고해주세요.

[참고] https://tecoble.techcourse.co.kr/post/2022-10-11-server-sent-events/
[참고] https://gilssang97.tistory.com/69

8개의 댓글

comment-user-thumbnail
2023년 9월 20일

와 지금까지 본 SSE 글중에서 가장 깔끔하게 정리가 잘되어있네요 ㄷㄷ
좋은 글 감사합니다.
혹시 SSE 연결 요청은 어느 시점에 한지 알 수 있을까요 ??
로그인 시에만 한번 요청하셨나요 ?

1개의 답글
comment-user-thumbnail
2023년 9월 27일

Sse 궁금했는데 잘 읽었습니다.
궁금한게 있는데요 eventCache는 언제 삭제하나요? 계속 추가만 될거 같아서요.

1개의 답글
comment-user-thumbnail
2023년 10월 11일

안녕하세요.글 잘봤습니다.
궁금한게 있습니다. lastEventId를 헤더로 받아오고 싶은 경우, 서버를 중단하고 다시 돌리면, 그때 서버에서 찍히더라구요.
근데 문제는 save로 current캐시 저장한게 다 날라가서 캐시로 저장한 값을 찾지 못합니다.
제가 하고싶은건, 네트워크 문제상 알림을 받지 못했을 때, 캐시에 쌓인 알림을 알림을 다시 받고싶은데, 어떻게 테스트를 해봐야 할까요?
큰트롤러에서 lastEventId값이 찍히는 경우는 서버를 내렸다 다시 킬때밖에 없는것 같습니다.

1개의 답글
comment-user-thumbnail
2023년 10월 27일

안녕하세요 SSE 정리가 정말 잘되어있어서 도움이 많이 되었습니다
궁금한 점이 있습니다.
LastEventId를 클라이언트가 헤더에 넣어서 보낼 수 도 있다고 했는데
LastEventId는 클라이언트가 어떻게 얻을 수 있고 eventCache에 있는 key값 중 가장 큰 값이 맞을까요?

답글 달기
comment-user-thumbnail
2024년 8월 8일

정말 잘 봤습니다. 그리고 궁금한 게 있습니다.

  1. 클라이언트에서 SSE 재 연결 요청시 subscribe() 메소드를 호출하게 되는데, 현재 코드에선 Last-Event-ID 여부를 확인합니다. 이 말은 누락된 이벤트의 존재 여부에 대한 판단을 클라이언트가 한다는 뜻 같습니다. 왜냐하면 클라이언트가 누락된 이벤트가 있다고 판단해야 Last-Event-ID 헤더를 포함시킬테고, 없다고 판단하면 뺄 것이기 때문입니다. 그렇다면 클라이언트는 누락된 이벤트 유무를 어떻게 판단할 수 있는 건가요?
    아니면, 클라이언트는 SSE 재 요청시 무조건 Last-Event-ID를 포함해서 넣고 서버에서 누락 여부를 판단하는 게 맞지 않나요?

  2. 현재 SSE 객체와 EventCaChe 모두 별도의 DB를 사용하지 않고 HashMap으로 사용하고 있습니다. 만약 서버가 다운되거나 재부팅하는 경우, 이벤트 캐시가 사라질텐데 이 문제는 괜찮을까요?

답글 달기