SSE로 알림 기능 구현하기 - feat.Spring boot

whitehighdragon·2023년 10월 25일
post-thumbnail

개발전에...

나는 알림기능 맡아서 구현하기로 했다.
일단 클라이언트와 서버 통신 방법부터 공부해야했다.

클라이언트 - 서버 통신 방법에는
폴링(Polling), 긴 폴링(Long Polling), 스트리밍(Streaming), 웹 소켓(WebSocket), SSE(Server-Sent-Events) 등이 있다
그중에 나는 SSE 방식으로 개발하기로 했다.

SSE(Server-Sent-Events) 방식

  • SSE는 서버와 한번 연결을 맺고 나면, 일정 시간 동안 서버에서 변경이 발생할 때마다 서버에서 클라이언트로 데이터를 전송하는 방법
    • 클라이언트는 서버를 구독한다.(SSE Connection을 맺는다.)
    • 서버는 변동사항이 생길 때마다 구독한 클라이언트들에게 데이터를 전송한다.
  • SSE는 서버에서 클라이언트로 text message를 보내는 브라우저 기반 웹 애플리케이션 기술이며 HTTP의 persistent connections을 기반으로 하는 HTML5 표준 기술
  • 클라이언트가 서버와 크게 통신할 필요 없이 단지 업데이트된 데이터만 받아야 하는 실시간 데이터 스트림에 대한 구현이 필요하기 때문에 SSE로 개발을 하게 되었다.

1) SSE 연결

Controller

@Operation(summary = "sse세션연결")
@GetMapping(value = "/api/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> subscribe(@Parameter(hidden = true) @AuthenticationPrincipal UserDetailsImpl userDetails,
                                            @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
    return ResponseEntity.ok(sseService.subscribe(userDetails.getMember(), lastEventId));
}
  • 서버에서는 SSE 통신을 위해서 EventSource로 요청을 해오면 처리할 Controller이다.
  • 연결 요청을 처리하기 위해서, MIME Type을 text/event-stream형태로 받아줘야한다.
  • Last-Event-ID는 헤더에 담겨져 오는 값으로 이전에 받지 못한 이벤트가 존재하는 경우(SSE연결에 대한 시간 만료 혹은 종료)나 받은 마지막 이벤트 ID 값을 넘겨 그 이후의 데이터(받지 못한 데이터)부터 받을 수 있게 할때 필요한 값이다.

Service

public SseEmitter subscribe(Member member, String lastEventId) {
    String emitterId = member.getUsername() + "_" + System.currentTimeMillis();

    SseEmitter sseEmitter = sseRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
    log.info("new emitter added : {}", sseEmitter);
    log.info("lastEventId : {}", lastEventId);

    /* 상황별 emitter 삭제 처리 */
    sseEmitter.onCompletion(() -> sseRepository.deleteEmitterById(emitterId)); //완료 시, 타임아웃 시, 에러 발생 시
    sseEmitter.onTimeout(() -> sseRepository.deleteEmitterById(emitterId));
    sseEmitter.onError((e) -> sseRepository.deleteEmitterById(emitterId));

    /* 503 Service Unavailable 방지용 dummy event 전송 */
    send(sseEmitter, emitterId, createDummyNotification(member.getUsername()));

    /* client가 미수신한 event 목록이 존재하는 경우 */
    if(!lastEventId.isEmpty()) { //client가 미수신한 event가 존재하는 경우 이를 전송하여 유실 예방
        Map<String, Object> eventCaches = sseRepository.findAllEventCacheStartsWithUsername(member.getUsername()); //id에 해당하는 eventCache 조회
        eventCaches.entrySet().stream() //미수신 상태인 event 목록 전송
                .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                .forEach(entry -> emitEventToClient(sseEmitter, entry.getKey(), entry.getValue()));
    }

    return sseEmitter;
}

✅ emitterId

