Spring Boot, SSE 알림 개발

wellbeing-dough·2023년 12월 14일
1

/목차

1. SSE (Server Sent Events)란?

클라이언트 요청이 없더라도 서버에서 데이터를 줘야하는 경우가 있다
우리는 알람 기능을 구현하는데, 클라이언트가 알람을 달라는 요청을 지속적으로 보낼 수 있지만(polling 방식) 계속적인 request는 서버의 부담이 점점 늘어가고, 매 요청마다 새로운 알람이 있는지 물어보는 것은 polling방식보다 서버의 부담이 덜하겠지만 그래도 DB connection등 서버의 부담이 아예 없는건 아니다

폴링(Polling) 방식


위에 설명한 폴링 방식인데 이벤트가 발생하면 그 이후에 request로 이벤트에 대한 응답을 받는 방식이다 이건 계속 request를 보내는 것이기 때문에 서버의 부담이 생긴다 그리고 이벤트가 발생하고 다음 요청까지 이벤트의 반영이 안되기 때문에 실시간을 보장하지 못한다

긴 폴링(Long Polling) 방식


계속적으로 요청하는게 아니라 커넥션의 유지 시간을 길게 가진다 그래서 그 커넥션동안 이벤트가 발생하면 지속되고 있는 커넥션으로 response를 보내고 다시 커넥션을 유지할 request를 보내는 방식이다 이렇게 하면 실시간은 보장하지만 이벤트가 자주 발생하는 경우에는 서버에 부담은 폴링 방식과 큰 차이가 없다

SSE(Server Sent Events) 방식


HTTP 프로토콜을 사용하며 서버의 이벤트를 클라이언트로 실시간, 지속적으로 보내는 기술이다 또한 서버에서 클라이언트 단방향으로만 보낼 수 있다
클라이언트가 주기적으로 서버에 요청을 보낼 필요가 없어서 폴링, 긴 폴링 방식보다 서버 부하가 적고 실시간성을 유지하지만 지속적으로 연결해야되서 이 또한 네트워크 연결을 소비하는 거고 연결하는 클라이언트가 많다면 이 또한 서버의 부담이 증가하게 된다

Web Socekt 방식


SSE와의 차이점은 HTTP프로토콜이 아닌 별도의 프로토콜을 사용하며 양방향 통신이다 웹 소켓 포트에 접속해 있는 모든 클라이언트에게 이벤트 방식으로 응답하면 된다 하지만 서버만 이벤트를 전송하고 채팅도 아닌데 알림에서 양방향 통신은 비효율적인거 같다

2. 요구사항

  1. 유저가 투표를 하면 해당 투표를 작성한 유저 + 해당 투표에 투표를 한 유저에게 10명 단위로 x0명 이상이 투표했어요 라는 알림을 준다
  2. 투표에 댓글이 달리면 투표를 작성한 유저에게 댓글이 달렸다는 알림을 준다

3. flow

구독 (Subscribe): 클라이언트가 알림을 받기 위해 서버에 구독 요청을 합니다. 이때, 사용자의 ID(userId)와 마지막으로 받은 이벤트의 ID(lastEventId)를 파라미터로 전달합니다.

1-1) subscribe() 메소드에서는 먼저 고유한 emitterId를 생성합니다. (userId와 현재 시간을 조합)

1-2) 새로운 SseEmitter 객체를 생성하고, 이 객체를 emitterRepository에 저장합니다.

1-3) SseEmitter 연결이 종료되거나 타임아웃될 경우 해당 emitterId를 제거하는 로직을 설정합니다.

1-4) 초기 연결 상태 확인 및 HTTP 503 에러 방지를 위해 더미 이벤트("EventStream Created")를 전송합니다.

1-5) 만약 클라이언트가 놓친 이벤트(lastEventId 이후의 이벤트들)가 있으면 그것들도 보내줍니다.

알림 발송 (Send Notification): 어떤 사건이 발생하여 특정 사용자에게 알림을 보내야 할 때 호출됩니다. 
2-1) 새로운 알림 객체(Notification)를 생성하고 데이터베이스에 저장합니다.

2-2) 해당 사용자(receiver, 즉 알림 수신자)에게 등록된 모든 SseEmitter들을 찾습니다.

2-3) 각각의 SseEmitter에 대해, 새로운 알림 데이터와 함께 'sse'라는 이름의 이벤트를 전송합니다.

