[Spring] SSE를 통한 실시간 알림 기능 구현

최혜원·2023년 8월 31일
0

Spring

목록 보기
18/19
post-thumbnail

SSE 구조와 구현

Client 하나당 sseemitter하나

SSE의 기본적인 흐름은 클라이언트가 SSE요청을 보내면 서버에서는 클라이언트와 매핑되는 SSE 통신객체를 만든다(SseEmitter) 해당객체가 이벤트 발생시 eventsource를 client에게 전송하면서 데이터가 전달되는 방식이다. sseemitter는 SSE 통신을 지원하는 스프링에서 지원하는 API이다.
3034247E-EB82-4F64-8B80-7C4FB1FEEAB6

맨처음 클라이언트에서 SSE 요청이 오면 서버는 위 그림과 같이 기본적인 응답해더값과 더불어 필요한 헤더들을 반환해야한다. (초록 화살표)

@GetMapping(value = "/connect", produces = "text/event-stream")
    @Operation(summary = "SSE 연결")
    @ResponseStatus(HttpStatus.OK)
    public ResponseEntity<SseEmitter> subscribe(@AuthenticationPrincipal UserDetailsImpl userDetails,
        // @RequestHeader를 이용하여 header를 받아 데이터를 꺼내서 사용
	@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
        return ResponseEntity.ok(notificationService.subscribe(userDetails.getUser(), lastEventId));

NotificationController

lastEventId를 파라미터로 받는 이유는 로그인 정보를 기반으로 미수신 event 유실을 예방하기 위해서이다.
→ 뒤에서 자세히 설명!

NotificationServiceImpl

클라이언트로부터 SSE 연결 요청을 받아서, user정보와 HttpServletResponse값을 토대로 subscribe 메서드를 작성한다.

클라이언트의 sse연결 요청에 응답하기 위해서는 SseEmitter 객체를 만들어 반환해줘야한다. SseEmitter 객체를 만들 때 유효 시간을 줄 수 있다. 이때 주는 시간 만큼 sse 연결이 유지되고, 시간이 지나면 자동으로 클라이언트에서 재연결 요청을 보내게 된다. ( 위에서 설명했던 : 이를 방지하기 위한 것이 Last-Event-ID 헤더이다. 이 헤더는 클라이언트가 마지막으로 수신한 데이터의 id값을 의미한다. 이를 이용하여 유실된 데이터를 다시 보내줄 수 있다. )

id를 key로, SseEmitter를 value로 저장해둔다. 그리고 SseEmitter의 시간 초과 및 네트워크 오류를 포함한 모든 이유로 비동기 요청이 정상 동작할 수 없다면 저장해둔 SseEmitter를 삭제한다.

     // subscribe 로 연결 요청 시 SseEmitter(발신기)를 생성합니다.
    public SseEmitter subscribe(User user, String lastEventId) {
        String emitterId = makeTimeIncludeId(user);
        SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));

        // SseEmitter 의 완료/시간초과/에러로 인한 전송 불가 시 sseEmitter 삭제
        emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
        emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));

        // 클라이언트는 SSE Timeout 될 경우 자동으로 재연결 시도
        // 재연결 시 한 번도 데이터를 전송한 적이 없다면 503 에러가 발생하므로 최초 연결 시 더미 이벤트를 전송
        String eventId = makeTimeIncludeId(user);
        sendToClient(emitter, emitterId, eventId,
            "연결되었습니다. EventStream Created. [userId=" + user.getId() + "]");

        // 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
        // 클라이언트의 요청 헤더에 lastEventId 값이 있는 경우 lastEventId 보다 더 큰(더 나중에 생성된) emitter를 찾아서 발송
        if (!lastEventId.isEmpty()) { // Last-Event-ID가 존재한다는 것은 받지 못한 데이터가 있다는 것이다. (프론트에서 알아서 보내준다.)
            Map<String, Object> events = emitterRepository.findAllEventCacheStartWithUserId(
                String.valueOf(user.getId()));
            events.entrySet().stream()
                .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                .forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getKey(),
                    entry.getValue()));
        }
        return emitter;
    }

각각 메소드를 살펴보자면...

makeTimeIncludeId()

private String makeTimeIncludeId(User user) {  // 데이터 유실 시점 파악 위함
        return user.getId() + "_" + System.currentTimeMillis();
    }

데이터의 id값을 ${userId}_${System.currentTimeMillis()} 형태로 두면 데이터가 유실된 시점을 파악할 수 있으므로 저장된 key값 비교를 통해 유실된 데이터만 재전송 할 수 있게 된다.

Last-Event-Id = 3

{3, data1}
{3, data3}
{3, data2}

=> 어떤 데이터까지 제대로 전송되었는지 알 수 없다.
Last-Event-Id = 3_1631593143664

{3_1631593143664, data1}
{3_1831593143664, data3}
{3_1731593143664, data2}

