[Spring + SSE] Server-Sent Events를 이용한 실시간 알림

Junseo Kim·2021년 9월 16일
49

Spring

목록 보기
6/7

코드리뷰 매칭 플랫폼 개발 중 알림 기능이 필요했다.

리뷰어 입장에서는

  • 새로운 리뷰 요청이 생겼을 때
  • 모든 리뷰가 끝나고 리뷰이의 피드백이 도착했을 때

리뷰이 입장에서는

  • 리뷰 요청이 거절되었을 때
  • 리뷰 요청이 수락되었을 때
  • 리뷰어가 리뷰를 완료했을 때

공통적으로

  • 새로운 채팅이 있을 때

위의 경우 알림을 보내줘야한다고 생각했다. 로그인한 상태가 아니라면 로그인 했을 때 받은 알림을 모두 보여주면 되지만, 로그인한 상태라면 실시간으로 알림을 받길 원했다. 이 실시간 알림 기능을 구현하기 위해 SSE(Server Sent Events)를 이용했다.

📂 고려 기술

실시간 웹 애플리케이션 개발 시 사용되는 몇 가지 방법이 있다.

polling(client pull)

클라이언트가 일정한 주기로 서버에 업데이트 요청을 보내는 방법. 지속적인 HTTP 요청이 발생하기 때문에 리소스 낭비가 발생한다.

websocket(server push)

실시간 양방향 통신을 위한 스펙으로 서버와 브라우저가 지속적으로 연결된 TCP라인을 통해 실시간으로 데이터를 주고받을 수 있도록 하는 HTML5 사양이다. 연결지향 양방향 전이중 통신이 가능하며 채팅, 게임, 주식 차트 등에 사용된다. polling은 주기적으로 HTTP 요청을 수행하지만, websocket은 연결을 유지하여 서버와 클라이언트 간 양방향 통신이 가능하다.

SSE(server push)

이벤트가 [서버 -> 클라이언트] 방향으로만 흐르는 단방향 통신 채널이다. SSE는 클라이언트가 polling과 같이 주기적으로 http 요청을 보낼 필요없이 http 연결을 통해 서버에서 클라이언트로 데이터를 보낼 수 있다.

이 3가지 방법 중 SSE를 이용하여 실시간 알림 기능을 구현하기로 결정했다. polling은 지속적인 요청을 보내야하므로 리소스 낭비가 심할 것 같았고, 실시간 알림같은 경우는 서버에서 클라이언트 방향으로만 데이터를 보내면 되기 때문에 websocket처럼 양방향 통신은 필요없었다. 따라서 웹 소켓에 비해 가볍고 서버 -> 클라이언트 방향을 지원하는 SSE를 선택했다.

🔎 SSE 자세히 알아보기

SSE는 서버의 데이터를 실시간, 지속적으로 클라이언트에 보내는 기술이다. 위의 그림처럼 클라이언트에서 처음 HTTP 연결을 맺고 나면 서버는 클라이언트로 계속하여 데이터를 전송할 수 있다.

일반적으로 HTTP 요청은 하나의 [요청 - 응답] 과정을 거치고 연결을 종료한다. 하지만 파일 전송과 같이 연결 상태를 유지하고 계속 데이터를 보내는 경우도 있다. SSE는 이와 같이 한 번 연결 후 서버에서 클라이언트로 데이터를 계속해서 보낼 수 있다.

특징

  • websocket과 달리 별도의 프로토콜을 사용하지 않고 HTTP 프로토콜만으로 사용이 가능하며 훨씬 가볍다.
  • 접속에 문제가 있으면 자동으로 재연결을 시도한다.
  • 최대 동시 접속 수는 HTTP/1.1의 경우 브라우저 당 6개이며, HTTP/2는 100개까지 가능하다.
  • IE를 제외한 브라우저에서 지원된다.(Polyfills을 사용하면 가능하다고 한다.)
  • 이벤트 데이터는 UTF-8 인코딩된 문자열만 지원한다. 일반적으로 JSON으로 마샬링하여 전송한다.
  • 클라이언트에서 페이지를 닫아도 서버에서 감지하기가 어렵다.

Event Stream 형태

sse 연결을 통해 도착하는 데이터의 형태를 살펴보자

