[Network] SSE(Server Sent Event)란?

김강욱·2024년 5월 13일
0

Network

목록 보기
2/4
post-thumbnail

이번 포스팅에서는 SSE 통신에 대해 알아보도록 하겠습니다.

카카오톡 같은 채팅이나 토스트 메시지를 통한 알림은 실시간으로 데이터를 전달합니다.

이러한 실시간 알림을 구현하기 위해서는 클라이언트와 서버가 지속적으로 연결되어 있거나 연결된 것처럼 보여야합니다.

이를 구현한 방식으로는 폴링(Poilling), 롱폴링(Long Polling), 웹소켓(WebSocket), SSE(Server Sent Event)가 존재합니다.

우선 SSE에 대해 알아보고 이후 다른 방식들에 대해 간단히 소개드리도록 하겠습니다.

✈️ SSE(Server Sent Event)

SSE(Server Sent Event)는 이벤트가 서버에서 클라이언트 방향으로만 흐르는 단방향 통신 채널을 의미합니다.

SSE(Server Sent Event)는 클라이언트가 주기적으로 HTTP 요청을 보낼 필요 없이 한 번의 HTTP 연결을 통해 서버에서 클라이언트로 데이터를 보낼 수 있습니다.

즉, 한 번 연결된 HTTP 통신을 쭉 이어간다고 볼 수 있습니다. 이러한 경로를 통해 서버에서 클라이언트로 이벤트를 전달할 수 있는 것입니다.

HTTP 프로토콜만으로 사용할 수 있기 때문에 구현이 용이하다는 장점이 있습니다.

만약 접속에 문제가 생길 시 자동으로 재연결을 시도하게 됩니다. 하지만 클라이언트가 연결을 닫게 됐을 시에 서버에서 감지하기가 어렵습니다.

서버가 클라이언트에게 데이터를 전송할 때 발생하는 예외를 통해 연결 상태를 확인할 수 있습니다.

예를 들어, 서버에서 클라이언트로 데이터를 전송하려고 할 때 IOExeption이 발생한다면 서버에서는 클라이언트의 연결이 종료되었음을 인지할 수 있습니다.

👌 SSE는 언제 사용할까?

SSE의 특성을 활용하여 클라이언트는 서버로 데이터를 전송할 필요가 없고, 어떤 이벤트가 발생했을 시 실시간으로 이벤트를 전달해야하는 경우에 사용됩니다.

예를 들어, 사용자가 게시글이나 댓글을 작성했을 시 해당 이벤트를 접속 중인 다른 사용자에게 알려주는 용도로 사용하는 등 실시간으로 알림을 띄울 때 유용하게 활용할 수 있습니다.

👌 SSE의 단점?

앞서 서버와 클라이언트를 지속적으로 연결되거나 연결된 것처럼 보이게 구현하는 방법의 종류에 대해 설명드렸습니다.

연결된 것 처럼 보이게 하는 구현 방법이 폴링, 롱폴링 방식인데 해당 방식들에 반해 실제로 클라이언트와 서버간의 연결을 끊지 않고 계속 유지하는 방식인 웹소켓과 SSE는 지속적인 연결을 유지해야 하기 때문에 서버의 리소스가 부담이 되는 것이 단점입니다.

👌 SSE의 특징?

  1. websocket과 달리 별도의 프로토콜을 사용하지 않고 HTTP 프로토콜만으로 사용이 가능하며 훨씬 가볍습니다.

  2. 접속에 문제가 있으면 자동으로 재연결을 시도합니다.

  3. 클라이언트에서 페이지를 닫아도 서버에서 감지하기가 어렵습니다.

  4. HTTP/1.1 프로토콜 사용시 브라우저에서 1개 도메인에 대해 생성할 수 있는 EventSteam의 최대 개수는 6개로 제한됩니다. (HTTP/2 프로토콜 사용시에는 브라우저와 서버간의 조율로 최대 100개까지 유지가 가능합니다.)

  5. 이벤트 데이터는 UTF-8 인코딩된 문자열만 지원합니다. 서버 사이드에서 이벤트 데이터를 담은 객체를 JSON으로 마샬링하여 전송하는 것이 가장 일반적이고 무난합니다.

  6. 현재 Internet Explorer을 제외한 모든 브라우저에서 지원합니다. JavaScript에서는 EventSource를 이용하여 연결 생성 및 전송된 이벤트에 대한 제어가 가능합니다.

  7. Spring Framework는 4.2(2015년)부터 SseEmitter 클래스를 제공하여 서버 사이드에서의 SSE 통신 구현이 가능합니다.