String emitterId = member.getUsername() + "_" + System.currentTimeMillis();
  • subscribe()를 보면 id값을 member.getUsername()_System.currentTimeMillis() 형태로 사용하는 것을 볼 수 있다. 이렇게 사용하는 이유가 Last-Event-ID 헤더와 상관이 있다.
  • Last-Event-ID헤더는 클라이언트가 마지막으로 수신한 데이터의 id값을 의미한다.
  • id값과 전송 데이터를 저장하고 있으면 이 값을 이용하여 유실된 데이터 전송을 다시 해줄 수 있다.
  • 만약 id값을 그대로 사용한다면 Last-Event-Id값이 의미가 없어진다.
  • 데이터의 id값을 member.getUsername()_System.currentTimeMillis() 형태로 두면 데이터가 유실된 시점을 파악할 수 있으므로 저장된 key값 비교를 통해 유실된 데이터만 재전송 할 수 있게 된다.
  • 그래서 id 값을 member.getUsername()_System.currentTimeMillis() 로 사용한다.

✅ SseEmitter

SseEmitter sseEmitter = sseRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
  • 클라이언트의 SSE연결 요청에 응답하기 위해서 SseEmitter 객체를 만들어 반환해줘야함.
  • SseEmitter 객체를 만들 때 유효 시간을 줄 수 있다. 이때 주는 시간 만큼 sse 연결이 유지되고, 시간이 지나면 자동으로 클라이언트에서 재연결 요청을 보내게 된다.
  • id를 key로, SseEmitter를 value로 저장해둔다. 그리고 SseEmitter의 시간 초과 및 네트워크 오류를 포함한 모든 이유로 비동기 요청이 정상 동작할 수 없다면 저장해둔 SseEmitter를 삭제한다.

✅ 더미데이터 생성

/* 503 Service Unavailable 방지용 dummy event 전송 */
send(sseEmitter, emitterId, createDummyNotification(member.getUsername()));
  • 연결 요청에 의해 SseEmitter가 생성되면 더미 데이터를 보내줘야한다.
  • sse 연결이 이뤄진 후, 하나의 데이터도 전송되지 않는다면 SseEmitter의 유효 시간이 끝나면 503응답이 발생하는 문제가 있다. 따라서 연결시 바로 더미 데이터를 한 번 보내준다.

✅ Last-Event-ID값이 헤더에 있는 경우

/* client가 미수신한 event 목록이 존재하는 경우 */
if(!lastEventId.isEmpty()) { //client가 미수신한 event가 존재하는 경우 이를 전송하여 유실 예방
   Map<String, Object> eventCaches = sseRepository.findAllEventCacheStartsWithUsername(member.getUsername()); //id에 해당하는 eventCache 조회
   eventCaches.entrySet().stream() //미수신 상태인 event 목록 전송
                .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                .forEach(entry -> emitEventToClient(sseEmitter, entry.getKey(), entry.getValue()));
}
  • Last-Event-ID값이 헤더에 있는 경우, 저장된 데이터 캐시에서 id 값과 Last-Event-ID값을 통해 유실된 데이터들만 다시 보내준다.

Repository

  • SseEmitter를 이용해 알림을 실제로 보내게 되는데 어떤 회원에게 어떤 Emitter가 연결되어있는지를 저정해줘야하고 어떤 이벤트들이 현재까지 발생했는지에 대해서도 저장하고 있어야한다.
  • 추후 Emitter의 연결이 끊기게 되도 저장되어 있는 Event를 기반으로 이를 전송해줄 수 있어야 되기 때문에 필요함.
  • 그래서 따로 SseRepository를 구현함.
public interface SseRepository {
    SseEmitter save(String emitterId, SseEmitter sseEmitter);

    void saveEventCache(String eventCacheId, Object event); 

    Map<String, SseEmitter> findAllEmitterStartsWithUsername(String username); 

    Map<String, Object> findAllEventCacheStartsWithUsername(String username); 

