Spring - React SSE 적용(새로고침 해도 알림 받기)

VVOO·2024년 10월 5일

배경

이전 글 참고

저번 포스트에서 SSE가 제대로 연결되는 것을 확인했습니다.

이제는 실전으로 SSE를 사용합시다.

저의 목표는 단순합니다.

서버: 분석을 위한 데이터(소비패턴)을 보내면 이를 분석하고 SSE를 통해 분석 완료 시 완료 이벤트를 전송

클라이언트: 분석 완료 이벤트를 수신 시 알림을 통해 사용자에게 알려줌

이를 간단하게 그림으로 나타내면 다음과 같습니다.

  1. 클라이언트에서 SSE 연결을 통해 알림을 받을 수 있는 준비를 합니다.
  2. 연결이 된 후 데이터를 POST해 분석을 요청합니다.
  3. 분석 완료 시 1번에서 수행한 SSE를 통해 분석 완료 이벤트를 발생시킵니다.
  4. 클라이언트에서 분석 완료 이벤트 수신 시 사용자에게 알림을 보낸다.
  5. 분석 완료된 데이터를 GET하고 이를 LIST component를 통해 가시화한다.

구현

알림 받아 오기

React

const clickPostBtnHandler = ()=>{
        const s= new EventSource(`${process.env.REACT_APP_API_URL}/ai/emitter`, { withCredentials: true }); // 1
		setSse(true); // 2
		s.addEventListener('data', (e)=>{ // 3
            alert("분석이 끝났습니다."); // a
            s.close(); // b
            setSse(false); // c
        })

		// 분석을 요청하기 위해 데이터(입출금 정보)를 서버로 POST

    }
  1. 우선 분석 요청 POST를 하기 전 SSE 연결을 한다.
  2. useState를 통해 연결 시 true를 설정함으로써 연결 중임을 표시한다.
  3. data라는 이름으로 이벤트 수신 시 수행할 액션 정의
    a. 분석 종료 이벤트 수신 시 사용자에게 알림
    b. 원하는 알림을 받아 SSE 연결 종료
    c. 연결 종료가 됐기에 2에서 설정한 sse를 false로 지정한다.

spring

public SseEmitter createEmitter(UUID uid) {
        
	SseEmitter emitter = new SseEmitter(); // 1
    emitter.onCompletion(() -> emitterRepository.deleteById(uid));
	try {
		emitter.send(SseEmitter.event() // 2
			.id(String.valueOf(uid))
			.data("연결"));
	} catch (IOException exception) { // 3
		emitterRepository.deleteById(uid);
		emitter.completeWithError(exception);
	}
	return emitterRepository.save(uid, emitter); // 4
}
  1. SSE 연결을 위한 Emitter 생성
  2. SSE 첫 연결을 위해 Dummy data 전송
  3. 에러 발생 시 저장한 Emitter를 삭제하고 연결을 종료
  4. 모든 사용자가 아닌 특정 사용자에게만 알림을 보내기에 User id와 emitter를 같이 저장

2에서 왜 더미 데이터를 보내는 지 궁금하다면 참고

결과



정보를 입력하고 오른쪽 버튼을 클릭하면 로딩과 함께 몇 초 뒤 분석이 끝났다는 알림이 생성됩니다.

문제

다만 문제가 있습니다.

별 무리없이 동작하지만 저번 포스트에서 말씀드렸다싶이 이 알림을 받기 위해선 그 페이지에서 가만히 기다리고 있어야합니다.

다른 페이지에 갔다오거나 새로고침을 해버리면 React에서 SSE 연결을 위한 객체가 초기화되어 정상적인 통신이 불가능합니다.

그래서 이런 부분을 수정해봅시다.

연결 유지

새로고침에도 연결을 유지할 수 있는 방법을 생각했고 결과는 다음과 같습니다.

  1. 클라이언트에서 SSE 연결을 통해 알림을 받을 수 있는 준비를 합니다.
  2. 연결이 된 후 데이터를 POST해 분석을 요청합니다.
  3. 새로고침 혹은 다른 페이지 이동 후 다시 돌아옵니다.
  4. 해당 user와 연결에 사용한 Emitter를 요청합니다.
  5. User Id와 함께 저장되어 있던 Emitter를 반환합니다.
  6. 분석 완료 시 1번에서 수행한 SSE를 통해 분석 완료 이벤트를 발생시킵니다.
  7. 클라이언트에서 분석 완료 이벤트 수신 시 사용자에게 알림을 보냅니다.
  8. 분석 완료된 데이터를 GET하고 이를 LIST component를 통해 가시화합니다.

