SSE 사용기 (1)

Caesars·2023년 10월 9일
2

Springboot

목록 보기
4/6

클라이언트에서 요청이 없어도 서버에서 메시지를 전달해야 하는 기능을 구현해야 하는데 클라이언트에서 서버로 요청을 보낼 일은 없기에 Websocket은 오버 스펙인 상황이었습니다. 찾아보니 SSE가 더 적합하기에 사용해보고 내용을 정리했습니다.


SSE란

Server-Sent Events(이하 SSE)는 HTTP 스트리밍을 통해 서버에서 클라이언트로 단방향의 Push Notification을 전송할 수 있는 HTML5 표준 기술입니다. Spring Boot에서 SSE를 이용한 단방향 스트리밍 통신 방법을 실제 운영 프로덕션 레벨 관점에서 작성했습니다.

서버와 클라이언트 통신

Request-Response

  • 전통적인 HTTP 기반 통신. 클라이언트에서 서버로 요청을 보내면 서버가 응답합니다.

Polling

  • 클라이언트가 주기적으로 서버에 업데이트 요청을 보내는 방법. 지속적인 HTTP 요청이 발생하기 때문에 리소스 낭비가 발생합니다.

SSE

  • SSE는 단방향 통신을 지원하며, 서버에서 클라이언트로 데이터를 주기적으로 푸시합니다. 연결은 지속적으로 유지되며, 데이터를 비동기적으로 전달하는 데 사용됩니다.

Websocket

  • 웹 소켓은 양방향 통신을 지원하는 프로토콜로, 클라이언트와 서버 간에 연결을 유지하고 양방향 데이터 교환을 허용합니다.

웹소켓과 비교

WebSocketSSE
통신 방향양방향단방향(서버 -> 클라이언트)
데이터형태Binary, UTF-8UTF-8
자동 재접속X(단 ws 라이브러리에서 제공)O
프로토콜websocketHTTP
배터리 소모량작음

SSE는 주로 웹소켓과 자주 비교됩니다. 클라이언트와 서버가 커넥션을 유지하고 클라이언트의 요청없이 데이터를 수신한다는 점에서 그렇습니다. 하지만 세부적으로 보면 웹소켓과 SSE는 몇 가지 차이점이 있습니다.

  • 데이터 : SSE의 이벤트 데이터는 UTF-8 인코딩된 문자열만 지원합다. 일반적으로 JSON으로 마샬링하여 전송합니다.

  • 자동 재접속 : WebSocket 연결이 닫힐 때 클라이언트는 서버에 다시 연결을 시도하지 않습니다. 즉, 서버를 폴링하기 위해 추가 코드를 작성해야 함을 의미합니다. 단 SockJs나 Socket.IO 와 같은 재연결 지원 기능이 있는 라이브러리를 사용할 수는 있습니다.
    반대로 SSE는 연결이 끊어진 후 연결을 다시 설정하므로 필수 동작을 달성하기 위해 작성해야 하는 코드가 줄어듭니다.

  • 프로토콜 : 자체 TCP 연결이 필요합니다. 연결을 설정하는 데에만 HTTP를 사용하지만 WebSocket 프로토콜을 사용할 수 있는 독립형 TCP 연결로 바뀝니다. SSE는 별도의 프로토콜을 사용하지 않고 HTTP 프로토콜만으로 사용이 가능하며 훨씬 가볍습니다.

  • 배터리 소모 : 웹소켓은 HTTP와는 별도의 프로토콜을, HTTP와 달리 전이중 양방향 통신을 지원합니다. 이에 따라 웹 소켓 연결은 SSE보다 더 많은 네트워크 활동을 유발할 수 있습니다. 링크를 보면 모바일 기기에서 더욱 더 소모가 심하다고 합니다. 네트워크 탭 열어서 보면 확인 가능한게 소켓은 계속 서버랑 계속 handshake를 하고 있습니다. 하지만 SSE는 한번 열어두고 계속 기다리고 있습니다. 아무리 소켓이 서버에 보내는 데이터가 적긴 하지만 계속 열려 있다면 어쩼든 왔다 갔다 하니까 더 클 수밖에 없습니다.