✈️ SSE(Server Sent Event) 구현해보기

SSE를 구현하기에 앞서 SSE의 통신 흐름을 한번 살펴보겠습니다.

✔️ SSE 통신과정

  1. 클라이언트에서 SSE 연결 요청을 전송하게 됩니다.

  2. 서버에서 클라이언트와 매핑되는 SSE 통신 객체를 생성합니다.

  3. 서버에서 SSE 연결 성공 메시지를 클라이언트에게 전송하게 됩니다.

  4. 서버에서 이벤트 발생 시 해당 SSE 객체를 통해 클라이언트로 데이터를 전달하게 됩니다.


✔️ SSE Container

클라이언트와의 SSE 접속이 완료된 서버 측에서는 현재 브라우저와의 SSE 연결 상태를 저장하게 됩니다.

해당 SSE 연결 상태를 저장하고 있는 곳이 SSE Container이며 SSE Container에서 SSE 통신 연결을 성공한 SseEmitter 객체들을 식별자를 Key 값으로 저장하게 됩니다.

SseEmitter 객체는 실제 서버가 전달해야할 클라이언트에게 이벤트를 전달하기 위해 사용되는 객체입니다. 서버에서 이벤트가 발생했을 때 전달해야할 클라이언트에 해당되는 SseEmitter 객체를 통해 실제 이벤트를 전달하게 됩니다.


✔️ EventStream의 데이터 형태

SSE 연결을 통해 도착하는 데이터의 형태는 아래와 같습니다.

// 두 줄 이상의 연속된 줄은 하나의 데이터 조각으로 간주됨. 
// 마지막 행을 제외한 줄은 \n(마지막 행은 \n\n)

// 1
data: first line\n\n

// 2
data: first line\n
data: second line\n\n
// 고유 id 같이 보내기. 
// id 설정 시 브라우저가 마지막 이벤트를 추적하여 서버 연결이 끊어지면 
// 특수한 HTTP 헤더(Last-Event-ID)가 새 요청으로 설정됨. 
// 브라우저가 어떤 이벤트를 실행하기에 적합한 지 스스로 결정할 수 있게 됨.

id: 12345\n
data: first line\n
data: second line\n\n
// JSON 형식 예시

data: {\n
data: "msg": "hello world",\n
data: "id": 12345\n
data: }\n\n
// event 이름 설정

id: 12345\n
event: sse\n
data: {"msg": "hello world", "id": 12345}\n\n

✔️ 실제 구현하기

1. 클라이언트 구현(React)

let initialDelay = 1000; // 1초
let maxDelay = 120000; // 120초