재전송 (Resend Lost Data): 만약 클라이언트가 일부 데이터를 놓쳐서 받지 못했다면 그것들도 재전송하는 로직입니다.

  • emitterId: 이는 특정 SseEmitter 객체를 식별하기 위한 ID입니다. 여기서 사용된 방식에 따르면, 각 사용자마다 고유의 emitterId가 생성되며, 이를 통해 사용자의 SSE 연결을 관리하게 됩니다.

+) SseEmitter는 단지 서버에서 클라이언트로 이벤트를 전송하는 도구일 뿐이며, 알림의 '읽음' 상태를 추적하거나 관리하지 않습니다.

eventId: 이는 서버에서 클라이언트로 전송되는 개별 이벤트를 식별하기 위한 ID입니다. 각각의 알림 메시지나 더미 메시지 등은 고유의 eventId를 가집니다.

따라서, 일반적으로 하나의 emitterId(하나의 SSE 연결)가 여러 개의 eventId(여러 개의 이벤트)를 가질 수 있습니다.

4. 구현

  1. VoteService에서 NotificationService를 참조하여 알림을 보내는 방식을 사용했는데 NotificationService에서 투표를 만드 유저를 가져오기 위해서 VoteService를 사용하여 알림을 보내기 때문에 순환 참조 오류가 생겼다 따라서 Spring에서 제공하는 이벤트 리스너를 사용하여 VoteService에서 에서는 NotificationService를 를 의존할 필요없이 알림을 전송할 수 있도록 만들어 보려고 한다.
@RequiredArgsConstructor
@Service
@Transactional(readOnly = true)
public class VoteService {

    private final UserRepository userRepository;
    private final VoteRepository voteRepository;
    private final VoteValidator voteValidator;
    private final PageableConverter pageableConverter;
    private final ApplicationEventPublisher eventPublisher;

    @Transactional
    public void doVote(DoVoteInfo info) {
        Vote vote = voteRepository.findById(info.getVoteId()).orElseThrow(VoteNotFoundException::new);
        User user = userRepository.findById(info.getUserId()).orElseThrow(UserNotFoundException::new);
        voteValidator.validateParticipateVote(vote, user);
        VoteResult voteResult = new VoteResult(vote.getId(), user.getId(), info.getChoice());
        voteResultRepository.save(voteResult);
        eventPublisher.publishEvent(new DoVoteEvent(this, vote.getId()));
    }
}

투표를 하면 eventPublisher가 publishEvent를 해준다

@Getter
public class DoVoteEvent extends ApplicationEvent {
    private final Long voteId;

