[SSE] 지속적인 피드백이 필요해

eunniverse·2024년 5월 24일
0

글 쓰게된 계기

친구들과 함께하는 개인 프로젝트를 진행하던 중, 발생한 상황이 있다.
그건 바로 요청에 따라 지속적인 응답을 받아야 하는 상황이었다.
여러 방법을 검토하던 중 SSE를 적용하기로 했고, 블로그에 적어야겠다! 생각했다.

SSE?

SSE는 서버의 데이터를 실시간으로 스트리밍하는 기술이다. 서버의 변경된 데이터를 가져오기 위해서 요청을 보내거나 페이지를 새로고침 하는 등 클라이언트의 동작이 있어야만 했다. 물론 Polling, Websocket 등을 사용하는 방법도 있지만 이는 별도의 서버와 프로토콜로 통신하므로 비용이 많이 발생한다. 그래서 실시간 알림 등 서버에서 주체적으로 응답을 받아와야할 때는 SSE를 사용하면 BackEnd, FrontEnd 모두 쉽게 개발이 가능하고 , HTTP API만으로 구현할 수 있다.

SSE는 어디에 쓰일까?

SSE는 결국 서버의 단방향 통신이다. 따라서 주로 데이터를 받기만 하면 되고, 반드시 실시간이지 않아도되는 알림 등에서 사용된다.

양방향 통신이 필요할 경우?

주로 WebSocket 을 사용하고, 오디오, 영상 등의 데이터를 교환할 때는 WebRTC를 사용한다. 그럼 WebSocket 과 WebRTC 를 간단하게 알아보자.

WebSocket

  • 서버와 클라이언트가 실시간으로 양방향 통신할 수 있는 socket 기술
  • 구버전 브라우저 제외 모든 브라우저에서 지원
  • 서버를 중심으로 request 와 response로 데이터 교환 발생
    -> 메모리 문제, 전달 속도 등 비용 문제 발생
  • 위와 같은 문제를 해결하기 위해 떠오른 기술 => WebRTC

WebRTC

  • 브라우저끼리 통신하여 P2P 형태로 오디오, 영상 등 교환하는 기술
  • WebSocket 에 비해 장점은 다음과 같다.
    -> 영상, 오디오, 데이터 통신이 고성능, 고품질이도록 설계됨
    -> 브라우저간 직접 통신이므로 전송 속도가 빠름
    -> 네트워크 지연시간이 짧음
  • 신호를 주고받을 수 있는 signaling 서버가 필요 -> WebSocket 혹은 Socket.io를 사용해 구현
  • 동시에 많은 유저가 접근하는 경우 과부하가 급격히 증가함

SSE 간단한 예시

server 구현 예시

NotificationController

import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/**
 * 알림 controller (with SSE)
 * @author eunivverse
 */
@RestController
@RequestMapping("/notifications")
@RequiredArgsConstructor
public class NotificationController {
    private final NotificationService notificationService;

    /**
     * SSE 구독
     * @param id
     * @return
     */
    // SSE 이벤트를 구독하는 Controller 는 MediaType 을  `TEXT_EVENT_STREAM_VALUE` 로 해야한다
    @GetMapping(value = "/subscribe/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter subscribe(@PathVariable Long id) throws Exception {
        return notificationService.subscribe(id);
    }

    /**
     * SSE 데이터 전송
     * @param id
     */
    @PostMapping("/send-data/{id}")
    public void sendData(@PathVariable Long id) {
        notificationService.notify(id, "data");
    }
}

NotificationService

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {

    // SSE 이벤트 연결 서버 타임아웃 시간
    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
    private final NotificationRepository notificationRepository;

    /**
     * 클라이언트가 구독을 위해 호출
     *
     * @param userId - 클라이언트의 사용자 아이디.
     * @return SseEmitter - 서버에서 보낸 이벤트 Emitter
     */
    public SseEmitter subscribe(Long userId) throws Exception {
        try {
            // 사용자 아이디를 기반으로 이벤트 Emitter 생성
            SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);

            // DB에 Emitter 저장
            notificationRepository.save(userId, emitter);

            // Emitter가 모든 데이터가 성공적으로 전송되었을 때 Emitter 삭제
            emitter.onCompletion(() -> notificationRepository.deleteById(userId));

            // Emitter가 지정된 시간동안 어떠한 이벤트도 전송되지 않았을 때 Emitter 삭제
            emitter.onTimeout(() -> notificationRepository.deleteById(userId));

            notify(userId, "EventStream Created. [userId=" + userId + "]");

            return emitter;

        } catch (Exception e) {
            log.error("SSE subscription failed. exception = {}", e);
            throw e;
        }
    }

    /**
     * 서버의 이벤트를 클라이언트에게 보내는 메서드
     * 다른 서비스 로직에서 이 메서드를 사용해 데이터를 Object event 에 넣고 전송
     *
     * @param userId - 메세지를 전송할 사용자의 아이디.
     * @param event  - 전송할 이벤트 객체.
     */
    public void notify(Long userId, Object event) {
        SseEmitter emitter = notificationRepository.get(userId);

        if (emitter != null) {
            try {
                // 클라이언트에게 데이터를 전송
                emitter.send(SseEmitter.event().id(String.valueOf(userId)).name("event").data(event));

            } catch (Exception e) {
                // SSE 이벤트 전송 실패 시 로그 입력
                log.error("SSE notify failed. exception = {}", e);

                //  DB에서 데이터 삭제
                notificationRepository.deleteById(userId);

                //  emitter 에 error 전송
                emitter.completeWithError(e);
            }
        }
    }
}

NotificationRepository

import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Repository
@RequiredArgsConstructor
public class NotificationRepository {
    // 모든 Emitters를 저장하는 ConcurrentHashMap
    private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();

    /**
     * 주어진 아이디와 이미터를 저장
     *
     * @param id      - 사용자 아이디.
     * @param emitter - 이벤트 Emitter.
     */
    public void save(Long id, SseEmitter emitter) {
        emitters.put(id, emitter);
    }

    /**
     * 주어진 아이디의 Emitter를 제거
     *
     * @param id - 사용자 아이디.
     */
    public void deleteById(Long id) {
        emitters.remove(id);
    }

    /**
     * 주어진 아이디의 Emitter를 가져옴.
     *
     * @param id - 사용자 아이디.
     * @return SseEmitter - 이벤트 Emitter.
     */
    public SseEmitter get(Long id) {
        return emitters.get(id);
    }
}

client 구현 예시

// eventSource 생성
const eventSource = new EventSource('http://localhost:8080/notifications/subscribe/1');

// event 받아오기
eventSource.addEventListener('event', event => {
    console.log(event);
    // 이후 event 기반 알림 toast 보이도록 로직 구현 ...
});

마무리

SSE는 구현도 굉장히 간편하고, 쓰일 수 있는 곳도 정말 많은 것 같다. 다음에 구현할 일이 있으면 node.js 로도 SSE를 구현해봐야지!
profile
능력이 없는 것을 두려워 말고, 끈기 없는 것을 두려워하라

0개의 댓글