function initializeEventSource(setEventSource, addToast, timerIds) {
  const memberId = getCookie("memberId");
  const newEventSource = new EventSource(
    `http://13.209.219.235:8080/api/v1/subscribe/${memberId}`
    // `http://localhost:8080/api/v1/subscribe/${memberId}`
  );

  newEventSource.addEventListener("open", () => {
    console.log("SSE 연결 성공");
    initialDelay = 1000;
  });

  // 정상적인 알람 메시지 도착
  newEventSource.addEventListener("sse", (event) => {
    console.log(event.data);

    let data = JSON.parse(event.data);

    if (!data.isCountMessage) {
      data = data.notificationResponse;
    }

    const currentTimerId = addToast(data);
    timerIds.push(currentTimerId);
  });

  // 서버에서 구독, 전송 오류 전달
  newEventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    if (data.type === "error") {
      if (data.message === "SSE 구독 오류 발생") {
        console.log("구독 오류");
        reconnectWithBackoff(initialDelay, setEventSource, addToast, timerIds);
      } else if (data.message === "SSE 전송 오류 발생") {
        console.log("전송 오류");
        alert("알림 전송 오류가 발생했습니다. 새로고침을 눌러주세요.");
      }
    }
  };

  // 네트워크 문제나 서버의 연결 종료 등으로 인해 연결이 끊어진 경우(서버의 500 응답 또는 무응답)
  newEventSource.addEventListener("error", function (event) {
    console.log("eventSource 에러 발생");
    console.log(event);
    if (event.target.readyState === EventSource.CLOSED) {
      console.log("SSE closed");
      reconnectWithBackoff(initialDelay, setEventSource, addToast, timerIds);
    } else if (event.target.readyState === EventSource.CONNECTING) {
      console.log("SSE reconnecting");
    } else {
      console.error("SSE error:", event);
    }
  });

  setEventSource(newEventSource);
}

function reconnectWithBackoff(delay, setEventSource, addToast, timerIds) {
  setTimeout(() => {
    console.log(`재연결 시도. 대기 시간: ${delay}ms`);
    initializeEventSource(setEventSource, addToast, timerIds);
    if (delay < maxDelay) {
      initialDelay *= 2;
    }
  }, delay);
}

export function useIsLoggedIn(
  isLoggedIn,
  eventSource,
  setEventSource,
  addToast
) {
  useEffect(() => {
    const timerIds = [];

    if (isLoggedIn && !eventSource) {
      console.log("eventSource");
      initializeEventSource(setEventSource, addToast, timerIds);
    } else if (!isLoggedIn && eventSource) {
      eventSource.close();
      setEventSource(null);
    }

    return () => {
      if (eventSource) {
        eventSource.close();
      }
      timerIds.forEach((id) => clearTimeout(id));
    };
  }, [isLoggedIn, eventSource]);
}

먼저 SSE 연결을 위해 서버에 연결을 요청할 url 정보를 담은 EventSource 객체를 만들어줍니다.

위의 코드에서는 3개의 이벤트 리스너 설정과 1개의 이벤트 핸들러를 설정하고 있습니다.

1. onmessage

onmessage 이벤트 핸들러는 서버로부터 일반적인 메시지 이벤트를 수신할 때 호출됩니다.

해당 핸들러는 서버가 type 필드를 포함한 JSON 메시지를 보내고, 이 타입이 error일 때 오류 처리 로직을 실행하도록 합니다. reconnectWithBackoff(initialDelay, setEventSource, addToast, timerIds); 메서드를 통해 점점 주기를 늘리면서 재연결 요청을 하도록 구현하였습니다.

2. addEventListener("open")

addEventListener("open") 이벤트 리스너는 EventSource 객체가 성공적으로 서버와의 연결을 맺었을 때 발생합니다.

연결이 성공적으로 시작되었다는 것을 로그로 기록하고, 연결에 사용될 초기 지연시간(initialDelay)을 설정합니다.

3. addEventListener("sse")

addEventListener("sse") 이벤트 리스너는 이벤트 타입이 sse인 경우에 동작합니다.

addEventListener("sse")에서 사용된 이벤트 타입 sse는 서버 사이드에서 SseEmitter를 사용하여 보낸 이벤트의 이름과 동일합니다.

sseEmitter.send(SseEmitter.event()
                    .name("sse")  // 이벤트 이름(타입)
                    .data(data));

4. addEventListener("error")

addEventListener("error") 이벤트 리스너는 연결에 문제가 생겼을 때 호출됩니다.