    public DoVoteEvent(Object source, Long voteId) {
        super(source);
        this.voteId = voteId;
    }
}
@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationSender {

    private final VoteRepository voteRepository;
    private final UserRepository userRepository;
    private final VoteResultRepository voteResultRepository;
    private final NotificationService notificationService;


    @TransactionalEventListener
    @Async
    public void handleCommentCreated(CommentCreatedEvent event) {
        sendNotificationForNewComments(event.getVoteId());
    }

    @TransactionalEventListener
    @Async
    public void handleDoVote(DoVoteEvent event) {
        sendNotificationsForVoters(event.getVoteId());
    }

    public void sendNotificationForNewComments(Long voteId) {
        Vote vote = voteRepository.findById(voteId).orElseThrow(VoteNotFoundException::new);
        User receiver = userRepository.findById(vote.getPostedUserId()).orElseThrow(UserNotFoundException::new);
        String title = vote.getTitle();
        String content = "투표에 댓글이 달렸습니다.";
        String url = "vote/" + voteId;
        notificationService.send(receiver, Notification.NotificationType.COMMENT, title, content, url);
        log.info("Thread: {}, Notification sent to user: {}, type: {}, content: {}, url: {}",
                Thread.currentThread().getName(), receiver.getId(), Notification.NotificationType.COMMENT, title, content, url);
    }

    public void sendNotificationsForVoters(Long voteId) {
        Vote vote = voteRepository.findById(voteId).orElseThrow(VoteNotFoundException::new);
        Long count = voteResultRepository.countByVoteId(voteId);
        if (count % 10 == 0 && count != 0) {
            String title = vote.getTitle();
            String content = "투표에 " + count + "명 이상이 참여했어요!";
            String url = "vote/" + voteId;
            List<VoteResult> voteResultList = voteResultRepository.findByVoteId(voteId);
            sendNotificationsToVoters(title, content, url, voteResultList);
        }
    }

    private void sendNotificationsToVoters(String title, String content, String url, List<VoteResult> voteResultList) {
        for (VoteResult result : voteResultList) {
            CompletableFuture.runAsync(() -> {
                User receiver = userRepository.findById(result.getVotedUserId()).orElseThrow(UserNotFoundException::new);
                notificationService.send(receiver, Notification.NotificationType.VOTE, title, content, url);
                log.info("Thread: {}, Notification sent to user: {}, type: {}, content: {}, url: {}",
                        Thread.currentThread().getName(), receiver.getId(), Notification.NotificationType.COMMENT, title, content, url);
            });
        }
    }

}
  • ApplicationEventPublihser는 스프링의 ApplicationContext가 상속하는 인터페이스 중 하나이다 옵저버패턴(관찰자들이 관찰하고 있는 대상자의 상태가 변화가 있을 때마다 대상자는 직접 목록의 각 관찰자들에게 통지하고, 관찰자들은 알림을 받아 조치를 취하는 행동 패턴)의 구현체로 이벤트에 필요한 기능을 제공해준다
    eventPublisher의 publishEvent()메서드로 어플리케이션 이벤트를 발생시킨다 추후에 투표를 했을때 생기는 event가 추가로 발생해도 voteService의 의존성을 추가할 필요 없다 또한 트랜잭션의 문제도 있다 만약에 투표는 되었는데 알림을 보내는데 실패하면 원자성을 위해 투표된것도 롤백되게 된다
  • 그래서 우리는 @TransactionalEventListener를 사용해서 리스너를 등록해줬고 @EventListener를 안쓴 이유는 아까 말한것처럼 투표는 되었는데 알림에서 실패하면 투표한것도 하나의 트랜잭션으로 롤백된다 하지만 @TransactionalEventListener를 사용하면 투표하기 트랜잭션이 커밋된 이후에 리스너가 동작한다 투표하기가 성공하고 이벤트에서 뻑나도 투표하기는 이미 커밋되었기 때문에 롤백되지 않는다
  • 비동기로 구현한 이유는 투표한 사람이 1000명이라면 for문으로 하나하나 다 동기로 구현하면 성능 문제를 초래할 수 있다.
  1. 모든 알림을 순차적으로 처리하면 시간이 오래 걸릴 수 있고, 사용자 경험에 부정적인 영향을 미칠 수 있다 또한 동기적인 호출은 모든 사용자에게 알림을 보낼 때까지 기다려야 한다. 이는 네트워크 타임아웃 및 연결 문제로 인해 호출이 실패할 가능성이 있고, 동기적인 호출은 블록킹 작업으로 인해 다른 작업들이 대기해야 할 수 있다 이는 다른 중요한 작업들의 실행을 지연시킬 수 있다
  2. 한 요청이 오래걸리면 그만큼 서버에 리소스를 잡아먹기 때문에 서버에 많은 부하가 생긴다
  • 하지만 이 방법은 Thread를 관리할 수 없어서 매우 위험하다. 가령, 동시에 10,000개의 호출이 이뤄진다면 아주 짧은 시간에 Thread를 10,000개 생성해야 한다. Thread를 생성하는 비용은 적지 않기 때문에 프로그램의 성능에 악영향을 미치며, 심지어는 OOM(OutOfMemory) 에러가 발생할 수 있다. 따라서 Thread를 관리하기 위해서는 Thread Pool을 구현해야 하고, Java에서는 ExecutorService 클래스를 제공하고 있다.
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();	//스프링이 제공하는 스레드 풀 구현체
        taskExecutor.setCorePoolSize(1); //핵심 스레드 수 1로 설정 스레드 풀의 최소 스레드 수
        taskExecutor.setMaxPoolSize(5); //최대 스레드 수 5로 설정 풀이 생성하는 최대 스레드 수
        taskExecutor.setQueueCapacity(10); //작업 대기열의 크기 10
        taskExecutor.setThreadNamePrefix("async-thread-"); //각 스레드의 이름 설정
        taskExecutor.initialize();
        return taskExecutor;
    }
}
  • 비동기 작업을 처리하기 위한 설정, @EnableAsync 어노테이션과 AsyncConfigurer 인터페이스를 구현하는 클래스를 사용하여 설정된다
  • AsyncConfigurer의 인터페이스를 구현해서 비동기 처리에 대한 추가적인 커스터마이징을 제공할 수 있다
  • 이렇게 하면 동기 방식을 쓰다가 비동기 방식을 원하면 @Async만 메서드위에 추가하면 된다
  • private 메소드에 @Async를 붙여도 AOP가 동작하지 않는다.
  • 같은 객체 내의 메소드끼리 호출할 시 AOP가 동작하지 않는다.