실시간 애플리케이션 사이에서도 데이터 흐름은 일반적으로 비대칭입니다. 서버는 대부분의 메시지를 보내는 반면 클라이언트는 대부분 듣기만 하고 가끔 업데이트를 보냅니다. 예를 들어, 채팅 애플리케이션에서 사용자는 각각 수십 또는 수백 명의 참가자가 있는 여러방에 연결될 수 있습니다. 따라서 수신된 메시지의 양은 보낸 메시지의 양을 훨씬 초과합니다. 만약 클라이언트가 데이터 수신만 하는 시나리오라면 SSE가 웹소켓보다 더 나은 선택지일 수 있습니다.

SSE가 선호되지 않는 이유?

SSE는 어느정도 웹소켓의 역할을 하면서 더 가볍고 구현이 쉽다는 장점이 있습니다. 그럼에도 SSE는 출시된 이후로 한번도 웹소켓의 그늘을 넘어선 적이 없습니다. 커뮤니티 사이트의 화제글이나 쇼핑몰의 실시간 인기 상품 등 클라이언트에서 데이터를 전송할 필요 없는 시나리오에서도 대부분 웹소켓으로 구현되어 있습니다.
아마 확장성이 부족한 것이 원인이지 않았나 싶습니다. 요구 사항이 변경되면 결국 WebSocket을 사용하여 리팩터링해야 할 가능성이 높습니다. WebSocket 기술은 더 많은 초기 작업을 제공하지만 더 다양하고 확장 가능한 프레임워크이므로 새로운 기능을 추가할 가능성이 있는 복잡한 애플리케이션에 더 나은 옵션입니다.

코드

spring framework 4.2부터 SSE 통신을 지원하는 SseEmitter 클래스가 생겼습니다. SseEmitter를 사용하여 서비스를 구현해보려고 합니다. 해당 프로젝트는 링크 에서 확인하실 수 있습니다.

SseController

@RequiredArgsConstructor
@Slf4j
@RestController
@RequestMapping()
public class SseApiController {

    private final SseConnectionPool sseConnectionPool;

    private final ObjectMapper objectMapper;

    @RequestMapping(path = {"", "/main"})
    public ModelAndView main(){
        return new ModelAndView("main");
    }

    @GetMapping(path = "/api/sse/connect/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseBodyEmitter connect(@PathVariable String id) {
        log.info("login user {}", id);

        var userSseConnection = UserSseConnection.connect(
                id,
                sseConnectionPool,
                objectMapper);

        sseConnectionPool.addSession(userSseConnection.getUniqueKey(), userSseConnection);

        return userSseConnection.getSseEmitter();
    }

    @GetMapping("/api/sse/push-event/{id}")
    public void pushEvent(@PathVariable String id) {
        var userSseConnection = sseConnectionPool.getSession(id);

        Optional.ofNullable(userSseConnection)
                .ifPresent(it -> {
                    it.sendMessage(Map.of("id", it.getUniqueKey(), "message", "helloworld"));
                });
    }

    @GetMapping("/api/sse/broadCast")
    public void broadCast() {
        sseConnectionPool.broadCast();
    }
}

UserSseConnection

@Getter
@ToString
@EqualsAndHashCode
public class UserSseConnection {

    private final String uniqueKey;

    private final SseEmitter sseEmitter;

    private final ConnectionPoolIfs<String, UserSseConnection> connectionPoolIfs;

    private final ObjectMapper objectMapper;

