Redis + Spring Webflux 기반 대기열 큐 세션 자료

김형준·2025년 2월 25일
0

아키텍처 구성 & 코드 실행 흐름

대기 큐 및 작업 큐 분리 이유

얼핏 보면 3 ~ 6번 과정은 하나의 큐만 사용하는 방식이 오버헤드가 더 적으므로 큐를 굳이 분리해야 한다고 느끼지 못할 수 있다.

하지만 큐를 나눠 관리하게 되면 다음과 같은 이점들이 생긴다.

  • 원자성 및 동시성 제어 가능
    • 단일 큐로 요청을 처리할 경우 요구사항에 따라 요청의 상태를 관리하는 값이 추가될 수 있다.
    • 현재는 단일 스케줄러가 요청들을 처리하지만, 성능 개선의 이유로 여러 스케줄러가 동시에 작동하도록 수정된다면 요청 상태 변경을 위한 race condition 발생 및 이를 해결하기 위한 lock 오버헤드 등 여러 문제점을 유발할 수 있다.
  • 장애 발생 시 rollback, 재시도 등이 용이함
    • 하나의 큐에서 관리할 경우 장애가 발생하면 대기중인 모든 요청에 대해 rollback등의 복구 작업을 적용해야 한다.
  • 요청에 대한 필터링 가능
    • 대기 큐에서 꺼내 요청이 유효한지 1차로 검증하고 작업 큐로 넣는 등의 작업도 가능
    • 이를 통해 요청을 비교적 더 효율적으로 처리 가능하다.
  • 각 케이스에 맞는 확장 및 유지보수 가능
    • 대기 큐 및 작업 큐에 대한 별도의 크기 조절이 가능하다.

Spring WebFlux

이벤트들을 비동기적으로 처리하며 이를 Publisher-Subscriber 패턴(Reactive Stream)으로 구현한 Reactor core 기반 기술

한 스레드 내에서 요청에 대한 처리를 동기적으로 대기할 필요 없이 비동기적으로 다음 요청을 처리할 수 있으며 확장성이 좋아 Spring MVC 및 다양한 서버(Netty, Undertow 등)과 통합 가능하다.

여러 비동기 기술들과의 비교

@Async

AOP 방식을 통해 비동기 객체 CompletableFuture를 반환하도록 하는 비동기 기술

어노테이션과 리턴 타입 정도만 신경쓰면 바로 비동기가 적용된다.

하지만 CompletableFuture 결과를 받는 입장에서는 비동기 작업 수행 중 예외가 발생했는지 알기 어렵고, backpressure 또한 기본적으로 지원되지 않는다.

RxJava

Reactive programming 패러다임을 도입하기 위한 라이브러리

Spring Webflux와 달리 RxJava는 스프링 종속적이지 않기에 사용 시 별도의 설정이 요구된다.

또한 RxJava와 달리 Reactor core에서는 비동기 테스트를 쉽게 구현할 수 있는 기능을 제공한다.

또한 아래에서 설명핼 Spring Data Reactive와 같이 스프링 생태계의 여러 지원을 받을 수 있기에 WebFlux를 선택하였다.

Mono/Flux

Reactive 패러다임에서 Publisher를 담당하는 객체들

순차적인 데이터들을 메서드 체이닝 기반 비동기 방식으로 처리한다.

또한 비동기 처리 방식을 순수 Java의 Thread와 같은 low-level 기능을 사용하지 않고도 라이브러리의 기능을 통해 쉽게 사용할 수 있다.

Flux는 N개의 데이터를 반환하고 비동기로 처리할 수 있도록 하며, Mono는 한 개의 데이터를 반환하고 처리할 수 있도록 하는 Publisher다.

Mono/Flux가 발행한 데이터는 subscribe() 와 같은 Subscriber로 전달되어 처리되는 방식이다.

reservationQueue.getWaitingQueueRange(waitingRange)
    .flatMap(this::moveToProcessingQueue)
    // ... 
    .subscribe(
        this::handleResult, 
        error -> System.err.println("Error in processQueue: " + error.getMessage())
    );
  • 위 코드는 먼저 reservationQueue.getWaitingQueueRange()가 Flux를 반환한다.
  • 이후 flatMap() 메서드 체이닝을 통해 Flux으로 이루어진 여러 String 값들에 대한 변환 작업을 비동기적으로 처리하고 하나의 Stream으로 합친다.
  • 메서드 체이닝으로 수행되는 작업들의 결과물 스트림(위에서는 this::moveToProcessingQueue 수행 결과)가 마지막으로 subscribe()에 도달하게 되면 각 요소에 대한 this:handleResult 수행을 끝으로 스트림이 종료된다.
  • 이 때 subscriber로 onComplete가 전달되는 경우 this::handleResult를, onError가 전달되는 경우 에러 메시지 출력을 수행한다.