// 두 줄 이상의 연속된 줄은 하나의 데이터 조각으로 간주됨. 
// 마지막 행을 제외한 줄은 \n(마지막 행은 \n\n)

// 1
data: first line\n\n

// 2
data: first line\n
data: second line\n\n
// 고유 id 같이 보내기. 
// id 설정 시 브라우저가 마지막 이벤트를 추적하여 서버 연결이 끊어지면 
// 특수한 HTTP 헤더(Last-Event-ID)가 새 요청으로 설정됨. 
// 브라우저가 어떤 이벤트를 실행하기에 적합한 지 스스로 결정할 수 있게 됨.

id: 12345\n
data: first line\n
data: second line\n\n
// JSON 형식 예시

data: {\n
data: "msg": "hello world",\n
data: "id": 12345\n
data: }\n\n
// event 이름 설정

id: 12345\n
event: sse\n
data: {"msg": "hello world", "id": 12345}\n\n

🌱 SSE in Spring

spring에서 sse을 어떻게 적용하는지 알아보자. spring framework 4.2부터 SSE 통신을 지원하는 SseEmitter 클래스가 생겼다. spring framework 5부터 WebFlux를 이용해서도 sse 통신을 할 수 있지만, SseEmitter를 사용하여 구현해보려고 한다.

reference에도 언급했지만 서버단 구현은 https://jsonobject.tistory.com/558 이 블로그를 많이 참고했습니다!

클라이언트 구현

위에서 설명한대로 SSE 통신을 하기 위해서는 처음에는 클라이언트에서 서버로 연결이 필요하다. 클라이언트에서 서버로 sse 연결 요청을 보내기 위해서 자바스크립트는 EventSource를 제공한다.

간단하게 숫자를 입력하고 로그인 버튼을 누르면 해당 숫자(유저 id)로 sse 연결을 맺고 후에 서버에서 해당 유저와의 sse 연결을 통해 데이터가 날라오면 브라우저 알림을 띄우는 코드이다.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Notification Test Page</title>
</head>
<body>
    <input type="text" id="id"/>
    <button type="button" onclick="login()">로그인</button>
</body>
</html>
<script type="text/javaScript">
    function login() {
        const id = document.getElementById('id').value;

        const eventSource = new EventSource(`/subscribe/` + id);

        eventSource.addEventListener("sse", function (event) {
            console.log(event.data);

            const data = JSON.parse(event.data);

            (async () => {
                // 브라우저 알림
                const showNotification = () => {
                    
                    const notification = new Notification('코드 봐줘', {
                        body: data.content
                    });
                    
                    setTimeout(() => {
                        notification.close();
                    }, 10 * 1000);
                    
                    notification.addEventListener('click', () => {
                        window.open(data.url, '_blank');
                    });
                }

                // 브라우저 알림 허용 권한
                let granted = false;

                if (Notification.permission === 'granted') {
                    granted = true;
                } else if (Notification.permission !== 'denied') {
                    let permission = await Notification.requestPermission();
                    granted = permission === 'granted';
                }

                // 알림 보여주기
                if (granted) {
                    showNotification();
                }
            })();
        })
    }
</script>

서버 - 컨트롤러 구현

서버에서는 EventSource를 통해 날아오는 요청을 처리할 컨트롤러가 필요하다. sse 통신을 하기 위해서는 MIME 타입을 text/event-stream로 해줘야한다. 현재는 편의를 위해 /subscribe/{id}와 같이 직접 유저의 id를 받게 하였지만, 실제 적용시에는 access-token을 활용할 계획이다.

@RestController
public class NotificationController {

    private final NotificationService notificationService;

    public NotificationController(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    /**
     * @title 로그인 한 유저 sse 연결
     */
    @GetMapping(value = "/subscribe/{id}", produces = "text/event-stream")
    public SseEmitter subscribe(@PathVariable Long id,
                                @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
        return notificationService.subscribe(id, lastEventId);
    }
}

추가적으로 Last-Event-ID라는 헤더를 받고 있는 것을 볼 수 있다. 이 헤더는 항상 담겨있는 것은 아니다. 만약 sse 연결이 시간 만료 등의 이유로 끊어졌을 경우에 알림이 발생하면 어떻게 될까? 그 시간 동안 발생한 알림은 클라이언트에 도달하지 못할 것이다. 이를 방지하기 위한 것이 Last-Event-ID 헤더이다. 이 헤더는 클라이언트가 마지막으로 수신한 데이터의 id값을 의미한다. 이를 이용하여 유실된 데이터를 다시 보내줄 수 있다. 밑에서 자세히 설명할 예정이다.

서버 - 서비스(SSE 연결) 구현

유저의 id와, Last-Event-ID값이 아래의 subscribe()로 넘어온다. 먼저 코드를 보고 설명을 하겠다.

@Service
public class NotificationService {
    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;