네트워크 문제나 서버에서 연결을 종료하는 경우 등에 해당 리스너가 반응합니다. 연결 상태에 따라 다시 연결을 시도하거나 에러를 로그로 기록하는 등의 처리를 수행합니다.



2. 서버 구현(Spring)

⚙️ 컨트롤러 구현

서버에서는 클라이언트의 EventSource를 통해 날라오는 요청을 처리할 컨트롤러가 필요합니다.

SSE 통신을 하기 위해서는 MIME 타입을 text/event-stream으로 해주셔야 합니다.

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1")
@Tag(name = "notification", description = "알림 API")
public class NotificationController {

    private final WebNotificationServiceImpl notificationService;
    private final NotificationMapper mapper;


    /**
     * 구독하기 (+ 안읽은 메시지 갯수 알림 보내주기)
     */
    // TODO: 토큰값을 이용하여 memberId를 사용하도록 수정
    @GetMapping(value = "/subscribe/{id}", produces = "text/event-stream")
    @Operation(summary = "SSE 연결 구독하기", description = "클라이언트와 서버 간의 SSE 연결을 하고 연결 성공 시 사용자가 안읽은 알림에 대한 갯수를 보내줍니다.", tags = "notification")
    @ApiResponses(value = {
            @ApiResponse(code = 200, message = "successfully retrieved", response = SseEmitter.class),
            @ApiResponse(code = 404, message = "user not founded", response = ErrorResponse.class),
            @ApiResponse(code = 200, message = "sse Connection is failed", response = String.class)
    })
    @Transactional
    public SseEmitter subscribe(@PathVariable("id") Long memberId) {
        return notificationService.subscribe(memberId);
    }
 ...
 }

/api/v1/subscribe/{id} url로 요청이 올 시에 SSE 연결을 하도록 구현하였습니다.


⚙️ 서비스 구현


@RequiredArgsConstructor
@Service
public class WebNotificationServiceImpl implements NotificationService{

    private final MapEmitterRepository emitterRepository;
    private final MybatisNotificationRepository notificationRepository;

    private final MemberRepository memberRepository;


    private final NotificationMapper mapper;

    /**
     * 서버에서 클라이언트로 data 전송
     */
//    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void sendToClient(SseEmitter sseEmitter, Object data) {

