Spring Boot - GraphQL subscription 클라이언트별 단일 세션 관리

조제·2025년 10월 10일
0

1. 문제 상황

GraphQL Subscription은 내부적으로 Flux 또는 Publisher를 통해 데이터를 스트리밍합니다.
예를 들어, 특정 clientId를 기준으로 실시간 데이터를 구독하는 코드를 작성하면 다음과 같은 상황이 발생할 수 있습니다.

  • 동일한 clientId로 여러 번 Subscription 요청이 들어올 수 있다.
  • 이전 연결이 끊기지 않은 상태에서 새로운 구독이 생성된다.
  • 결과적으로 같은 데이터를 중복으로 전송하거나, 리소스 누수가 발생할 수 있다.

이 문제를 해결하려면 기존 세션을 안전하게 종료하고 새로 시작하는 관리 로직이 필요합니다.


2. Flux Sink 기반 세션 관리

아래는 클라이언트별 단일 구독 세션을 유지하는 예시 코드입니다.

private final Map<String, FluxSink<ClientResponse>> clientSinks = new ConcurrentHashMap<>();

public Flux<ClientResponse> subscribeToClientData(String clientId) {
    log.info("[subscribeToClientData] clientId :: {}, active sinks :: {}", clientId, clientSinks.size());

    // 기존 세션이 존재하면 종료 처리
    if (clientSinks.containsKey(clientId)) {
        clientSinks.get(clientId).complete();
        clientSinks.remove(clientId);
        log.info("Existing sink for clientId: {} was completed and removed.", clientId);
    }

    return Flux.create(sink -> {
        clientSinks.put(clientId, sink);

        // 구독 취소(Cancel) 시 세션 정리
        sink.onCancel(() -> {
            log.info("onCancel sink for clientId :: {}, active sinks :: {}", clientId, clientSinks.size());
            sink.complete();
            clientSinks.remove(clientId);
        });

        publishClientData(clientId); // 초기 데이터 전송
    });
}

3. 핵심 포인트

1) ConcurrentHashMap을 이용한 세션 추적

각 클라이언트의 구독 세션을 Map<clientId, FluxSink> 형태로 관리합니다.

이는 Thread-safe하므로 단일 서버 환경에서 안정적으로 동작합니다.

private final Map<String, FluxSink<ClientResponse>> clientSinks = new ConcurrentHashMap<>();

2) 중복 세션 방지

새로운 구독이 요청되면, 동일한 clientId로 이미 존재하는 Sink가 있는지 확인하고

존재한다면 즉시 complete() 호출 후 제거합니다.

이 과정을 통해 클라이언트당 항상 단일 세션만 존재하게 됩니다.

if (clientSinks.containsKey(clientId)) {
    clientSinks.get(clientId).complete();
    clientSinks.remove(clientId);
}

3) 연결 종료 시 리소스 정리

클라이언트가 Subscription을 취소하거나 연결이 끊기면 onCancel 이벤트가 발생합니다.

이 시점에 FluxSinkcomplete() 시키고 Map에서 제거하여 메모리 누수를 방지합니다.

sink.onCancel(() -> {
    sink.complete();
    clientSinks.remove(clientId);
});

4. 데이터 푸시 예시

서버에서 특정 시점에 데이터를 전송하고 싶다면 다음과 같이 구현할 수 있습니다.

public void publishClientData(String clientId) {
    FluxSink<ClientResponse> sink = clientSinks.get(clientId);
    if (sink != null) {
        sink.next(new ClientResponse("Updated data for " + clientId));
    }
}

이 방식으로 서버는 특정 클라이언트에게 실시간 데이터를 안전하게 푸시할 수 있습니다.


ConcurrentHashMap

ConcurrentHashMap단일 인스턴스 환경에 한정된 해결책입니다.

서버가 여러 대로 늘어나면 다른 접근 방식이 필요합니다.
(ex: Kafka, Redis...)

profile
조제

0개의 댓글