=> data1 까지 제대로 전송되었고, data2, data3을 다시 보내야한다.

다음과 같이 3이라는 ID를 가진 회원의 이벤트 중 뒤의 시간을 기준으로 구분할 수 있게 된다.

sendToClient()

     // 특정 SseEmitter 를 이용해 알림을 보냅니다. SseEmitter 는 최초 연결 시 생성되며,
    // 해당 SseEmitter 를 생성한 클라이언트로 알림을 발송하게 됩니다.
    public void sendToClient(SseEmitter emitter, String emitterId, String eventId, Object data) {
        try {
            emitter.send(SseEmitter.event()
                .name("sse")
                .id(eventId)
                .data(data));
        } catch (IOException exception) {
            emitterRepository.deleteById(emitterId);
            throw new BusinessException(ErrorCode.SSE_CONNECTION_ERROR);
        }
    }

send()

@Override
    public void send(NotificationRequestDto requestDto) {
        sendNotification(requestDto, saveNotification(requestDto));
    }
// 알람 저장
    private Notification saveNotification(NotificationRequestDto requestDto) {
        Notification notification = Notification.builder()
            .receiver(requestDto.getReceiver())
            .notificationType(requestDto.getNotificationType())
            .content(requestDto.getContent())
            .url(requestDto.getUrl())
            .isRead(false)
            .build();
        notificationRepository.save(notification);
        return notification;
    }

notification 테이블에 저장됨
E83837EA-B072-4B3E-9BEE-C93421030ACF

// 알림 보내기
    @Async
    public void sendNotification(NotificationRequestDto request, Notification notification) {
        String receiverId = String.valueOf(request.getReceiver().getId());
        String eventId = receiverId + "_" + System.currentTimeMillis();
        // 유저의 모든 SseEmitter 가져옴
        Map<String, SseEmitter> emitters = emitterRepository
            .findAllEmitterStartWithByUserId(receiverId);
        emitters.forEach(
            (key, emitter) -> {
                // 데이터 캐시 저장 (유실된 데이터 처리 위함)
                emitterRepository.saveEventCache(key, notification);
                // 데이터 전송
                sendToClient(emitter, key, eventId, NotificationResponseDto.of(notification));
            }
        );
    }

EmitterRepositoryImpl - 메서드 설명

save - Emitter를 저장한다.
saveEventCache - 이벤트를 저장한다.
findAllEmitterStartWithByMemberId - 해당 회원과 관련된 모든 Emitter를 찾는다.
◦ 브라우저당 여러 개 연결이 가능하기에 여러 Emitter가 존재할 수 있다.
findAllEventCacheStartWithByMemberId - 해당 회원과 관련된 모든 이벤트를 찾는다.
deleteById - Emitter를 지운다.
deleteAllEmitterStartWithId - 해당 회원과 관련된 모든 Emitter를 지운다.
deleteAllEventCacheStartWithId - 해당 회원과 관련된 모든 이벤트를 지운다.

적용하기

PostServiceImpl - createPostLike()

private final NotificationService notificationService;

notificationService.notifyToUsersThatTheyHaveReceivedLike(postLike); // 게시글 좋아요 알람 추가

PostCommentServiceImpl - createPostComment()

private final NotificationService notificationService;

notificationService.notifyToUsersThatTheyHaveReceivedComment(postComment); // 게시글 댓글 알람 추가

Postman 으로 테스트

  1. 게시글 작성자 아이디로 SSE 연결
    6FF34C64-4B11-4128-A537-97AEE99EDC72

  2. 작성자가 쓴 게시글에 좋아요 보내기
    0937CBF9-86DB-4E60-AED6-38A0B552532A

  3. DB저장 + 알람 확인
    8AE3D246-9682-4038-88B4-E54CC1D0AEC5
    1A7C4081-CC27-4155-A9B1-9079323858D4
    BB41B9B7-18FA-4CA2-8C19-7BFC6DCB28E4

+ 비동기에 대한 설정

@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer { // 비동기 설정

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(3);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(50);
        taskExecutor.setThreadNamePrefix("async-thread-");
        taskExecutor.initialize();
        return taskExecutor;
    }
}

+ Spring Async 처리

  • sync 란 호출 후 응답을 기다리는거고, async 는 호출 후 응답을 기다리지 않는 것입니다.
    • 이러한 특징 떄문에 Async 의 경우 오래 걸리는 작업을 호출한 후, 응답을 즉시 반환할 수 있습니다.
  • Spring 에서 @Async annotation 을 설정해두면 호출하는 스레드는 즉시 리턴하고, Spring 스레드 풀에서에서 Thread 처리를 수행합니다.
  • @Async 라고 선언된 annotation 이 spring aop 에 의해서 감지되서 수행 됩니다.
profile
어제보다 나은 오늘

0개의 댓글