Project Reactor의 Mono와 Flux

김기현·2025년 7월 31일

Spring WebFlux

목록 보기
6/28

Project Reactor는 JVM에서 리액티르 애플리케이션을 개발하기 위한 핵심 라이브러리이다. RxJava와 유사하지만 Spring Framework 5이상 (spring WebFlux 등)에서 리액티브 스택의 기반으로 사용되면서 널리 알라졌다.
Project Reactor의 핵심은 바로 데이터를 비동기적으로 처리하는 두 가지 리액티브 타입인 MonoFlux이다.


리액티브 스트림즈(Reactive Streams)와 Mono/Flux

MonoFlux를 이해하기 전 먼저 리액티브 스트림즈 사양을 이해해야 한다.
Reactive Streams는 비동기 스트림 처리 시 백프레셔(Backpressure)를 지원하기 위한 표준 사양이다. 이 사양은 네 가지 인터페이스를 정의한다.

  • Publisher<T>: 0개 이상의 T 타입 항목을 발행(emit)하는 프로바이더이다.
  • Subscriber<T>: Publisher가 발행하는 T 타입 항목을 소비(consume)하는 리시버이다.
  • Subscription: PublisherSubscriber간의 단방향 흐름 제어를 위한 객체이다. Subscriberrequest()를 통해 데이터를 요청할 때 사용된다.
  • Processor<I, O>: Subecriber이면서 Publisher인 객체로 입력 스트림을 처리하여 출력 스트림으로 변환한다.

MonoFlux는 모둔 Publisher 인터페이스를 구현한다. 즉 이들은 데이터를 비동기적으로 발행할 수 있는 발행자 역할을 한다.


Mono: 0개 또는 1개의 항목을 발행하는 Publisher

Mono0개 또는 1개의 항목을 발행하는 리액티브 시퀸스(Reactive Sequence)를 나타낸다. 마치 비동기적인 단일 결과 또는 값이 있을 수도, 없을 수도 있는 미래의 값과 같다고 생각할 수 있다.

주요 특징

  • 단일 값 또는 없음: Mono는 성공적으로 하나의 값을 발행하거나 아무 값도 발행하지 않고 완료되거나 오류를 발생시킬 수 있다.
  • 비동기 처리: 값이 즉시 준비되지 않고 미래의 어떤 시점에 비동기적으로 값이 생성되거나 완료될 수 있음을 나타낸다.
  • 지연 실행(Lazy Execution): Mono는 선언적으로 정의되지만 실제 데이터 발행 및 처리는 subscribe()메소드가 호출될 때 시작된다. 이는 리소스가 불필요하게 소비되는 것을 방지한다.

언제 사용할까?

  • 단일 결과를 반환하는 API 호출: 예를 들어 사용자 ID로 하나의 사용자 정보를 조회하는 데이터베이스 쿼리 결과.
  • 비동기적으로 완료되는 작업: 파일 쓰기, 단일 HTTP 응답 수신 등.
  • 값이 존재하지 않을 수 있는 경우: 특정 조건에 따라 값이 없을 수도 있는 결과.
  • void 타입과 유사하지만 비동기적: 특정 작업의 완료만 알리고 싶을 때 사용된다. (Mono<Void>)

예시

public class MonoExample {
    public static void main(String[] args) {
        // 1. 단일 값 발행 Mono
        Mono<String> nameMono = Mono.just("Reactor"); // "Reactor"를 발행하고 완료

        // 2. 값 없이 완료되는 Mono (예: 삭제 작업)
        Mono<Void> emptyMono = Mono.empty(); // 아무 값도 발행하지 않고 완료

        // 3. 지연 실행 및 구독
        nameMono.subscribe(
            value -> System.out.println("결과값: " + value), // 성공 시
            error -> System.err.println("에러: " + error),   // 오류 시
            () -> System.out.println("성공")             // 완료 시
        );

        emptyMono.subscribe(
            value -> System.out.println("결과값: " + value), // 이 부분은 호출되지 않음
            error -> System.err.println("에러: " + error),
            () -> System.out.println("값이 없는 Mono 성공")
        );

        // 4. 오류 발행 Mono
        Mono<String> errorMono = Mono.error(new RuntimeException("오류 발생"));
        errorMono.subscribe(
            value -> System.out.println("결과값: " + value),
            error -> System.err.println("Mono에서 오류 발생: " + error.getMessage()),
            () -> System.out.println("오류 발생 Mono 성공")
        );
    }
}

Flux: 0개에서 N개의 항목을 발행하는 Publisher

Flux0개에서 N개(무한대 포함)의 항목을 발행하는 리액티브 시퀸스를 나타낸다. 마치 "비동기적인 값들의 스트림" 또는 "시간대에 따라 변화하는 데이터의 흐름"과 같다고 생각할 수 있다.

주요 특징

  • 다중 값 또는 없음: Flux 0개, 1개 또는 여러 개의 값을 발행하고 완료되거나 무한히 값을 발행할 수도 있다. (ex: 실시간 이벤트 스트림)
  • 비동기 처리: 값들이 시간이 지남에 따라 비동기적으로 생성되고 발행될 수 있음을 나타낸다.
  • 지연 실행(Lazy Execution): Mono와 마찬가지로 Fluxsubscribe()가 호출될 때까지 실제 데이터 발행을 시작하지 않는다.

언제 사용할까?

  • 다중 결과를 반환하는 API 호출: 예를 들어 모든 사용자 목록을 조회하는 데이터베이스 쿼리 결과.
  • 스트리밍 데이터: 실시간 이벤트(채팅 메시지, 주식 시세), 파일 라인별 읽기, 센서 데이터 등.
  • 무한한 시퀸스: 타이머 이벤트 등 특정 간격으로 계속 발생하는 이벤트.