    private UserSseConnection(String uniqueKey,
                              ConnectionPoolIfs<String, UserSseConnection> connectionPoolIfs,
                              ObjectMapper objectMapper) {
        // key 초기화
        this.uniqueKey = uniqueKey;

        // sse 초기화
        this.sseEmitter = new SseEmitter(60 * 1000L);

        // objectMapper 초기화
        this.objectMapper = objectMapper;

        // call back 초기화
        this.connectionPoolIfs = connectionPoolIfs;

        // on completion
        this.sseEmitter.onCompletion(() -> {
            // connection pool remove
            this.connectionPoolIfs.onCompletionCallback(this);
        });

        // on timeout
        this.sseEmitter.onTimeout(() -> {
            this.sseEmitter.complete();
        });

        // onopen 메시지
        this.sendMessage("onopen", "connect");
    }

    public static UserSseConnection connect(String uniqueKey,
                                            ConnectionPoolIfs<String, UserSseConnection> connectionPoolIfs,
                                            ObjectMapper objectMapper) {
        return new UserSseConnection(uniqueKey, connectionPoolIfs, objectMapper);
    }

    public void sendMessage(String eventName, Object data) {
        try {
            var json = this.objectMapper.writeValueAsString(data);
            var event = SseEmitter.event()
                    .name(eventName)
                    .data(json);
            this.sseEmitter.send(event);
        } catch (IOException e) {
            this.sseEmitter.completeWithError(e);
        }
    }

    public void sendMessage(Object data) {
        try {
            var json = this.objectMapper.writeValueAsString(data);
            var event = SseEmitter.event()
                    .data(json);
            this.sseEmitter.send(event);
        } catch (IOException e) {
            this.sseEmitter.completeWithError(e);
        }
    }
}

생성자를 통해 SseEmitter의 만료시간을 설정할 수 있습니다. 디폴트 값은 서버에 따라 다릅니다. 스프링 부트의 내장 톰캣을 사용하면 30초로 설정됩니다. 만료시간이 되면 브라우저에서 자동으로 서버에 재연결 요청을 보냅니다.

SseConnectionPool

@Slf4j
@Component
public class SseConnectionPool implements ConnectionPoolIfs<String, UserSseConnection> {

    private static final Map<String, UserSseConnection> connectionPool = new ConcurrentHashMap<>();

    @Override
    public void addSession(String uniqueKey, UserSseConnection userSseConnection) {
        connectionPool.put(uniqueKey, userSseConnection);
    }

    @Override
    public UserSseConnection getSession(String uniqueKey) {
        return connectionPool.get(uniqueKey);
    }

    @Override
    public void onCompletionCallback(UserSseConnection session) {
        log.info("call back connection pool completion : {}", session);
        connectionPool.remove(session.getUniqueKey());
    }

    @Override
    public void broadCast() {
        connectionPool.forEach((k, v) -> {
            v.sendMessage(Map.of("id", v.getUniqueKey(), "message", "helloworld"));
        });
    }
}

Connection 관리는 thread-safe한 자료구조를 사용하지 않으면 동시성 에러가 발생할 수 있습니다. 여기서는 thread-safe한 자료구조인 ConcurrentHashMap을 사용하였습니다.

클라이언트 구현

SSE 통신을 하기 위해서는 처음에는 클라이언트에서 서버로 연결이 필요합니다. 클라이언트에서 서버로 sse 연결 요청을 보내기 위해서 자바스크립트는 EventSource를 제공합니다.

<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>이벤트 관리</title>
    <script src="https://cdn.jsdelivr.net/npm/vue@2.6.14/dist/vue.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/axios/dist/axios.min.js"></script>

    <style>
        table {
            border-collapse: collapse;
            width: 100%;
        }

        th, td {
            padding: 8px;
            text-align: left;
            border-bottom: 1px solid #ddd;
        }

        .button-container button {
            margin-right: 8px;
        }
    </style>

</head>
<body>
<div id="app">
    <table>
        <thead>
        <tr>
            <th>ID</th>
            <th>메시지</th>
            <th>시간</th>
        </tr>
        </thead>
        <tbody>
        <tr v-for="event in events" :key="event.id">
            <td>{{ event.id }}</td>
            <td>{{ event.message }}</td>
            <td>{{ getCurrentTime() }}</td>
        </tr>
        </tbody>
    </table>
