Spring에서 SSE(Server Sent Event) 구현하기(Feat. 알림 기능 구현)

Minjae An·2024년 1월 21일
0

Spring Web

목록 보기
6/9
post-custom-banner

개요

특정 유저가 서버에 어떤 데이터의 변경을 요청을 하여 이 작업을 서버에 반영되면 그 작업과 관련된 유저들에게 해당 내역을 고지해야 한다고 가정해보자. 이러한 알림 기능은 어떤 형태로 구현할 수 있을까?

클라이언트-서버 통신

웹서비스에서 사용하는 HTTP의 특징 중 하나는 비연결성이다. 클라이언트와 서버는 한 번의 요청, 응답 과정 이후 커넥션을 기본적으로 유지하지 않는다. 따라서 A라는 유저가 어떤 데이터를 변경하여 이를 B라는 유저에게 알리려고 해도 B와 서버간 커넥션이 존재하지 않아 고지할 수 없는 상황이 발생할 수 있다.

폴링(polling)

앞선 HTTP의 비연결성을 극복할 수 있는 첫번째 방법은 폴링이다. 클라이언트가 계속 평범한 요청을 날려 이벤트 발생시 그 이후 응답을 통해 이벤트에 대한 결과를 받는 방식이다. 쉽게 생각할 수 있는 방식이지만 클라이언트가 계속 요청을 보내며 서버에 부담이 증가할 수 있다. 또한 자주 커넥션을 맺고 끊는 것에 대한 비용 부담이 크다. 주기적으로 갱신되는 데이터가 존재할 때 이런 방식이 도움이 될 순 있지만 알림같이 비주기적인 기능을 개발하는 데는 부적절하다고 볼 수 있다.

긴 폴링(long polling)

긴 폴링같은 경우 폴링과 달리 지속 요청하지 않고 커넥션 유지 시간을 늘리는 방식이다. 커넥션을 계속 유지하고 요청이 올 경우 해당 요청을 처리하는 방식이다. 응답을 받은 이후 커넥션을 다시 유지하기 위한 요청을 바로 보낸다. 커넥션을 유지하며 요청에 대한 응답을 바로 받기 때문에 실시간이 보장되며 기존 폴링 방식과 달리 요청을 계속 보내지 않아 부담이 덜하다. 하지만 시간 간격이 좁다면 폴링과 큰 차이가 없고 다수의 클라이언트에게 동시에 이벤트가 발생될 경우 응답을 보내고 요청을 다시 처리해야 하기 때문에 순간적인 부담이 급증하게 된다.

양방향 통신을 지원하는 웹 소켓같은 방식도 존재하나, 이는 단방향으로 전송되는 알림같은 기능을 구현하는데 비효율적일 수 있다. 이런 단방향 이벤트 전달에 최적인 방식이 바로 SSE이다.

SSE(server sent events)

SSE에서 클라이언트는 서버로부터 데이터를 받을 수만 있다. 웹소켓과 달리 HTTP만으로 사용이 가능하기에 훨씬 간단하다. 접속에 문제가 있다면 자동으로 재연결을 시도하지만 클라이언트가 페이지를 닫아도 서버에서 감지하기가 어렵다. 또 다른 특징으로는 HTTP/1.1의 경우 브라우저당 6개의 접속만을 허가하며 HTTP/2에서는 100개까지의 접속을 허용한다. 알림 기능만을 고려했을 때, 웹소켓보다 가벼운 SSE를 선택하는 것이 최적의 선택이라 볼 수 있다.

Spring에서는 4.2부터 SseEmitter 클래스를 제공하여 SSE 통신을 구현할 수 있도록 지원한다. JS에서는 EventSource 를 이용해 연결 생성 및 전송된 이벤트에 대한 제어가 가능하다. EventSource 를 이용해 연결 생성 요청을 서버에 보낸다면 이를 처리해 연결을 진행하는 흐름으로 로직을 구성할 수 있다.

Notification

package com.example.springallinoneproject.notification;

import com.example.springallinoneproject.common.BaseEntity;
import com.example.springallinoneproject.user.entity.User;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.ManyToOne;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Entity
@Getter
@Builder
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
public class Notification extends BaseEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String content;
    private String relatedUrl;
    @Column(nullable = false)
    private boolean isRead;
    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private NotificationType notificationType;

    @ManyToOne
    @JoinColumn(name = "user_id")
    private User receiver;

    public void read(){
        isRead=true;
    }
}

알림을 나타내기 위한 엔티티를 구현한다. 알림을 받는 사용자를 식별할 수 있도록 User 엔티티와의 연관관계를 설정해주었다.

NotificationRepository

package com.example.springallinoneproject.notification;

import org.springframework.data.jpa.repository.JpaRepository;

public interface NotificationRepository extends JpaRepository<Notification, Long> {
}

EmitterRepository, EmitterRepositoryImpl

SseEmitter 를 통해 사용자에게 알림을 보내게 되는데 이를 위해선 어떤 회원에게 어떤 Emitter가 연결되어 있는지, 어떤 이벤트들이 현재까지 발생했는 지에 대해 저장하고 있어야 한다. 이를 위하여 EmitterRepository 를 구현해주었다. Emitter를 저장하는 자료구조가 바뀔 가능성이 있기 때문에 EmitterRepository를 인터페이스로 정의하고 구현체를 구성하는 구조로 코드를 작성했다.

EmitterRepository

package com.example.springallinoneproject.notification.emitter;

import java.util.Map;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

public interface EmitterRepository {
    SseEmitter save(String emitterId, SseEmitter sseEmitter);

    void saveEventCache(String eventCacheId, Object event);

    Map<String, SseEmitter> findAllEmitterByUserId(String userId);