@Repository
@RequiredArgsConstructor
public class EmitterRepository {
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    private final Map<String, Object> eventCache = new ConcurrentHashMap<>();

    public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
        emitters.put(emitterId, sseEmitter);
        return sseEmitter;
    }

    public void saveEventCache(String eventCacheId, Object event) {
        eventCache.put(eventCacheId, event);
    }

    public Map<String, SseEmitter> findAllEmitterStartWithByUserId(String userId) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(userId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public Map<String, Object> findAllEventCacheStartWithByMemberId(String userId) {
        return eventCache.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(userId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public void deleteById(String id) {
        emitters.remove(id);
    }
}

emitters 에다가 여기에다가 save에서 받은 emitter넣어준다
발송해달라는 요청이들어오면 여기서 꺼내서 발송을 해준다 그러면 userConnection에 넣은 스레드다르고 꺼내는 스레드 다르고 발송하는 스레드가 다 다르다 -> 추후에 나올 onCompletion, onTimeout 콜백은 별도의 스레드에서 호출되서 SseEmitter가 담긴 컬렉션에 연산할거면 스레드 세이프해야 한다 그렇다면 스레드끼리 병합을 시켜줄 수 있는데 이런 스레드 세이프한 자료구조를 concurrent hash map이라고 한다

그럼 eventCache는 뭐냐면 유저가 창을 닫거나 로그아웃을 해서 연결이 해제되었는데 그 사이에 생긴 알림(누락된 알림)을 저장하는 곳이다

@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {
    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
    // SSE 연결 지속 시간 설정
    private final EmitterRepository emitterRepository;
    private final NotificationRepository notificationRepository;
    private final UserRepository userRepository;
    @Value("${admin.id}")
    private String adminUserId;

    // [1] subscribe()
    public SseEmitter subscribe(Long userId, String lastEventId) {
        String emitterId = makeTimeIncludeId(userId);
        SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
        emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
        emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
        String eventId = makeTimeIncludeId(userId);
        sendNotification(emitter, eventId, emitterId, "EventStream Created. [userId=" + userId + "]");
        if (hasLostData(lastEventId)) {
            sendLostData(lastEventId, userId, emitterId, emitter);
        }
        return emitter;
    }

    @Transactional
    public void send(User receiver, Notification.NotificationType notificationType, String title, String content, String url) {
        Notification notification = notificationRepository.save(createNotification(receiver, notificationType, title, content, url));
        String receiverId = String.valueOf(receiver.getId());
        String eventId = receiverId + "_" + System.currentTimeMillis();
        Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByUserId(receiverId);
        emitters.forEach(
                (key, emitter) -> {
                    emitterRepository.saveEventCache(key, notification);
                    sendNotification(emitter, eventId, key, NotificationDto.from(notification));
                }
        );
    }

    private String makeTimeIncludeId(Long userId) { // (3)
        return userId + "_" + System.currentTimeMillis();
    }

    private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) {
        try {
            emitter.send(SseEmitter.event()
                    .id(eventId)
                    .name("sse")
                    .data(data)
            );
        } catch (IOException exception) {
            emitterRepository.deleteById(emitterId);
        }
    }


    private boolean hasLostData(String lastEventId) { // (5)
        return !lastEventId.isEmpty();
    }


    private void sendLostData(String lastEventId, Long userId, String emitterId, SseEmitter emitter) {
        Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(userId));
        eventCaches.entrySet().stream()
                .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                .forEach(entry -> sendNotification(emitter, entry.getKey(), emitterId, entry.getValue()));
    }

    private Notification createNotification(User receiver, Notification.NotificationType notificationType, String title, String content, String url) {
        return Notification.builder()
                .receiver(receiver)
                .notificationType(notificationType)
                .title(title)
                .content(content)
                .url(url)
                .isRead(false)
                .build();
    }

    public List<NotificationDto> getNotificationDtos(Long userId) {
        return getNotificationsByUserId(userId).stream()
                .map(NotificationDto::from)
                .collect(Collectors.toList());
    }

    @Transactional
    public void setNotificationsAsRead(Long notificationId) {
        Notification notification = notificationRepository.findById(notificationId).orElseThrow(NotificationNotFoundException::new);
        notification.setIsReadTrue();
    }

    private List<Notification> getNotificationsByUserId(Long userId) {
        User user = userRepository.findById(userId).orElseThrow(UserNotFoundException::new);
        return notificationRepository.findNotificationsByUser(user);
    }

    private void validateAdminId(Long adminId) {
        if (adminId != adminUserId) {
            throw new AdminAuthorityException();
        }
    }

    private List<Notification> removeDuplicates(List<Notification> notifications) {
        return notifications.stream()
                .collect(Collectors.collectingAndThen(
                        Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(n -> n.getNotificationType().toString() + n.getUrl()))), ArrayList::new));
    }

}
  • subscribe()는 클라이언에게 받은 SSE연결 요청(구독)을 받는다 이전에 받지 못한 정보가 있으면 lastEventId로 받아올 수 있다 emmiterId는 userId + 시간으로 해서 emitter에 저장하고(한 회원이 여러개로 로그인했을때를 위해 시간까지 ex)컴터, 폰, 테블릿 또한 같은 브라우저에서 여러개 로그인도 대응 가능 sse는 HTTP/1.1에서 브라우저당 6개, HTTP/2에서 브라우저당 100개) onCompletion은 SseEmitter가 타임아웃이 일어났을 때 해당 emitter를 지워주고 completion은 SseEmitter가 완료되었을때도 해당 emitter를 지워준다 그리고 sendNotification으로 연결됬다는 알림을 하나 보내준다 이유는 처음에 SSE연결을 할 때 아무런 이벤트도 보내지 않으면 재연결 요청 혹은 연결 요청에 오류가 발생한다(503 Service Unavailable) 그래서 첫 SSE를 보낼때 더비 데이터를 넣어야 한다
    그리고 hasLostData로 lastEvent가 있는지 확인하고 있다면 이벤트 캐시를 가져와 lastEventId 와 비교하여 누락된 데이터를 전송

  • send()는 Notification 알림 엔티티 만들어서 저장하고 userId로 SseEmitter가져온다 그리고 eventCache에 혹여나 이 알림이 누락될 수 있으니 저장하고 sendNotification한다 forEach사용하는 이유는 아까 말햇듯이 한 회원이 여러곳에 로그인했을 때 전부 보내기 위해서