</div>

<script>
    new Vue({
        el: "#app",
        data: {
            events: [] // 서버로부터 받은 데이터를 저장할 배열
        },
        methods: {
            pushData(event){
                this.events.unshift(event);
            },
            make4Str() {
                return Math.floor((1 + Math.random()) * 0x10000).toString(16).substring(1);
            },
            getCurrentTime() {
                const now = new Date();
                // 시간을 원하는 형식으로 포맷팅하거나 필요한 작업을 수행
                const formattedTime = now.getHours() + ':' + now.getMinutes() + ':' + now.getSeconds();
                return formattedTime;
            }
        },
        mounted() {
            let id = this.make4Str();
            // SSE 연결
            const url = "http://localhost:8080/api/sse/connect/"+id;    // 접속주소
            const eventSource = new EventSource(url);               // sse 연결

            eventSource.onopen = event => {
                console.log("sse connection")
            }

            eventSource.onmessage = event => {
                console.log("receive : "+event.data);
                const data = JSON.parse(event.data);
                this.pushData(data);
            }

            eventSource.onerror = event => {
                console.log("sse error", event);
            }
        }
    });
</script>
</body>
</html>

서버에서 업데이트가 푸시되면 onmessage 핸들러가 실행되고 e.data 속성에서 새 데이터를 사용할 수 있습니다. 연결이 종료될 때마다 브라우저가 3초 정도 후에 자동으로 소스에 다시 연결됩니다.

테스트

Jmeter를 사용해서 10000개의 커넥션 유지와 메시지 수신을 테스트 했습니다. Jmeter는 자바로 만들어진 웹 어플리케이션 성능 테스트 오픈 소스입니다.
본래 테스트 하는 웹 어플리케이션 서버와 테스트를 돌리는 서버가 같으면 같은 메모리를 사용하기 때문에 정확한 값을 측정할 수 없지만 테스트가 목적이니 저는 그냥 로컬에서 같이 쓰겠습니다.

New Test 생성 후 쓰레드 그룹과 Controller에 요쳥을 보내는 HTTP Request, unique 값으로 사용할 Random Variable 그리고 결과를 받아볼 View Result를 만들겠습니다.


Thread Properties

  • Number of Threads : 쓰레드 개수
  • Ramp-up period : 쓰레드 개수를 만드는데 소요되는 시간.

  • Server Name or IP : localhost
  • Port Number : 8080
  • HTTP Request : Get /api/sse/connect/jmeter${rd}
  • Variable : 1~9999999999 사이의 랜덤 값을 사용

위와 같이 설정했다면 Jmeter 테스트를 실행하고 서버에서 SSE를 발송합니다.


Response Body를 보면 서버에서 보냈던 SSE를 정상적으로 수신했습니다.


후기

어느 정도 완성되었다고 생각했지만 찾아보니 결점이 하나 있습니다. 만약 수백만대의 커넥션이 필요한 상황이라면 한 대의 인스턴스로 유지할 수는 없을테고 서버 scale out 이 필요합니다. 링크 에서 확인할 수 있듯이 여러 대의 서버로 분산처리하도록 ALB에 붙이자 클라이언트가 서버를 번갈아가면서 호출할 수 있습니다. 해결 방안에 대해 다음 장에서 서술해 보겠습니다.


참고

https://tecoble.techcourse.co.kr/post/2022-10-11-server-sent-events/
https://surviveasdev.tistory.com/entry/%EC%9B%B9%EC%86%8C%EC%BC%93-%EA%B3%BC-SSEServer-Sent-Event-%EC%B0%A8%EC%9D%B4%EC%A0%90-%EC%95%8C%EC%95%84%EB%B3%B4%EA%B3%A0-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0
https://ably.com/topic/server-sent-events
https://hamait.tistory.com/792

profile
잊기전에 저장

0개의 댓글