저번 포스트에서 SSE가 제대로 연결되는 것을 확인했습니다.
이제는 실전으로 SSE를 사용합시다.
저의 목표는 단순합니다.
서버: 분석을 위한 데이터(소비패턴)을 보내면 이를 분석하고 SSE를 통해 분석 완료 시 완료 이벤트를 전송
클라이언트: 분석 완료 이벤트를 수신 시 알림을 통해 사용자에게 알려줌
이를 간단하게 그림으로 나타내면 다음과 같습니다.

const clickPostBtnHandler = ()=>{
const s= new EventSource(`${process.env.REACT_APP_API_URL}/ai/emitter`, { withCredentials: true }); // 1
setSse(true); // 2
s.addEventListener('data', (e)=>{ // 3
alert("분석이 끝났습니다."); // a
s.close(); // b
setSse(false); // c
})
// 분석을 요청하기 위해 데이터(입출금 정보)를 서버로 POST
}
data라는 이름으로 이벤트 수신 시 수행할 액션 정의public SseEmitter createEmitter(UUID uid) {
SseEmitter emitter = new SseEmitter(); // 1
emitter.onCompletion(() -> emitterRepository.deleteById(uid));
try {
emitter.send(SseEmitter.event() // 2
.id(String.valueOf(uid))
.data("연결"));
} catch (IOException exception) { // 3
emitterRepository.deleteById(uid);
emitter.completeWithError(exception);
}
return emitterRepository.save(uid, emitter); // 4
}
2에서 왜 더미 데이터를 보내는 지 궁금하다면 참고



정보를 입력하고 오른쪽 버튼을 클릭하면 로딩과 함께 몇 초 뒤 분석이 끝났다는 알림이 생성됩니다.
다만 문제가 있습니다.
별 무리없이 동작하지만 저번 포스트에서 말씀드렸다싶이 이 알림을 받기 위해선 그 페이지에서 가만히 기다리고 있어야합니다.
다른 페이지에 갔다오거나 새로고침을 해버리면 React에서 SSE 연결을 위한 객체가 초기화되어 정상적인 통신이 불가능합니다.
그래서 이런 부분을 수정해봅시다.
새로고침에도 연결을 유지할 수 있는 방법을 생각했고 결과는 다음과 같습니다.

결국 여기서 중점은 기존에 수행한 연결을 어떻게 다시 연결할 것이냐 입니다.
useEffect(async () => {
if (!sse) {
const eventSource= new EventSource(`${process.env.REACT_APP_API_URL}/ai/emitter/${sessionStorage.getItem("uid")}`, { withCredentials: true }); // 1
eventSource.addEventListener('data', (e)=>{ // 2
alert("분석이 끝났습니다.");
renewAnalysis();
eventSource.close();
setSse(false);
})
eventSource.onerror = (e) => {
eventSource.close();
};
}
}, []);
@GetMapping(value = "/api/ai/emitter/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter isExist(@PathVariable("id") UUID id) {
Optional<SseEmitter> e = aiService.getEmitter(id); // 1
if (e.isEmpty()) { // 2
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Resource not found");
} else{ // 3
SseEmitter emitter = e.get();
try {
emitter.send(SseEmitter.event()
.data("연결"));
} catch (IOException exception) {
emitter.completeWithError(exception);
}
return emitter;
}
}
public Optional<SseEmitter> getEmitter(UUID uid){
SseEmitter e = emitterRepository.getEmitterByUserId(uid);
if (e == null) {
return Optional.empty();
}
return Optional.ofNullable(e);
}
public Analysis analysis(AccessTokenPayload ap, AiAnalysisReqDto req){
System.out.println("분석 시작");
// 분석 동작
System.out.println("분석 끝");
this.sendEvent(ap.getId()); // 1
emitterRepository.deleteById(ap.getId()); // 2
return analysis;
}
public void sendEvent(UUID uid) {
SseEmitter emitter = emitterRepository.getEmitterByUserId(uid); // 1
if (emitter != null) {
try {
System.out.println("이벤트 발생");
emitter.send(SseEmitter.event()
.id(String.valueOf(uid))
.name("data")
.data("analysis success"));
} catch (IOException e) {
emitterRepository.deleteById(uid);
emitter.completeWithError(e);
}
} else{
System.out.println("Emitter 없음");
}
}
분명 Emitter가 있어야 하지만 Emitter가 없다고 뜹니다.