        try {
            sseEmitter.send(SseEmitter.event()
                    .name("sse")
                    .data(data));
        } catch (IOException e) {
            System.out.println("SSE 전송 오류 발생");
            throw new ApplicationException(ErrorCode.CONNECTION_ERROR,"SSE 전송 오류 발생");
        }
    }


    /**
     * pk에 해당하는 사용자와 서버 간의 구독
     */
    public SseEmitter subscribe(Long memberId) {

        if (memberRepository.findById(memberId).isEmpty()) {
            throw new ApplicationException(ErrorCode.USER_NOT_FOUND);
        }

        SseEmitter emitter;
        Integer count;

        try {
            emitter = emitterRepository.save(memberId, new SseEmitter(DEFAULT_TIMEOUT));

            emitter.onCompletion(() -> emitterRepository.deleteById(memberId));
            emitter.onTimeout(() -> emitterRepository.deleteById(memberId));
            emitter.onError((e) -> emitterRepository.deleteById(memberId));

            count = mapper.getNotificationCountIsReadFalse(memberId);
        } catch (RuntimeException e) {
            System.out.println("SSE 구독 오류 발생");
            throw new ApplicationException(ErrorCode.CONNECTION_ERROR, "SSE 구독 오류 발생");
        }


        sendToClient(emitter,new NotificationData("안읽으신 "+count+"개의 메시지가 있습니다.",FRONT_SERVER_HOST+"/notification"));
        notificationRepository.updateIsTransmittedbyMemberId(true,memberId);



        return emitter;
    }


    /**
     * 사용자 pk에 해당하는 SseEmitter를 찾아서 data를 전송한다.
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void send(Long memberId, Object data) {
        SseEmitter emitter = emitterRepository.findById(memberId);
        if (emitter==null) {
            throw new NoEmitterException();
        }
        sendToClient(emitter,data);

    }
 ...
 }

subscribe 메서드

위의 컨트롤러에 SSE 연결 요청이 왔을 시 호출되는 메서드로 사용자 pk 값을 파라미터로 받아 SseEmitter 객체를 만들어주는 역할을 합니다.

코드를 살펴보시면 먼저 파라미터로 들어온 사용자 pk에 대한 유효성 검증을 하고 SSE Container에 사용자 pk와 새로 생성한 SseEmitter 객체를
저장하게 됩니다.

클라이언트의 sse연결 요청에 응답하기 위해서는 SseEmitter 객체를 만들어 반환해줘야합니다. SseEmitter 객체를 만들 때 유효 시간(DEFAULT_TIMEOUT)을 줄 수 있습니다. 이때 주는 시간 만큼 sse 연결이 유지되고, 시간이 지나면 자동으로 클라이언트에서 재연결 요청을 보내게 됩니다.

emitter.onCompletion(() -> emitterRepository.deleteById(memberId));
            emitter.onTimeout(() -> emitterRepository.deleteById(memberId));
            emitter.onError((e) -> emitterRepository.deleteById(memberId));

위의 코드는 SSE 연결이 완료되거나 시간 초과, 에러로 인해 연결이 끊기는 경우 서버에서 해당 브라우저에 대한 연결상태를 저장하고 있는 SseEmitter 객체를 삭제해주는 작업입니다.

연결 요청에 의해 SseEmitter가 생성되면 더미 데이터를 보내줘야합니다. sse 연결이 이뤄진 후, 하나의 데이터도 전송되지 않는다면 SseEmitter의 유효 시간이 끝나면 503응답이 발생하는 문제가 있다. 따라서 연결시 바로 더미 데이터를 한 번 보내줍니다. sendToClient(emitter,new NotificationData("안읽으신 "+count+"개의 메시지가 있습니다.",FRONT_SERVER_HOST+"/notification")); 이 부분에 해당됩니다.

sendToClient 메서드

해당 메서드는 SseEmitter 객체와 데이터 객체를 파라미터로 받습니다. 파라미터로 받은 SseEmitter 객체의 send 메서드를 호출하여 이벤트를 전송하게 됩니다.

이벤트 이름을 sse로 보내고 데이터를 보낼시 아래와 같이 데이터가 전달됩니다.

event: sse\n
data: {"msg": "hello world", "id": 12345}\n\n

send 메서드

해당 메서드는 사용자 pk와 전송할 데이터를 파라미터로 받아서 sendToClient 메서드를 호출하여 데이터를 전송하게 해주는 역할을 합니다.

사용자 pk를 기반으로 연결을 담당하고 있는 SseEmitter 객체를 찾아서 sendToClient의 파라미터로 넣어줍니다.


⚙️ SSE Container 작성

@Repository
public class MapEmitterRepository implements EmitterRepository{

    private static ConcurrentHashMap<Long, SseEmitter> map = new ConcurrentHashMap<>();

    @Override
    public SseEmitter save(Long memberId, SseEmitter sseEmitter) {
        map.put(memberId, sseEmitter);
        return sseEmitter;
    }

    @Override
    public void deleteById(Long memberId) {
        map.remove(memberId);
    }

    @Override
    public SseEmitter findById(Long memberId) {
        return map.get(memberId);
    }
}

해당 코드는 사용자 pk를 key값으로, 해당 사용자와의 연결 객체인 SseEmitter를 ConcurrentHashMap에 저장하는 SSE Container의 코드입니다.

Multi-Thread 환경에서 Thread-safe 하도록 ConcurrentHashMap 자료 구조를 사용하여 구현해보았습니다.

해당 컨테이너는 save, deleteById, findById 메서드를 지원하며 각각의 메서드는 사용자 pk-사용자 연결 객체(SseEmitter)를 key-value 구조저장, 사용자 pk를 이용하여 사용자 연결 객체(SseEmitter) 삭제, 사용자 pk를 이용하여 사용자 연결 객체(SseEmitter) 객체 찾기 역할을 하고 있습니다.

✈️ 알림 구현을 위한 다양한 방식 비교

위에서 언급했던 폴링(Polling), 롱폴링(Long Polling), 웹소켓(Web Socket)에 대해 간단하게 알아보겠습니다.

1. 폴링(Polling)

폴링 방식은 클라이언트가 주기적으로 서버에게 요청을 보내는 방식을 의미합니다.

일정 시간마다 클라이언트가 서버로 요청을 보내 데이터 갱신이 있는지 확인하고, 갱신이 되면 응답을 받는 방식입니다.

구현이 단순하지만 계속 요청을 보내기 때문에 요청에 대한 리소스 낭비가 발생할 수 있습니다.

구현이 단순하다는 장점이 있기 때문에, 요청하는데 부담이 크지 않고 요청 주기를 넉넉하게 잡아도 될 정도로 실시간성이 중요하지 않거나, 데이터 갱신이 특정 주기를 갖는 상황에서 많이 쓰입니다.

사진 출처 : https://gilssang97.tistory.com/69


2. 롱폴링(Long Polling)

롱폴링 방식은 폴링 방식과 비슷하며 유지 시간을 길게 갖는 게 특징입니다.

즉, 요청을 보내고 서버에서 변경이 일어날 때까지 대기하는 방식입니다.

처음에 긴 커넥션을 갖게 요청을 보내고, 이 지속 시간동안 이벤트가 발생하면 그 이벤트에 대한 값을 유지되고 있는 커넥션을 통해서 전달하게 됩니다. 커넥션이 연결되는 동안은 이벤트 발생을 실시간으로 감지할 수 있습니다.

지속적으로 요청을 보내지 않기 때문에 폴링 방식보다 부담이 덜하지만 유지 시간이 짧아지면 폴링 방식과 차이점이 없게 되며, 지속적으로 연결이 되어 있기 때문에 다수의 클라이언트에게 동시에 이벤트가 발생하면 순간적으로 부담이 급증하게 됩니다.

즉, 이벤트가 자주 발생하면 커넥션을 끊고 다시 연결해줘야하기 때문에 부담이 커지게 됩니다.

실시간 전달이 중요한데 상태가 빈번하게 갱신되지 않는 상황에 사용됩니다.

사진 출처 : https://gilssang97.tistory.com/69


3. 웹소켓(WebSocket)

웹소켓은 HTTP와 같은 프로토콜의 일종으로 양방향 통신을 실현하기 위한 구조입니다.

최초 접속은 일반 HTTP 요청을 이용한 handshaking으로 이루어지고, HTTP와 같이 연결 후 끊어버리는 것이 아니라 계속적으로 커넥션을 지속하므로 연결에 드는 불필요한 비용을 제거하게 됩니다.

웹소켓을 활용하면 용량이 큰 Http Header를 최초 접속시에만 보내고 더이상 보내지 않으므로 리소스 면에서 이득을 볼 수 있습니다.

웹소켓 포트에 접속해있는 모든 클라이언트에게 이벤트 방식으로 응답할 수 있습니다.

클라이언트와의 커넥션을 끊지 않고 계속 유지하기 때문에 서버 쪽에 리소스 부담이 크다는 단점이 있습니다.

클라이언트와 서버 간의 양방향 통신이 가능하고 실시간성이 중요한 채팅 기능 등에 자주 사용됩니다.

웹소켓 자료

사진 출처 : https://gilssang97.tistory.com/69


참고 자료
Junseo Kim님의 블로그
지단로보트님의 블로그
gilssang97님의 블로그

profile
TO BE DEVELOPER

0개의 댓글

관련 채용 정보