예시

public class FluxExample {
    public static void main(String[] args) throws InterruptedException {
        // 1. 고정된 수의 항목 발행 Flux
        Flux<String> wordsFlux = Flux.just("A", "B", "C");

        // 2. 컬렉션에서 항목 발행 Flux
        Flux<Integer> numbersFlux = Flux.range(1, 5); // 1부터 5까지 발행

        // 3. 지연 실행 및 구독
        wordsFlux.subscribe(
                word -> System.out.println("발행한 단어: " + word),
                error -> System.err.println("에러: " + error),
                () -> System.out.println("단어 Flux 성공")
        );

        numbersFlux.subscribe(
                number -> System.out.println("발행한 숫자: " + number),
                error -> System.err.println("에러: " + error),
                () -> System.out.println("숫자 Flux 성공")
        );

        // 4. 무한 스트림 (예: 타이머)
        System.out.println("3초간 무한한 Flux 발생");
        Flux<Long> infiniteFlux = Flux.interval(Duration.ofSeconds(1)); // 1초마다 0부터 증가하는 값 발행

        infiniteFlux.subscribe(
                tick -> System.out.println("Tick: " + tick)
        );

        // 실제 애플리케이션에서는 메인 스레드가 즉시 종료되지 않도록 대기
        Thread.sleep(3000); // 3초 동안 대기하여 타이머 작동 확인
        System.out.println("무한 Flux 종료");

        // 5. 오류 발행 Flux
        Flux<String> errorFlux = Flux.just("A", "B")
                .concatWith(Flux.error(new RuntimeException("중간에 예외 삽입")))
                .concatWith(Flux.just("C")); // C는 발행되지 않음

        errorFlux.subscribe(
                item -> System.out.println("발행한 단어: " + item),
                error -> System.err.println("에러: " + error.getMessage()),
                () -> System.out.println("Error Flux 성공")
        );
    }
}

Mono와 Flux의 공통점 및 차이점

공통점

  • Publisher 구현: 둘 다 리액티브 스트림즈의 Publisher인터페이스를 구현한다. 즉 데이터 스트림을 발행하는 역할을 한다.
  • 비동기적: 데이터를 비동기적으로 처리한다.
  • 지연 실행: subscribe()가 호출될 때까지 실제 작업이 시작되지 않는다.
  • 풍부한 연산자: map, filter, flatMap, zip, combineLatest 등 다양한 연산자를 제공하여 스트림을 변환, 조합, 필터링 할 수 있다.
  • 백프레셔 지원: Subscriber가 필요한 만큼 데이터만 요청하도록 Subscription을 통해 흐름을 제어한다.
  • 오류 처리: onErrorReturn, onErrorResume, retry 등 다양한 오류 처리 메커니즘을 제공한다.

차이점

특성MonoFlux
발행 항목 수0 or 10 ~ INF
용도단일 결과, 비동기 완료 작업다중 결과, 스트리밍 데이터, 이벤트
성격미래의 단일 값값들의 스트림
비유Optional<T>, Future<T>와 유사Iterable<T>, <Stream<T>와 유사

Mono와 Flux 간의 변환

Project Reactor는 MonoFlux를 서로 변환할 수 있는 편리한 연산자들을 제공한다.

Mono to Flux

Mono.flux()MonoFlux로 변환한다. Mono가 값을 발행하면 Flux가 그 값을 발행하고 완료된다. Mono가 비어있으면 Flux도 비어있는다.

Flux to Mono

  • Flux.single(): Flux가 정확히 하나의 항목을 발행할 것으로 예상될 때 Mono로 변환된다. 여러 개를 발행하면 IllegalArgumentException, 없으면 NoSuchExcpetion을 발생시킨다.
  • Flux.next(): Flux에서 첫 번째 항목을 발행하는 Mono를 생성한다. Flux가 비어있으면 Mono도 비어있다.
  • Flux.collectList(), Flux.collectMap(), Flux.collectSortedList() 등: Flux의 모든 항목을 수집하여 List, Map 등의 형태로 Mono로 발행한다.

예제

public class ConvertExample {
    public static void main(String[] args) {
        // Mono -> Flux
        Mono<String> monoString = Mono.just("A");
        Flux<String> fluxFromMono = monoString.flux();
        fluxFromMono.subscribe(System.out::println); // Prints: A

        // Flux -> Mono (single)
        Flux<Integer> singleElementFlux = Flux.just(1);
        Mono<Integer> monoFromFlux = singleElementFlux.single();
        monoFromFlux.subscribe(System.out::println); // Prints: 1

        // Flux -> Mono (collectList)
        Flux<String> multiElementFlux = Flux.just("A", "B", "C");
        Mono<List<String>> listMono = multiElementFlux.collectList();
        listMono.subscribe(list -> System.out.println("List: " + list)); // Prints: List: [A, B, C]
    }
}

결론

MonoFlux는 Project Reactor의 핵심 구성 요소이며 비동기적이고 논블로킹 방식으로 데이터 스트림을 처리하기 위한 강력한 도구이다. Mono는 단일 결과에 Flux는 다중 결과 스트림에 적합하며 이 둘을 적절히 사용하여 복잡한 비동기 로직을 간결하고 선언적으로 구성할 수 있다.
Spring WebFlux와 함께 사용될 때 더 효율적이며 현대적인 고성능 리액티브 애플리케이션을 구축하는 데 필수적인 개념이다.

profile
백엔드 개발자를 목표로 공부하는 대학생

0개의 댓글