현재 구현된 코드 상에서는 Controller 단에서 Mono/Flux를 반환하도록 되어 있는데, 이 경우 아래와 같이 요청이 처리된다.

  • 컨트롤러 메서드가 요청에 대한 Mono/Flux 객체를 반환한다.
  • 요청이 들어오면 Spring Webflux 내부 컴포넌트(DispatcherHandler)가 해당 컨트롤러 메서드에 대한 subscribe()을 시작한다.
  • 구독이 시작되면 클라이언트에 전송될 실제 데이터의 생성을 수행한다.

Mono/Flux를 생성하는 경우와 메서드 체이닝에 사용할 수 있는 함수형 연산자들에 대해서는 아래 링크를 참고하자.

https://projectreactor.io/docs/core/release/reference/apdx-operatorChoice.html

Sinks

Mono/Flux에서 내부적으로 수행되던 onSubscribe(), onNext()와 같은 시그널을 기반으로 동작하는 코드들을 명시적으로 처리할 수 있도록 구현된 Subscriber 겸 Publisher

또한 스레드들 사이에서 데이터를 전송할 수 있는 Publisher를 생성하는 기능을 제공한다.

이를 통해 요청을 처리하는 스레드가 Sinks 스케줄러 기반으로 별도로 동작하는 스레드의 작업 결과를 subscriber로써 받아올 수 있게 해 준다.

public Mono<Boolean> enqueueReservation(ReservationCreateReqDto request) {
    double score = System.currentTimeMillis();
    String json = serializer.serialize(request);
    Sinks.One<ReservationCreateResDto> sink = Sinks.one();
    resultSinkMap.put(request.getRequestId(), sink);
    System.out.println("added Request ID: " + request.getRequestId());
		return reservationQueue.addToWaitingQueue(json, score);
}

public Mono<ReservationCreateResDto> waitForProcessingResult(String reservationId) {
    Sinks.One<ReservationCreateResDto> sink = resultSinkMap.get(reservationId);
    if (sink != null) {
		    return sink.asMono();
    }
		return Mono.empty();
}

private void handleResult(ReservationCreateResDto result) {
    System.out.println("요청 처리 결과 반환 중\nresult: " + result);
    Sinks.One<ReservationCreateResDto> sink = resultSinkMap.remove(result.requestId());
    if (sink == null) {
		    throw new IllegalStateException("Sink not found");
    }
		sink.tryEmitValue(result);
}

각 메서드가 수행하는 역할을 정리하자면 아래와 같다.

  • enqueueReservation: sink를 생성하여 결과 대기를 위한 준비를 함
  • handleResult: 별도 스레드에서 작업 결과를 emit하며, 이 과정에서 onNext와 onComplete 시그널을 구독자에게 보내는 역할
  • waitForProcessingResult: sink의 Mono를 구독하여 비동기적으로 결과를 받는 역할

ReactiveRedisTemplate

Reactive Programming 패러다임에서 활용 가능하도록 구현된 RedisTemplate

기존 RedisTemplate와 달리 여러 작업들에 대해 Mono, Flux 반환 타입을 갖는다.

opsForValue(), opsForList()와 같은 자료구조에 대한 작업을 제공하는 메서드들은 마찬가지로 작업 결과를 Mono<> 또는 Flux<> 형식으로 반환하는 ReactiveStreamOperations를 제공한다.

또한 반환 타입에 아래와 같이 여러 메서드 체이닝 기반 추가 작업들을 수행하도록 구현할 수 있다.

// 조회 결과의 각 요소들을 변환
redisTemplate.opsForZSet().range("myZSet", 0, 10)
    .map(item -> transform(item))
    .subscribe(result -> System.out.println(result));

// 조건에 맞는 요소만 추출
redisTemplate.opsForZSet().range("myZSet", 0, 10)
    .filter(item -> itemMeetsCondition(item))
    .subscribe(filteredItem -> System.out.println(filteredItem));