    private final EmitterRepository emitterRepository;

    public NotificationService(EmitterRepository emitterRepository) {
        this.emitterRepository = emitterRepository;
    }

    public SseEmitter subscribe(Long userId, String lastEventId) {
        // 1
        String id = userId + "_" + System.currentTimeMillis();
        
        // 2
        SseEmitter emitter = emitterRepository.save(id, new SseEmitter(DEFAULT_TIMEOUT));

        emitter.onCompletion(() -> emitterRepository.deleteById(id));
        emitter.onTimeout(() -> emitterRepository.deleteById(id));

	// 3
        // 503 에러를 방지하기 위한 더미 이벤트 전송
        sendToClient(emitter, id, "EventStream Created. [userId=" + userId + "]");

	// 4
        // 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
        if (!lastEventId.isEmpty()) {
            Map<String, Object> events = emitterRepository.findAllEventCacheStartWithId(String.valueOf(userId));
            events.entrySet().stream()
                  .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                  .forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue()));
        }

        return emitter;
    }

    // 3
    private void sendToClient(SseEmitter emitter, String id, Object data) {
        try {
            emitter.send(SseEmitter.event()
                                   .id(id)
                                   .name("sse")
                                   .data(data));
        } catch (IOException exception) {
            emitterRepository.deleteById(id);
            throw new RuntimeException("연결 오류!");
        }
    }
}

[코드의 1번 부분]
subscribe()를 보면 id값을 ${user_id}_${System.currentTimeMillis()} 형태로 사용하는 것을 볼 수 있다. 이렇게 사용하는 이유가 Last-Event-ID 헤더와 상관이 있다.

Last-Event-ID헤더는 클라이언트가 마지막으로 수신한 데이터의 id값을 의미한다고 했다. id값과 전송 데이터를 저장하고 있으면 이 값을 이용하여 유실된 데이터 전송을 다시 해줄 수 있다. 하지만 만약 id값을 그대로 사용한다면 어떤 문제가 있을까?

id값을 그대로 사용한다면 Last-Event-Id값이 의미가 없어진다.

Last-Event-Id = 3

{3, data1}
{3, data3}
{3, data2}

=> 어떤 데이터까지 제대로 전송되었는지 알 수 없다.

데이터의 id값을 ${userId}_${System.currentTimeMillis()} 형태로 두면 데이터가 유실된 시점을 파악할 수 있으므로 저장된 key값 비교를 통해 유실된 데이터만 재전송 할 수 있게 된다.

Last-Event-Id = 3_1631593143664

{3_1631593143664, data1}
{3_1831593143664, data3}
{3_1731593143664, data2}

=> data1 까지 제대로 전송되었고, data2, data3을 다시 보내야한다.

이런 이유로 인해 id값을 ${user_id}_${System.currentTimeMillis()}로 두는 것이다.

[코드의 2번 부분]
클라이언트의 sse연결 요청에 응답하기 위해서는 SseEmitter 객체를 만들어 반환해줘야한다. SseEmitter 객체를 만들 때 유효 시간을 줄 수 있다. 이때 주는 시간 만큼 sse 연결이 유지되고, 시간이 지나면 자동으로 클라이언트에서 재연결 요청을 보내게 된다.

id를 key로, SseEmitter를 value로 저장해둔다. 그리고 SseEmitter의 시간 초과 및 네트워크 오류를 포함한 모든 이유로 비동기 요청이 정상 동작할 수 없다면 저장해둔 SseEmitter를 삭제한다.

[코드의 3번 부분]
연결 요청에 의해 SseEmitter가 생성되면 더미 데이터를 보내줘야한다. sse 연결이 이뤄진 후, 하나의 데이터도 전송되지 않는다면 SseEmitter의 유효 시간이 끝나면 503응답이 발생하는 문제가 있다. 따라서 연결시 바로 더미 데이터를 한 번 보내준다.