결국 여기서 중점은 기존에 수행한 연결을 어떻게 다시 연결할 것이냐 입니다.

React

useEffect(async () => {
        if (!sse) {
            const eventSource= new EventSource(`${process.env.REACT_APP_API_URL}/ai/emitter/${sessionStorage.getItem("uid")}`, { withCredentials: true }); // 1

			eventSource.addEventListener('data', (e)=>{ // 2
                alert("분석이 끝났습니다.");
                renewAnalysis();
                eventSource.close();
                setSse(false);
            })
            eventSource.onerror = (e) => {
                eventSource.close();
            };
        }

    }, []);
  1. 기존 User Id에 해당하는 SSE 연결이 있는지 확인
  2. 있다면 위와 동일한 동작 설정

Spring

연결

@GetMapping(value = "/api/ai/emitter/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter isExist(@PathVariable("id") UUID id) {
        Optional<SseEmitter> e = aiService.getEmitter(id); // 1
        if (e.isEmpty()) { // 2
            throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Resource not found");
        } else{ // 3
            SseEmitter emitter = e.get();
            try {
                emitter.send(SseEmitter.event()
                        .data("연결"));
            } catch (IOException exception) {
                emitter.completeWithError(exception);
            }
            return emitter;
        }

    }
public Optional<SseEmitter> getEmitter(UUID uid){
        SseEmitter e = emitterRepository.getEmitterByUserId(uid);
        if (e == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(e);
    }
  1. Emitter Repository에서 User id와 함께 저장된 Emitter를 반환한다.
  2. 없다면 Not fount error 발생
  3. 있다면 emitter를 반환하여 SSE 연결을 수행

이벤트 발생

public Analysis analysis(AccessTokenPayload ap, AiAnalysisReqDto req){
        System.out.println("분석 시작");
        
		// 분석 동작
        
		System.out.println("분석 끝");
        this.sendEvent(ap.getId()); // 1
        emitterRepository.deleteById(ap.getId()); // 2
        return analysis;
    }
  1. 분석이 끝났다면 이벤트 발생
  2. 이벤트 발생 후 더 이상 필요없는 SSE 연결은 제거
public void sendEvent(UUID uid) {
        SseEmitter emitter = emitterRepository.getEmitterByUserId(uid); // 1
        if (emitter != null) {
            try {
                System.out.println("이벤트 발생");
                emitter.send(SseEmitter.event()
                        .id(String.valueOf(uid))
                        .name("data")
                        .data("analysis success"));
            } catch (IOException e) {
                emitterRepository.deleteById(uid);
                emitter.completeWithError(e);
            }
        } else{
            System.out.println("Emitter 없음");
        }
    }

문제

분명 Emitter가 있어야 하지만 Emitter가 없다고 뜹니다.

SSE 연결이 잘 되는 것도 확인했습니다.

즉 Emitter를 반환하고 이벤트를 발생시키는 사이에 Emitter를 제거하는 일이 발생했다는 것입니다.

저의 코드를 분석해본 결과 Emitter를 제거하는 코드는 두개 입니다.

public Analysis analysis(AccessTokenPayload ap, AiAnalysisReqDto req){
        System.out.println("분석 시작");
        
		// 분석 동작
        
		System.out.println("분석 끝");
        this.sendEvent(ap.getId()); 
        emitterRepository.deleteById(ap.getId()); // 제거하는 코드 2
        return analysis;
    }
public SseEmitter createEmitter(UUID uid) {
        
	SseEmitter emitter = new SseEmitter(); 
    emitter.onCompletion(() -> emitterRepository.deleteById(uid)); // 제거하는 코드 2
	try {
		emitter.send(SseEmitter.event() 
			.id(String.valueOf(uid))
			.data("연결"));
	} catch (IOException exception) { 
		emitterRepository.deleteById(uid);
		emitter.completeWithError(exception);
	}
	return emitterRepository.save(uid, emitter);
}

제거 코드 1은 이벤트 발생 이전이기 때문에 용의선상에서 벗어납니다.

그래서 결국 원인은 emitter.onCompletion(() -> emitterRepository.deleteById(uid)); 이 부분입니다.

의미를 보면 결국 Complete되면 실행하라는 뜻입니다.

여러 블로그를 돌아다녀본 결과 emitter.complete()를 실행하면 onCompletion에 등록한 메서드가 동작한다는 것을 확인했습니다.

다만 저는 emitter.complete()를 실행한 부분이 어디에도 없었습니다.

그렇다면 다른 부분에서 종료되는 상황이 발생됐다는 것입니다.

그래서 더 자세히 찾아봤습니다.

스프링 공식문서를 찾아보면

https://docs.spring.io/spring-framework/docs/4.2.2.RELEASE_to_4.2.3.RELEASE/Spring%20Framework%204.2.3.RELEASE/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.html

이런 내용을 찾을 수 있습니다.

This method is called from a container thread when an async request completed for any reason including timeout and network error.

간단하게 해석하면 타임아웃, 네트워크 에러를 포함한 어떠한 이유에서든 완료가 됐다면 실행이 됩니다.

즉 비정상 종료(에러)에 의한 종료도 complete라고 보는 것이죠.

즉 제가 모르는 원인으로 인해 종료가 실행된 것이고 이에 따라 Emitter가 삭제된 것으로 보입니다.

그럼 이 부분을 주석처리하고 실행해보면

다음과 같은 에러가 발생합니다.

java.lang.IllegalStateException: ResponseBodyEmitter has already completed

사실 당연하죠. 위에서 그렇게 종료됐다고 했으니 말이죠.

그럼 종료 이유를 생각해봅시다.

이전 동작과 현재 동작을 비교하면 변수는 단 하나입니다. 새로고침

즉 새로고침을 하며 연결을 종료한 것입니다. 그리고 그것이 완료로 처리되어 Emitter를 삭제한 것이죠.

그래서 결론은 기존에 있던 Emitter를 반환받아 사용하는 것은 안 된다 라는 것입니다.

해결

그래서 결국 새로운 SSE 연결을 하고 spring에서 있던 Emitter Repository에서 User id에 해당하는 Emitter를 업데이트해주는 방식으로 수정했습니다.

  1. 클라이언트에서 SSE 연결을 통해 알림을 받을 수 있는 준비를 합니다.
  2. 연결이 된 후 데이터를 POST해 분석을 요청합니다.
  3. 새로고침 혹은 다른 페이지 이동 후 다시 돌아옵니다.
  4. 새로운 SSE를 반환합니다. (Emitter Rository를 확인해 데이터가 있으면 반환, 없다면 반환 x)
  5. 분석 완료 시 1번에서 수행한 SSE를 통해 분석 완료 이벤트를 발생시킵니다.
  6. 클라이언트에서 분석 완료 이벤트 수신 시 사용자에게 알림을 보냅니다.
  7. 분석 완료된 데이터를 GET하고 이를 LIST component를 통해 가시화합니다.

react

아래 코드는 페이지 첫 시작 시(새로고침 포함) 시 동작합니다.

useEffect(async () => {
        try {
            const eventSource = new EventSource(`${process.env.REACT_APP_API_URL}/ai/emitter/${sessionStorage.getItem("uid")}`, {withCredentials: true}); // 1
            setSse(true); // 2
            eventSource.addEventListener('data', (e) => { // 3
                alert("분석이 끝났습니다.");
                renewAnalysis();
                eventSource.close();
                setSse(false);
            });
            eventSource.onerror = (e) => { // 4
                eventSource.close();
                setSse(false);
            };
        } catch (e){ // 5
            setSse(false);
        }
    }, []);
  1. UID에 해당하는 SSE연결이 있었는지(Emitter가 있는지) 확인
  2. 문제없이 반환받았다면 useState로 연결 중 설정
  3. 이벤트 수신 시 동작 정의(수신 시 마지막엔 연결 종료)
  4. 이벤트 수신 오류 발생 시 SSE 연결 종료
  5. 마찬가지로 오류 발생 시 SSE 연결 종료

Spring

@GetMapping(value = "/api/ai/emitter/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter isExist(@PathVariable("id") UUID id, AccessTokenPayload ap) {
        Optional<SseEmitter> e = aiService.getEmitter(id); // 1
        if (e.isEmpty()) { // 2
            throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Resource not found");
        } else{ // 3
            return aiService.createEmitter(ap);
        }
    }
  1. User id에 해당하는 Emitter를 가져온다.
  2. 없으면 에러 반환
  3. 있으면 새로운 SSE 연결 수행

결과

아주 하찮은(?) 결과이기도 합니다...

사실 없어도 이용에는 크게 문제는 없는 부분이긴 합니다만 이런 디테일한 부분 하나하나가 서비스의 완성도를 높인다고 생각하니 뿌듯하네요.

profile
Ctrl C V 뽑고 작성합니다.(그러고 싶습니다.)

0개의 댓글