    void deleteEmitterById(String id); 

    void deleteAllEmitterStartsWithId(String id); 

    void deleteAllEventCacheStartsWithId(String id);
}

메서드 설명

  • save - Emitter를 저장한다.
  • saveEventCache - 이벤트를 저장한다.
  • findAllEmitterStartWithByUsername - 해당 회원과 관련된 모든 Emitter를 찾는다, 브라우저당 여러 개 연결이 가능하기에 여러 Emitter가 존재할 수 있다.
  • findAllEventCacheStartWithByUsername - 해당 회원과 관련된 모든 이벤트를 찾는다.
  • deleteEmitterById - Emitter를 지운다.
  • deleteAllEmitterStartsWithId - 해당 회원과 관련된 모든 Emitter를 지운다.
  • deleteAllEventCacheStartsWithId - 해당 회원과 관련된 모든 이벤트를 지운다.

2) SseService

//...

/**
 * [SSE 통신]specific user에게 알림 전송
 */
public void send(String receiver, String content, String type, String url) {
    Notification notification = createNotification(receiver, content, type, url);
    /* 로그인한 client의 sseEmitter 전체 호출 */
    Map<String, SseEmitter> sseEmitters = sseRepository.findAllEmitterStartsWithUsername(receiver);
    sseEmitters.forEach(
            (key, sseEmitter) -> {
                log.info("key, notification : {}, {}", key, notification);
                sseRepository.saveEventCache(key, notification); //저장
                emitEventToClient(sseEmitter, key, notification); //전송
            }
    );
}

/**
 * [SSE 통신]dummy data 생성
 * : 503 Service Unavailable 방지
 */
private Notification createDummyNotification(String receiver) {
    return Notification.builder()
            .notificationId(receiver + "_" + System.currentTimeMillis())
            .receiver(receiver)
            .content("send dummy data to client.")
            .notificationType(NotificationType.NOTICE.getAlias())
            .url(NotificationType.NOTICE.getPath())
            .readYn('N')
            .deletedYn('N')
            .build();
}

/**
 * [SSE 통신]notification type별 data 생성
 */
private Notification createNotification(String receiver, String content, String type, String url) {
    if(type.equals(NotificationType.COMMENT.getAlias())) { //댓글
        return Notification.builder()
                .receiver(receiver)
                .content(content)
                .notificationType(NotificationType.COMMENT.getAlias())
                .url(url)
                .readYn('N')
                .deletedYn('N')
                .build();
    } else if(type.equals(NotificationType.LIKEQUIZ.getAlias())) { //좋아요
        return Notification.builder()
                .receiver(receiver)
                .content(content)
                .notificationType(NotificationType.LIKEQUIZ.getAlias())
                .url(url)
                .readYn('N')
                .deletedYn('N')
                .build();
    } else if(type.equals(NotificationType.NOTICE.getAlias())) { //공지
        return Notification.builder()
                .receiver(receiver)
                .content(content)
                .notificationType(NotificationType.NOTICE.getAlias())
                .url(url)
                .readYn('N')
                .deletedYn('N')
                .build();
    } else {
        return null;
    }
}

/**
 * [SSE 통신]notification type별 event 전송
 */
private void send(SseEmitter sseEmitter, String emitterId, Object data) {
    try {
        sseEmitter.send(SseEmitter.event()
                .id(emitterId)
                .name("sse")
                .data(data, MediaType.APPLICATION_JSON));
    } catch(IOException exception) {
        sseRepository.deleteEmitterById(emitterId);
        sseEmitter.completeWithError(exception);
    }
}

/**
 * [SSE 통신]
 */
private void emitEventToClient(SseEmitter sseEmitter, String emitterId, Object data) {
    try {
        send(sseEmitter, emitterId, data);
        sseRepository.deleteEmitterById(emitterId);
    } catch (Exception e) {
        sseRepository.deleteEmitterById(emitterId);
        throw new RuntimeException("Connection Failed.");
    }
}

