MSA - 소모임 project 알람기능 - Server-Sent-Events

이동명·2023년 11월 1일
0

소모임 프로젝트

목록 보기
4/8
post-thumbnail

저번 포스팅에선 알림기능을 도입하며 멀티 소켓 서버의 부하테스트 명목으로 웹소켓을 사용하였다.

알림은 일방향 통신으로 SSE로 구현하면 더욱 간단하게 구현할 수 있고, 기능만 보면 이게 더 적합하다.

저번 포스팅에선 SSE에 대해 다루지 않았기 때문에 이번 포스팅에서 다뤄보겠다.

SSE(Server-Sent-Events)

SSE는 서버와 한번 연결을 맺고 나면, 일정 시간 동안 서버에서 변경이 발생할 때마다 서버에서 클라이언트로 데이터를 전송하는 방법입니다.

과정

  1. 클라이언트는 서버를 구독한다.(SSE Connection을 맺는다.)
  1. 서버는 변동사항이 생길 때마다 구독한 클라이언트들에게 데이터를 전송한다.

SSE는 상황에 따라서 응답마다 다시 요청을 해야 하는 Long Polling 방식보다 효율적입니다. SSE는 서버에서 클라이언트로 text message를 보내는 브라우저 기반 웹 애플리케이션 기술이며 HTTP의 persistent connections을 기반으로 하는 HTML5 표준 기술입니다.

하지만 HTTP를 통한 SSE(HTTP/2가 아닐 경우)는 브라우저 당 6개의 연결로 제한되므로, 사용자가 웹 사이트의 여러 탭을 열면 첫 6개의 탭 이후에는 SSE가 작동하지 않는다는 단점도 있긴 합니다. (HTTP/2에서는 100개까지의 접속을 허용합니다.)

하지만 전반적으로 SSE는 클라이언트가 서버와 크게 통신할 필요 없이 단지 업데이트된 데이터만 받아야 하는 실시간 데이터 스트림에 대한 구현이 필요할 때는 매우 훌륭한 선택입니다.

간단한 사용법 및 코드첨부

NotificationController

@RestController
@RequestMapping("/notifications")
@RequiredArgsConstructor
public class NotificationController {
    private final NotificationService notificationService;

    @GetMapping(value = "/subscribe/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter subscribe(@PathVariable Long id) {
        return notificationService.subscribe(id);
    }

    @PostMapping("/send-data/{id}")
    public void sendData(@PathVariable Long id) {
        notificationService.notify(id, "data");
    }
}

간단하게 클라이언트에서 구독을 하기 위한 subcribe 메서드와 임시로 서버에서 클라이언트로 알림을 주기 위한 sendData 메서드를 생성했습니다.

NotificationService

@Service
@RequiredArgsConstructor
public class NotificationService {
    // 기본 타임아웃 설정
    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;

    private final EmitterRepository emitterRepository;

    /**
     * 클라이언트가 구독을 위해 호출하는 메서드.
     *
     * @param userId - 구독하는 클라이언트의 사용자 아이디.
     * @return SseEmitter - 서버에서 보낸 이벤트 Emitter
     */
    public SseEmitter subscribe(Long userId) {
        SseEmitter emitter = createEmitter(userId);

        sendToClient(userId, "EventStream Created. [userId=" + userId + "]");
        return emitter;
    }

    /**
     * 서버의 이벤트를 클라이언트에게 보내는 메서드
     * 다른 서비스 로직에서 이 메서드를 사용해 데이터를 Object event에 넣고 전송하면 된다.
     *
     * @param userId - 메세지를 전송할 사용자의 아이디.
     * @param event  - 전송할 이벤트 객체.
     */
    public void notify(Long userId, Object event) {
        sendToClient(userId, event);
    }