// 스트림의 각 요소 처리 시 같이 수행될 부가 적업 추가
redisTemplate.opsForZSet().range("myZSet", 0, 10)
    .doOnNext(item -> System.out.println("Processing: " + item))
    .subscribe();

// Flux 요소들을 하나의 자료구조로 모음
redisTemplate.opsForZSet().range("myZSet", 0, 10)
    .collectList()
    .subscribe(list -> System.out.println("전체 리스트: " + list));

// 체인 중 에러가 발생한 경우 대체 동작/기본값을 제공해 스트림 중단을 방지
redisTemplate.opsForZSet().range("myZSet", 0, 10)
    .onErrorResume(e -> {
        System.err.println("오류 발생: " + e.getMessage());
        return Flux.empty();
    })
    .subscribe();

이외에도 체인 중 에러가 발생한 케이스에 대해서는 처리를 건너뛰고 다음 요소에 대해 작업을 계속하도록 하는 onErrorContinue(), 처음 n개의 요소만 반환하는 take(), 처음 n개의 요소를 건너뛰는 skip() 등을 사용할 수 있다.

이후 subscribe()를 붙여 리액티브 스트림에서 데이터 처리를 시작할 수 있도록 한다. 또한 구독자를 등록해 데이터 발행, 에러 처리, 완료 신호를 받을 수 있게 한다.

비동기 테스트

reactive programming 패러다임을 적용하였기에, 테스트 코드도 이를 처리할 수 있도록 짜여질 필요가 있다고 생각했다.

따라서 Publisher가 반환하는 시퀀스를 구독해 검증할 수 있는 StepVerifier를 사용하였다.

StepVerifier

스트림에 구독 시 발생될 것으로 예상되는 이벤트가 실제로 발생했는지, 스트림 값이 예상 값을 갖는가를 검증하는 Subscriber 겸 Reactor 테스트 모듈

ReservationCreateReqDto request = new ReservationCreateReqDto(1L, 1L, null);
Mono<ReservationCreateResDto> resultMono = queueService.waitForProcessingResult(request.getRequestId()); 
StepVerifier.create(resultMono.timeout(Duration.ofSeconds(5)))
				.assertNext(result -> {
		        assertEquals(request.getRequestId(), result.requestId());
        })
        .verifyComplete();

위 코드에서는 우선 queueService.waitForProcessingResult()의 수행 결과로 받아와진 result(onNext)에 대해 result의 requestId 값이 request.requestId()와 동일한지를 검증한다.

이후 해당 체이닝들을 정상적으로 통과하고 onComplete 이벤트가 발생하는지를 검증하는 식으로 동작한다.

개선 및 고려사항

  • 현재 Controller 단에서 Mono를 반환하고 있지만, 아직 Netty 서버를 사용하도록 하는 Spring Webflux에 대한 의존관계는 추가되지 않았다.
    • 의존관계를 추가해 볼 예정이다.
  • 현재 버전의 대기열 구현 코드에서는 큐를 나눈 이유가 제대로 적용되지 않고 있다고 판단된다.
    • 에러 추적 및 backpressure, 원자적인 DTO 이동 구현, 동적 스케일링 추가 등 큐를 나누는 이유를 만들 예정이다.
    • 또한 단일 큐를 사용하는 버전도 따로 구현해 비교해볼 예정이다.
    • 수정 방식은 아래에서 언급하는 대로 따를 예정이다.
  • 요청이 몰리는 경우를 해결하기 위해서 무조건 Webflux를 사용할 필요는 없다.
    • Java 17 → 21 migration만 가능하다면 virtual thread를 사용하는 것도 좋다. (링크)
    • 다만 migration을 위해 배포환경, 의존성 등 신경써야 할 점들이 있다는 것이 흠이다.
  • Webflux와 Redis가 혼용되어 있다.
    • Webflux, Redis 모두 pub/sub을 기반으로 동작할 수 있다.
    • 그 말은 둘 중 하나만 사용해도 대기열 처리 구현이 가능하다는 뜻이다.
    • 따라서 어떤 기술을 사용해서 개선할지 고민해 볼 필요가 있다.

참고자료

https://www.baeldung.com/spring-async

https://hoons-dev.tistory.com/120

https://d2.naver.com/helloworld/2771091

https://projectreactor.io/docs/core/release/reference/coreFeatures/flux.html

https://www.baeldung.com/reactive-streams-step-verifier-test-publisher

0개의 댓글

관련 채용 정보