5. 문제점

배포환경에서 잘 안되는 문제점

Nginx는 기본적으로 Upstream으로 요청을 보낼때 HTTP 1.0 버전을 사용한다 SSE는 1.1이상이여야 하고 HTTP 1.1은 지속 연결이 기본이기 때문에 헤더를 따로 설정해줄 필요가 없다.
Nginx에서 백엔드 WAS로 요청을 보낼 때는 HTTP 1.0을 사용하고 Connection: close 헤더를 사용한다
SSE는 지속 연결이 되어 있어야 동작하는데 Nginx에서 지속 연결을 닫아버려 제대로 동작하지 않는다

proxy_set_header Connection '';
proxy_http_version 1.1;

또 Nginx의 proxy buffering 기능이 있다. SSE 통신에서 서버는 기본적으로 응답에 Transfer-Encoding: chunked 이다 SSE는 서버에서 동적으로 생성된 컨텐츠를 스트리밍하기 때문에 본문의 크기를 미리 알 수 없기 때문이다

Nginx는 서버의 응답을 버퍼에 저장해두었다가 버퍼가 차거나 서버가 응답 데이터를 모두 보내면 클라이언트로 전송한다 그러면 SSE를 사용하는 이유중 하나인 실시간성을 보장받지 못할 수 있다
SSE 응답을 반환하는 API의 헤더에 X-Accel-Buffering: no를 붙여주면 SSE 응답만 버퍼링을 하지 않도록 설정할 수 있다.

참고자료

0개의 댓글

관련 채용 정보