    /**
     * 클라이언트에게 데이터를 전송
     *
     * @param id   - 데이터를 받을 사용자의 아이디.
     * @param data - 전송할 데이터.
     */
    private void sendToClient(Long id, Object data) {
        SseEmitter emitter = emitterRepository.get(id);
        if (emitter != null) {
            try {
                emitter.send(SseEmitter.event().id(String.valueOf(id)).name("sse").data(data));
            } catch (IOException exception) {
                emitterRepository.deleteById(id);
                emitter.completeWithError(exception);
            }
        }
    }

    /**
     * 사용자 아이디를 기반으로 이벤트 Emitter를 생성
     *
     * @param id - 사용자 아이디.
     * @return SseEmitter - 생성된 이벤트 Emitter.
     */
    private SseEmitter createEmitter(Long id) {
        SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
        emitterRepository.save(id, emitter);

        // Emitter가 완료될 때(모든 데이터가 성공적으로 전송된 상태) Emitter를 삭제한다.
        emitter.onCompletion(() -> emitterRepository.deleteById(id));
        // Emitter가 타임아웃 되었을 때(지정된 시간동안 어떠한 이벤트도 전송되지 않았을 때) Emitter를 삭제한다.
        emitter.onTimeout(() -> emitterRepository.deleteById(id));

        return emitter;
    }
}

클라이언트에서 처음 구독 시, 즉 subscribe 메서드에서 처음 구독 시에 sendToClient()를 통해 데이터를 전송하는 이유는 처음 SSE 응답을 할 때 아무런 이벤트도 보내지 않으면 재연결 요청을 보내거나, 연결 요청 자체에서 오류가 발생하기 때문입니다. 따라서 첫 SSE 응답을 보낼 시 더미 데이터를 넣어 이러한 오류를 방지하기 위해 전송합니다.

EmitterRepository

@Repository
@RequiredArgsConstructor
public class EmitterRepository {
    // 모든 Emitters를 저장하는 ConcurrentHashMap
    private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();

    /**
     * 주어진 아이디와 이미터를 저장
     *
     * @param id      - 사용자 아이디.
     * @param emitter - 이벤트 Emitter.
     */
    public void save(Long id, SseEmitter emitter) {
        emitters.put(id, emitter);
    }

    /**
     * 주어진 아이디의 Emitter를 제거
     *
     * @param id - 사용자 아이디.
     */
    public void deleteById(Long id) {
        emitters.remove(id);
    }

    /**
     * 주어진 아이디의 Emitter를 가져옴.
     *
     * @param id - 사용자 아이디.
     * @return SseEmitter - 이벤트 Emitter.
     */
    public SseEmitter get(Long id) {
        return emitters.get(id);
    }
}

모든 Emitter들을 저장하는 emitters객체를 thread-safe 한 ConturrentHashMap을 통해 선언했습니다.

CORS 임시해제

@Configuration
public class WebConfig implements WebMvcConfigurer {
    public static final String ALLOWED_METHOD_NAMES = "GET,HEAD,POST,PUT,DELETE,TRACE,OPTIONS,PATCH";

    @Override
    public void addCorsMappings(final CorsRegistry registry) {
        registry.addMapping("/**")
                .allowedMethods(ALLOWED_METHOD_NAMES.split(","));
    }
}

테스트

먼저 GET "/subscribe/{id}"를 통해 구독을 시도하면 다음과 같이 출력됩니다.

그리고 POST "/send-data/{id}"를 통해 서버에서 임시로 이벤트를 발행시킵니다.

데이터를 받는 모습

클라이언트 코드

const eventSource = new EventSource('http://localhost:8080/notifications/subscribe/1');

eventSource.addEventListener('sse', event => {
    console.log(event);
});

추가 tip

추가로, SSE 서비스단에 트랜잭션이 걸려있다면 SSE연결 동안 트랜잭션을 계속 물고 있어 커넥션 낭비가 일어날 수 있으니 SSE 서비스로직에는 트랜잭션을 걸지 않아야 합니다.


참고

profile
Web Developer

0개의 댓글