얼핏 보면 3 ~ 6번 과정은 하나의 큐만 사용하는 방식이 오버헤드가 더 적으므로 큐를 굳이 분리해야 한다고 느끼지 못할 수 있다.
하지만 큐를 나눠 관리하게 되면 다음과 같은 이점들이 생긴다.
이벤트들을 비동기적으로 처리하며 이를 Publisher-Subscriber 패턴(Reactive Stream)으로 구현한 Reactor core 기반 기술
한 스레드 내에서 요청에 대한 처리를 동기적으로 대기할 필요 없이 비동기적으로 다음 요청을 처리할 수 있으며 확장성이 좋아 Spring MVC 및 다양한 서버(Netty, Undertow 등)과 통합 가능하다.
@Async
AOP 방식을 통해 비동기 객체 CompletableFuture를 반환하도록 하는 비동기 기술
어노테이션과 리턴 타입 정도만 신경쓰면 바로 비동기가 적용된다.
하지만 CompletableFuture 결과를 받는 입장에서는 비동기 작업 수행 중 예외가 발생했는지 알기 어렵고, backpressure 또한 기본적으로 지원되지 않는다.
RxJava
Reactive programming 패러다임을 도입하기 위한 라이브러리
Spring Webflux와 달리 RxJava는 스프링 종속적이지 않기에 사용 시 별도의 설정이 요구된다.
또한 RxJava와 달리 Reactor core에서는 비동기 테스트를 쉽게 구현할 수 있는 기능을 제공한다.
또한 아래에서 설명핼 Spring Data Reactive와 같이 스프링 생태계의 여러 지원을 받을 수 있기에 WebFlux를 선택하였다.
Reactive 패러다임에서 Publisher를 담당하는 객체들
순차적인 데이터들을 메서드 체이닝 기반 비동기 방식으로 처리한다.
또한 비동기 처리 방식을 순수 Java의 Thread와 같은 low-level 기능을 사용하지 않고도 라이브러리의 기능을 통해 쉽게 사용할 수 있다.
Flux는 N개의 데이터를 반환하고 비동기로 처리할 수 있도록 하며, Mono는 한 개의 데이터를 반환하고 처리할 수 있도록 하는 Publisher다.
Mono/Flux가 발행한 데이터는 subscribe() 와 같은 Subscriber로 전달되어 처리되는 방식이다.
reservationQueue.getWaitingQueueRange(waitingRange)
.flatMap(this::moveToProcessingQueue)
// ...
.subscribe(
this::handleResult,
error -> System.err.println("Error in processQueue: " + error.getMessage())
);
현재 구현된 코드 상에서는 Controller 단에서 Mono/Flux를 반환하도록 되어 있는데, 이 경우 아래와 같이 요청이 처리된다.
Mono/Flux를 생성하는 경우와 메서드 체이닝에 사용할 수 있는 함수형 연산자들에 대해서는 아래 링크를 참고하자.
https://projectreactor.io/docs/core/release/reference/apdx-operatorChoice.html
Mono/Flux에서 내부적으로 수행되던 onSubscribe(), onNext()와 같은 시그널을 기반으로 동작하는 코드들을 명시적으로 처리할 수 있도록 구현된 Subscriber 겸 Publisher
또한 스레드들 사이에서 데이터를 전송할 수 있는 Publisher를 생성하는 기능을 제공한다.
이를 통해 요청을 처리하는 스레드가 Sinks 스케줄러 기반으로 별도로 동작하는 스레드의 작업 결과를 subscriber로써 받아올 수 있게 해 준다.
public Mono<Boolean> enqueueReservation(ReservationCreateReqDto request) {
double score = System.currentTimeMillis();
String json = serializer.serialize(request);
Sinks.One<ReservationCreateResDto> sink = Sinks.one();
resultSinkMap.put(request.getRequestId(), sink);
System.out.println("added Request ID: " + request.getRequestId());
return reservationQueue.addToWaitingQueue(json, score);
}
public Mono<ReservationCreateResDto> waitForProcessingResult(String reservationId) {
Sinks.One<ReservationCreateResDto> sink = resultSinkMap.get(reservationId);
if (sink != null) {
return sink.asMono();
}
return Mono.empty();
}
private void handleResult(ReservationCreateResDto result) {
System.out.println("요청 처리 결과 반환 중\nresult: " + result);
Sinks.One<ReservationCreateResDto> sink = resultSinkMap.remove(result.requestId());
if (sink == null) {
throw new IllegalStateException("Sink not found");
}
sink.tryEmitValue(result);
}
각 메서드가 수행하는 역할을 정리하자면 아래와 같다.
Reactive Programming 패러다임에서 활용 가능하도록 구현된 RedisTemplate
기존 RedisTemplate와 달리 여러 작업들에 대해 Mono, Flux 반환 타입을 갖는다.
opsForValue(), opsForList()와 같은 자료구조에 대한 작업을 제공하는 메서드들은 마찬가지로 작업 결과를 Mono<> 또는 Flux<> 형식으로 반환하는 ReactiveStreamOperations를 제공한다.
또한 반환 타입에 아래와 같이 여러 메서드 체이닝 기반 추가 작업들을 수행하도록 구현할 수 있다.
// 조회 결과의 각 요소들을 변환
redisTemplate.opsForZSet().range("myZSet", 0, 10)
.map(item -> transform(item))
.subscribe(result -> System.out.println(result));
// 조건에 맞는 요소만 추출
redisTemplate.opsForZSet().range("myZSet", 0, 10)
.filter(item -> itemMeetsCondition(item))
.subscribe(filteredItem -> System.out.println(filteredItem));
// 스트림의 각 요소 처리 시 같이 수행될 부가 적업 추가
redisTemplate.opsForZSet().range("myZSet", 0, 10)
.doOnNext(item -> System.out.println("Processing: " + item))
.subscribe();
// Flux 요소들을 하나의 자료구조로 모음
redisTemplate.opsForZSet().range("myZSet", 0, 10)
.collectList()
.subscribe(list -> System.out.println("전체 리스트: " + list));
// 체인 중 에러가 발생한 경우 대체 동작/기본값을 제공해 스트림 중단을 방지
redisTemplate.opsForZSet().range("myZSet", 0, 10)
.onErrorResume(e -> {
System.err.println("오류 발생: " + e.getMessage());
return Flux.empty();
})
.subscribe();
이외에도 체인 중 에러가 발생한 케이스에 대해서는 처리를 건너뛰고 다음 요소에 대해 작업을 계속하도록 하는 onErrorContinue()
, 처음 n개의 요소만 반환하는 take()
, 처음 n개의 요소를 건너뛰는 skip()
등을 사용할 수 있다.
이후 subscribe()
를 붙여 리액티브 스트림에서 데이터 처리를 시작할 수 있도록 한다. 또한 구독자를 등록해 데이터 발행, 에러 처리, 완료 신호를 받을 수 있게 한다.
reactive programming 패러다임을 적용하였기에, 테스트 코드도 이를 처리할 수 있도록 짜여질 필요가 있다고 생각했다.
따라서 Publisher가 반환하는 시퀀스를 구독해 검증할 수 있는 StepVerifier를 사용하였다.
스트림에 구독 시 발생될 것으로 예상되는 이벤트가 실제로 발생했는지, 스트림 값이 예상 값을 갖는가를 검증하는 Subscriber 겸 Reactor 테스트 모듈
ReservationCreateReqDto request = new ReservationCreateReqDto(1L, 1L, null);
Mono<ReservationCreateResDto> resultMono = queueService.waitForProcessingResult(request.getRequestId());
StepVerifier.create(resultMono.timeout(Duration.ofSeconds(5)))
.assertNext(result -> {
assertEquals(request.getRequestId(), result.requestId());
})
.verifyComplete();
위 코드에서는 우선 queueService.waitForProcessingResult()의 수행 결과로 받아와진 result(onNext)에 대해 result의 requestId 값이 request.requestId()와 동일한지를 검증한다.
이후 해당 체이닝들을 정상적으로 통과하고 onComplete 이벤트가 발생하는지를 검증하는 식으로 동작한다.
https://www.baeldung.com/spring-async
https://hoons-dev.tistory.com/120
https://d2.naver.com/helloworld/2771091
https://projectreactor.io/docs/core/release/reference/coreFeatures/flux.html
https://www.baeldung.com/reactive-streams-step-verifier-test-publisher