
SSE란 Server-Sent Events의 약자로 서버에서 클라이언트로 단방향으로 실시간 스트리밍을 하는 기술이다. 쉽게 말해서 소켓과 비슷하지만 오로지 서버에서 클라이언트로만 통신이 이루어진다는 특징이 있다.
회사에서 개발 중인 신규 프로젝트에서 비동기 처리와 SSE를 같이 다뤄야 하기에 이번에 필자도 처음 만져보게 되었다.
SSE는 다음과 같은 플로우로 작동되고 있었다.

실제로는 조금 더 복잡하지만 이 글에서는 설명을 위해서 간략하게 요약했다. 여기서 문제는 User측에서 SSE 기존 연결을 끊을 경우 발생했다.
첫 번째 연결에서는 SSE가 정상 작동했으나 두 번째에서는 연결만 되고 요청 결과가 SSE로 반환되지 않았다가 세 번째에서는 정상 작동하는 괴현상(?)이 발생했다.
구독할 때 무언가 문제가 있지는 않을까 하고 코드를 한 번 뜯어봤다.
public SseEmitter subscribe(String userId) {
SseEmitter old = emitters.remove(userId);
if (old != null) {
old.complete();
}
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
emitters.put(userId, emitter);
emitter.onCompletion(() -> emitters.remove(userId));
// ...
}
SSE가 처음이었던 나는 클로드에게 도움을 청했더니 다음과 같은 답변을 얻었다.
old.complete()의 onCompletion 콜백이 비동기로 실행되면서 새 emitter를 제거해버리는 race condition입니다.
이게 무슨 소리인가 싶어서 자세히 보니
if (old != null) {
old.complete();
}
새로 연결을 생성할 때 과거에 연결했던 Emitter를 삭제하는 콜백을 예약을 여기서 해놓은 다음
emitter.onCompletion(() -> emitters.remove(userId));
새로운 Emitter를 생성해 놓고 userId로 콜백을 실행해서 삭제를 하니 새로 만든 Emitter를 삭제해 버리게 되면서 연결이 제대로 안돼고 작동이 안됐던 것이다.

쉽게 말하자면 다음과 같은 플로우로 잘못 동작하고 있던 것이다
여기서 4번이 문제의 주 원인이었던 것이다. user ID를 기준으로 만들어진 a1과 a2 연결이기에 서버에서는 이를 구분하지 못하고 user ID로 기껏 만들어놓은 신규 연결을 삭제해 버린 것이다.
비유하자면 호텔에서 체크인 체크아웃을 할 때 해당 호실에 누가 머물고 있는지를 단순히 호실의 열쇠로만 구분하게 되니 안에 새로운 손님이 와도 "어? 체크아웃 해야 되네?" 하고 삭제하는 것이다.
해결 방법은 생각보다 간단했다. 클로드의 도움을 받아서 아래와 같은 코드로 수정을 했다.
public SseEmitter subscribe(String userId) {
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
SseEmitter old = emitters.put(userId, emitter);
if (old != null) {
old.complete();
}
emitter.onCompletion(() -> emitters.remove(userId, emitter));
emitter.onTimeout(() -> emitters.remove(userId, emitter));
emitter.onError(e -> emitters.remove(userId, emitter));
send(
userId,
SseEvent.builder()
.jobId(null)
.jobStatus(JobStatus.DONE)
.message("SSE subscribe success")
.at(LocalDateTime.now())
.build()
);
return emitter;
}
연결을 생성할 때 user ID만 하는 것이 아니라 Emitter도 같이 저장을 해서 현재 이 방에 누가 머물고 있는지도 같이 구분하게 하면서 a1과 a2 연결을 구분하여 a1만 잘 삭제할 수 있도록 했다.
코드를 비교해 보면 기존에
SseEmitter old = emitters.remove(userId);
이렇게 불러오던 거를
SseEmitter old = emitters.put(userId, emitter);
이런 식으로 변경하면서 user ID와 Emitter를 함께 구분할 수 있도록 했다. 그렇게 하면 이제 전체 코드는 아래와 같아진다.
@Slf4j
@Service
public class SseService {
private static final long DEFAULT_TIMEOUT = 60L * 60L * 1000L;
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
public SseEmitter subscribe(String userId) {
SseEmitter old = emitters.remove(userId);
if (old != null) {
old.complete();
}
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
emitters.put(userId, emitter);
emitter.onCompletion(() -> emitters.remove(userId));
emitter.onTimeout(() -> emitters.remove(userId));
emitter.onError(e -> emitters.remove(userId));
send(
userId,
SseEvent.builder()
.jobId(null)
.jobStatus(JobStatus.DONE)
.message("SSE subscribe success")
.at(LocalDateTime.now())
.build()
);
return emitter;
}
public void send(String userId, SseEvent event) {
SseEmitter emitter = emitters.get(userId);
if (emitter == null) {
log.debug("No SSE connection for userId={}", userId);
return;
}
try {
emitter.send(SseEmitter.event()
.data(event));
} catch (IOException e) {
log.warn("SSE send failed. userId={}", userId, e);
emitters.remove(userId);
emitter.completeWithError(e);
}
}
}
새로운 기술을 사용할 때에는 제대로 공부를 하고 시작하자...