    Map<String, Object> findAllEventCacheByUserId(String userId);

    void deleteById(String emitterId);

    void deleteAllEmitterByUserId(String userId);

    void deleteAllEventCacheByUserId(String userId);
}

EmitterRepositoryImpl

package com.example.springallinoneproject.notification.emitter;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Repository
public class EmitterRepositoryImpl implements EmitterRepository {
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    private final Map<String, Object> eventCache = new ConcurrentHashMap<>();

    @Override
    public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
        emitters.put(emitterId, sseEmitter);
        return sseEmitter;
    }

    @Override
    public void saveEventCache(String eventCacheId, Object event) {
        eventCache.put(eventCacheId, event);
    }

    @Override
    public Map<String, SseEmitter> findAllEmitterByUserId(String userId) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(userId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    @Override
    public Map<String, Object> findAllEventCacheByUserId(String userId) {
        return eventCache.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(userId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    @Override
    public void deleteById(String emitterId) {
        emitters.remove(emitterId);
    }

    @Override
    public void deleteAllEmitterByUserId(String userId) {
        List<String> emitterIds = emitters.keySet().stream()
                .filter(key -> key.startsWith(userId))
                .toList();

        emitterIds.forEach(emitters::remove);
    }

    @Override
    public void deleteAllEventCacheByUserId(String userId) {
        List<String> eventIds = eventCache.keySet().stream()
                .filter(key -> key.startsWith(userId))
                .toList();

        eventIds.forEach(eventCache::remove);
    }
}

Emitter와 발생 이벤트 데이터는 이벤트 수신 사용자 userId 에 발생/생성 시간을 덧붙여 Id를 부여해 관리한다. 동시성을 고려하여 ConcurrentHashMap을 사용하였다.

NotificationService

package com.example.springallinoneproject.notification;

import com.example.springallinoneproject.converter.NotificationConverter;
import com.example.springallinoneproject.notification.emitter.EmitterRepository;
import com.example.springallinoneproject.user.entity.User;
import java.io.IOException;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Service
@RequiredArgsConstructor
public class NotificationService {
    private final NotificationRepository notificationRepository;
    private final EmitterRepository emitterRepository;
    private final Long timeoutMillis = 600_000L;

    public SseEmitter subscribe(Long userId, String lastEventId) {
        String emitterId = makeTimeIncludeId(userId);
        SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(timeoutMillis));
        emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
        emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));

        String eventId = makeTimeIncludeId(userId);
        sendNotification(emitter, eventId,
                emitterId, "EventStream Created. [userId=%d]".formatted(userId));

        if (hasLostData(lastEventId)) {
            sendLostData(lastEventId, userId, emitterId, emitter);
        }

        return emitter;
    }

    public void send(User receiver, NotificationType notificationType, String content, String relatedUrl) {
        Notification notification = notificationRepository
                .save(createNotification(receiver, notificationType, content, relatedUrl));

        String receiverId = String.valueOf(receiver.getId());
        String eventId = makeTimeIncludeId(receiver.getId());
        Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterByUserId(receiverId);
        emitters.forEach(
                (id, emitter) -> {
                    emitterRepository.saveEventCache(id, notification);
                    sendNotification(emitter, eventId, id,
                            NotificationConverter.toCreateNotificationDTO(notification));
                }
        );
    }

    private String makeTimeIncludeId(Long id) {
        return id + "_" + System.currentTimeMillis();
    }

    private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) {
        try {
            emitter.send(SseEmitter.event()
                    .id(eventId)
                    .data(data));
        } catch (IOException e) {
            emitterRepository.deleteById(emitterId);
        }
    }

    private boolean hasLostData(String lastEventId) {
        return !lastEventId.isEmpty();
    }

    private void sendLostData(String lastEventId, Long userId, String emitterId, SseEmitter emitter) {
        Map<String, Object> eventCaches = emitterRepository
                .findAllEventCacheByUserId(String.valueOf(userId));
        eventCaches.entrySet().stream()
                .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                .forEach(entry -> sendNotification(emitter, entry.getKey(), emitterId, entry.getValue()));
    }

    private Notification createNotification(User receiver, NotificationType notificationType,
                                            String content, String url) {
        return Notification.builder()
                .receiver(receiver)
                .notificationType(notificationType)
                .content(content)
                .relatedUrl(url)
                .isRead(false)
                .build();
    }
}

앞서 언급했던 대로 Emitter가 어떤 사용자와 연결되어 있는 지 구분하기 위해 다음 로직을 통해 Id를 부여해주었다. 한편, HTTP 1.1, 2.0 버전에서는 브라우저당 여러 개의 SSE 커넥션을 유지할 수 있기 때문에 동일 사용자가 사용하는 서로 다른 Emitter들도 구분할 필요가 있다.

private String makeTimeIncludeId(Long id) {
    return id + "_" + System.currentTimeMillis();
}

한편, Emitter를 등록할 때 만료시 자동으로 삭제될 수 있게 콜백을 설정할 수도 있다.

emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));

먄약, 등록을 진행한 뒤, SseEmitter의 유효 시간동안 어느 데이터도 전송되지 않는다면 503 Service Unavailable 에러를 발생시키므로 이에 대한 방안으로 맨 처음 연결을 진행할 때 더미 데이터라도 보내 에러를 방지해야 한다.

String eventId = makeTimeIncludeId(userId);
sendNotification(emitter, eventId,
        emitterId, "EventStream Created. [userId=%d]".formatted(userId));

TODO : 내용 마저 작성

전체 코드

참고

profile
먹고 살려고 개발 시작했지만, 이왕 하는 거 잘하고 싶다.
post-custom-banner

0개의 댓글