SSE 연결이 잘 되는 것도 확인했습니다.

즉 Emitter를 반환하고 이벤트를 발생시키는 사이에 Emitter를 제거하는 일이 발생했다는 것입니다.
저의 코드를 분석해본 결과 Emitter를 제거하는 코드는 두개 입니다.
public Analysis analysis(AccessTokenPayload ap, AiAnalysisReqDto req){
System.out.println("분석 시작");
// 분석 동작
System.out.println("분석 끝");
this.sendEvent(ap.getId());
emitterRepository.deleteById(ap.getId()); // 제거하는 코드 2
return analysis;
}
public SseEmitter createEmitter(UUID uid) {
SseEmitter emitter = new SseEmitter();
emitter.onCompletion(() -> emitterRepository.deleteById(uid)); // 제거하는 코드 2
try {
emitter.send(SseEmitter.event()
.id(String.valueOf(uid))
.data("연결"));
} catch (IOException exception) {
emitterRepository.deleteById(uid);
emitter.completeWithError(exception);
}
return emitterRepository.save(uid, emitter);
}
제거 코드 1은 이벤트 발생 이전이기 때문에 용의선상에서 벗어납니다.
그래서 결국 원인은 emitter.onCompletion(() -> emitterRepository.deleteById(uid)); 이 부분입니다.
의미를 보면 결국 Complete되면 실행하라는 뜻입니다.
여러 블로그를 돌아다녀본 결과 emitter.complete()를 실행하면 onCompletion에 등록한 메서드가 동작한다는 것을 확인했습니다.
다만 저는 emitter.complete()를 실행한 부분이 어디에도 없었습니다.
그렇다면 다른 부분에서 종료되는 상황이 발생됐다는 것입니다.
그래서 더 자세히 찾아봤습니다.
스프링 공식문서를 찾아보면
이런 내용을 찾을 수 있습니다.

This method is called from a container thread when an async request completed for any reason including timeout and network error.
간단하게 해석하면 타임아웃, 네트워크 에러를 포함한 어떠한 이유에서든 완료가 됐다면 실행이 됩니다.
즉 비정상 종료(에러)에 의한 종료도 complete라고 보는 것이죠.
즉 제가 모르는 원인으로 인해 종료가 실행된 것이고 이에 따라 Emitter가 삭제된 것으로 보입니다.
그럼 이 부분을 주석처리하고 실행해보면
다음과 같은 에러가 발생합니다.
java.lang.IllegalStateException: ResponseBodyEmitter has already completed
사실 당연하죠. 위에서 그렇게 종료됐다고 했으니 말이죠.
그럼 종료 이유를 생각해봅시다.
이전 동작과 현재 동작을 비교하면 변수는 단 하나입니다. 새로고침
즉 새로고침을 하며 연결을 종료한 것입니다. 그리고 그것이 완료로 처리되어 Emitter를 삭제한 것이죠.
그래서 결론은 기존에 있던 Emitter를 반환받아 사용하는 것은 안 된다 라는 것입니다.
그래서 결국 새로운 SSE 연결을 하고 spring에서 있던 Emitter Repository에서 User id에 해당하는 Emitter를 업데이트해주는 방식으로 수정했습니다.

아래 코드는 페이지 첫 시작 시(새로고침 포함) 시 동작합니다.
useEffect(async () => {
try {
const eventSource = new EventSource(`${process.env.REACT_APP_API_URL}/ai/emitter/${sessionStorage.getItem("uid")}`, {withCredentials: true}); // 1
setSse(true); // 2
eventSource.addEventListener('data', (e) => { // 3
alert("분석이 끝났습니다.");
renewAnalysis();
eventSource.close();
setSse(false);
});
eventSource.onerror = (e) => { // 4
eventSource.close();
setSse(false);
};
} catch (e){ // 5
setSse(false);
}
}, []);
@GetMapping(value = "/api/ai/emitter/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter isExist(@PathVariable("id") UUID id, AccessTokenPayload ap) {
Optional<SseEmitter> e = aiService.getEmitter(id); // 1
if (e.isEmpty()) { // 2
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Resource not found");
} else{ // 3
return aiService.createEmitter(ap);
}
}

아주 하찮은(?) 결과이기도 합니다...
사실 없어도 이용에는 크게 문제는 없는 부분이긴 합니다만 이런 디테일한 부분 하나하나가 서비스의 완성도를 높인다고 생각하니 뿌듯하네요.