GraphQL Subscription은 내부적으로 Flux 또는 Publisher를 통해 데이터를 스트리밍합니다.
예를 들어, 특정 clientId를 기준으로 실시간 데이터를 구독하는 코드를 작성하면 다음과 같은 상황이 발생할 수 있습니다.
clientId로 여러 번 Subscription 요청이 들어올 수 있다. 이 문제를 해결하려면 기존 세션을 안전하게 종료하고 새로 시작하는 관리 로직이 필요합니다.
아래는 클라이언트별 단일 구독 세션을 유지하는 예시 코드입니다.
private final Map<String, FluxSink<ClientResponse>> clientSinks = new ConcurrentHashMap<>();
public Flux<ClientResponse> subscribeToClientData(String clientId) {
log.info("[subscribeToClientData] clientId :: {}, active sinks :: {}", clientId, clientSinks.size());
// 기존 세션이 존재하면 종료 처리
if (clientSinks.containsKey(clientId)) {
clientSinks.get(clientId).complete();
clientSinks.remove(clientId);
log.info("Existing sink for clientId: {} was completed and removed.", clientId);
}
return Flux.create(sink -> {
clientSinks.put(clientId, sink);
// 구독 취소(Cancel) 시 세션 정리
sink.onCancel(() -> {
log.info("onCancel sink for clientId :: {}, active sinks :: {}", clientId, clientSinks.size());
sink.complete();
clientSinks.remove(clientId);
});
publishClientData(clientId); // 초기 데이터 전송
});
}
ConcurrentHashMap을 이용한 세션 추적각 클라이언트의 구독 세션을 Map<clientId, FluxSink> 형태로 관리합니다.
이는 Thread-safe하므로 단일 서버 환경에서 안정적으로 동작합니다.
private final Map<String, FluxSink<ClientResponse>> clientSinks = new ConcurrentHashMap<>();
새로운 구독이 요청되면, 동일한 clientId로 이미 존재하는 Sink가 있는지 확인하고
존재한다면 즉시 complete() 호출 후 제거합니다.
이 과정을 통해 클라이언트당 항상 단일 세션만 존재하게 됩니다.
if (clientSinks.containsKey(clientId)) {
clientSinks.get(clientId).complete();
clientSinks.remove(clientId);
}
클라이언트가 Subscription을 취소하거나 연결이 끊기면 onCancel 이벤트가 발생합니다.
이 시점에 FluxSink를 complete() 시키고 Map에서 제거하여 메모리 누수를 방지합니다.
sink.onCancel(() -> {
sink.complete();
clientSinks.remove(clientId);
});
서버에서 특정 시점에 데이터를 전송하고 싶다면 다음과 같이 구현할 수 있습니다.
public void publishClientData(String clientId) {
FluxSink<ClientResponse> sink = clientSinks.get(clientId);
if (sink != null) {
sink.next(new ClientResponse("Updated data for " + clientId));
}
}
이 방식으로 서버는 특정 클라이언트에게 실시간 데이터를 안전하게 푸시할 수 있습니다.
ConcurrentHashMap은 단일 인스턴스 환경에 한정된 해결책입니다.
서버가 여러 대로 늘어나면 다른 접근 방식이 필요합니다.
(ex: Kafka, Redis...)