[코드의 4번 부분]
1번 부분과 관련이 있는 부분이다. Last-Event-ID값이 헤더에 있는 경우, 저장된 데이터 캐시에서 id 값과 Last-Event-ID값을 통해 유실된 데이터들만 다시 보내준다.

서버 - 서비스(데이터 전송) 구현

위의 부분은 클라이언트와 서버가 sse 연결을 맺는 부분이었고, 이제 실제로 서버에서 클라이언트로 일방적인 데이터를 보내는 부분을 하나 구현해보려고 한다. 맨 처음에 얘기한 예시들 중 새로운 리뷰 요청이 생겼을 때 리뷰어에게 알림을 보내는 예시를 구현해보자.

아래 코드는 실제로 클라이언트에 데이터를 전송하는 부분이다. 클라이언트에 보낼 데이터 형식인 Notification 객체를 만들고, 현재 로그인 한 유저의 id값을 통해 SseEmitter를 모두 가져온다. 그 후, 데이터 캐시에도 저장해주고, 실제로 데이터 전송도 한다.

@Service
public class NotificationService {
    // ...
    
    public void send(Member receiver, Review review, String content) {
        Notification notification = createNotification(receiver, review, content);
        String id = String.valueOf(receiver.getId());
        
        // 로그인 한 유저의 SseEmitter 모두 가져오기
        Map<String, SseEmitter> sseEmitters = emitterRepository.findAllStartWithById(id);
        sseEmitters.forEach(
                (key, emitter) -> {
                    // 데이터 캐시 저장(유실된 데이터 처리하기 위함)
                    emitterRepository.saveEventCache(key, notification);
                    // 데이터 전송
                    sendToClient(emitter, key, NotificationResponse.from(notification));
                }
        );
    }

    private Notification createNotification(Member receiver, Review review, String content) {
        return Notification.builder()
                           .receiver(receiver)
                           .content(content)
                           .review(review)
                           .url("/reviews/" + review.getId())
                           .isRead(false)
                           .build();
    }
    
    private void sendToClient(SseEmitter emitter, String id, Object data) {
        try {
            emitter.send(SseEmitter.event()
                                   .id(id)
                                   .name("sse")
                                   .data(data));
        } catch (IOException exception) {
            emitterRepository.deleteById(id);
            throw new RuntimeException("연결 오류!");
        }
    }
}

실제로 알림을 보내고 싶은 로직에서 send 메서드를 호출해주면 된다.

@Service
public class ReviewService {
    // ...
    @Transactional
    public Long create(LoginMember loginMember, ReviewRequest reviewRequest) {
        // ...
        notificationService.send(teacher, savedReview, "새로운 리뷰 요청이 도착했습니다!");

        return savedReview.getId();
    }

아직 실제 서버에 적용되지는 않았지만, 로컬에서 리뷰 요청을 했을 경우 알림이 뜨는 걸 볼 수 있다!

상황 설명: 로그인 한 리뷰어의 id가 5인경우, 5번 리뷰어에게 누군가 새로운 리뷰 요청을 했을 경우.

확장성 고려하기!

위의 구조는 단일 WAS 사용시는 문제없이 동작한다. 하지만 만약 여러 WAS를 사용하는 경우 클라이언트는 알림을 받지 못할 수 있다. 이런 경우를 대비하여 Redis의 pub/sub을 활용해 보려고 한다.
(추가하기)

🚀 트러블 슈팅

EventSource에 토큰 담기

(추가하기)

SSE 테스트

(추가하기)

데이터 캐시 비우기

(추가하기)

브라우저가 닫힌 경우

(추가하기)

📄 reference

4개의 댓글

comment-user-thumbnail
2021년 9월 23일

덕분에 도움 많이 받았습니다!! 감사합니다! (추가 되는 부분도 꼭 보고 싶네요..! )

1개의 답글
comment-user-thumbnail
2022년 5월 6일

정말 감사합니다
덕분에 SSE 구현을 해볼 수 있었습니다.

좋은 자료 공유해주셔서 감사합니다.

답글 달기
comment-user-thumbnail
2023년 3월 2일

덕분에 도움 많이 받았습니다.!! 혹시 git 주소 공유 가능할까요!?

답글 달기