코드 설명

public void send(String receiver, String content, String type, String url)
  • 알림을 전송하기전 SseEmitter에 저장할때 쓰인다.
  • notification 객체를 만들어서 저장후 알림 전송 메서드를 실행한다.
private Notification createDummyNotification(String receiver)
  • 503 Service Unavailable방지용 dummy데이터를 생성하는 메서드
private Notification createNotification(String receiver, String content, String type, String url)
  • 타입별로 알림 데이터를 생성해주는 메서드
private void send(SseEmitter sseEmitter, String emitterId, Object data)
  • 실질적으로 알림을 전송하는 메서드이다.
private void emitEventToClient(SseEmitter sseEmitter, String emitterId, Object data)
  • 알림을 전송하는 메서드를 실행후 실행이 완료되면 전송완료된 내역은 sseEmitter에 있는 알림을 지운다.

트러블 슈팅

1. SSE 실행 시 Nginx 연결시간 시간재설정

  • 프론트엔드에서 SSE 연결을 하면 소스에서는 연결시간을 60분으로 설정했는데
  • Nginx에서는 1분으로 기본설정이 되어있어서 변경을 해줘야한다.
  1. 처음에는 전체적으로 timeout을 변경하려고 생각했다. ➡️ 그러면 전체적으로 timeout이 변경되어 비효율적이다.
location {
	proxy_pass http://spring-server;
    proxy_set_header Connection '';
    proxy_set_header Cache-Control 'no-cache';
    proxy_set_header X-Accel-Buffering 'no';
    proxy_set_header Content_Type 'text/event-stream';
    proxy_buffering off;
    proxy_http_version 1.1;
    chunked_transfer_encoding on;
    proxy_read_timeout 86400s;
}
  1. sse연결부분만 설정하는 것으로 변경함.
location /api/subscribe {
	proxy_pass http://spring-server;
    proxy_set_header Connection '';
    proxy_set_header Cache-Control 'no-cache';
    proxy_set_header X-Accel-Buffering 'no';
    proxy_set_header Content_Type 'text/event-stream';
    proxy_buffering off;
    proxy_http_version 1.1;
    chunked_transfer_encoding on;
    proxy_read_timeout 86400s;
}

2. JPA Bulk Update 처리

  • 알림 전체 읽음 기능을 구현하던 중, JPA는 특정 하나의 데이터만 변경감지를 통해 업데이트하고 있었다.
  • 검색해보니 JPA Bulk Update를 통해 한번에 업데이트하는 것으로 변경하였다.
  1. 처음에는 반복문을 통해 하나씩 업데이트를 해주는 것으로 생각했다.
  2. JPA Bulk Update를 적용하여 Update를 하는 것으로 수정하였다.
//Service
@Transactional
public void updateNotificationReadStatusByUsername(String username) {
    notificationRepository.bulkReadUpdate(username);
}
 
//Repository
@Modifying
@Query("update Notification set readYn = 'Y' where receiver = :username")
void bulkReadUpdate(@Param("username") String username);

3. SSE 연결시 한번만 연결되고 2번째부터는 연결안되는 이슈

  • 알림을 처음에 subscribe을 통해 연결 후 첫번째 알림은 정상적으로 되지만 2번째부터는 안되는 이슈 발견
  • sseRepository.deleteEmitterById(emitterId); 처음 알림이 가고 발송된 알림은 삭제되는 로직인데 이 코드를 삭제하니 정상적으로 알림이 도착하는 것을 확인
/**
 * [SSE 통신]
 */
private void emitEventToClient(SseEmitter sseEmitter, String emitterId, Object data) {
    try {
        send(sseEmitter, emitterId, data);
    } catch (Exception e) {
        sseRepository.deleteEmitterById(emitterId);
        throw new RuntimeException("Connection Failed.");
    }
}

0개의 댓글