Project Reactor는 JVM에서 리액티르 애플리케이션을 개발하기 위한 핵심 라이브러리이다. RxJava와 유사하지만 Spring Framework 5이상 (spring WebFlux 등)에서 리액티브 스택의 기반으로 사용되면서 널리 알라졌다.
Project Reactor의 핵심은 바로 데이터를 비동기적으로 처리하는 두 가지 리액티브 타입인 Mono과 Flux이다.
Mono와 Flux를 이해하기 전 먼저 리액티브 스트림즈 사양을 이해해야 한다.
Reactive Streams는 비동기 스트림 처리 시 백프레셔(Backpressure)를 지원하기 위한 표준 사양이다. 이 사양은 네 가지 인터페이스를 정의한다.
Publisher<T>: 0개 이상의 T 타입 항목을 발행(emit)하는 프로바이더이다.Subscriber<T>: Publisher가 발행하는 T 타입 항목을 소비(consume)하는 리시버이다.Subscription: Publisher와 Subscriber간의 단방향 흐름 제어를 위한 객체이다. Subscriber가 request()를 통해 데이터를 요청할 때 사용된다.Processor<I, O>: Subecriber이면서 Publisher인 객체로 입력 스트림을 처리하여 출력 스트림으로 변환한다.Mono와 Flux는 모둔 Publisher 인터페이스를 구현한다. 즉 이들은 데이터를 비동기적으로 발행할 수 있는 발행자 역할을 한다.
Mono는 0개 또는 1개의 항목을 발행하는 리액티브 시퀸스(Reactive Sequence)를 나타낸다. 마치 비동기적인 단일 결과 또는 값이 있을 수도, 없을 수도 있는 미래의 값과 같다고 생각할 수 있다.
Mono는 성공적으로 하나의 값을 발행하거나 아무 값도 발행하지 않고 완료되거나 오류를 발생시킬 수 있다.Mono는 선언적으로 정의되지만 실제 데이터 발행 및 처리는 subscribe()메소드가 호출될 때 시작된다. 이는 리소스가 불필요하게 소비되는 것을 방지한다.Mono<Void>)public class MonoExample {
public static void main(String[] args) {
// 1. 단일 값 발행 Mono
Mono<String> nameMono = Mono.just("Reactor"); // "Reactor"를 발행하고 완료
// 2. 값 없이 완료되는 Mono (예: 삭제 작업)
Mono<Void> emptyMono = Mono.empty(); // 아무 값도 발행하지 않고 완료
// 3. 지연 실행 및 구독
nameMono.subscribe(
value -> System.out.println("결과값: " + value), // 성공 시
error -> System.err.println("에러: " + error), // 오류 시
() -> System.out.println("성공") // 완료 시
);
emptyMono.subscribe(
value -> System.out.println("결과값: " + value), // 이 부분은 호출되지 않음
error -> System.err.println("에러: " + error),
() -> System.out.println("값이 없는 Mono 성공")
);
// 4. 오류 발행 Mono
Mono<String> errorMono = Mono.error(new RuntimeException("오류 발생"));
errorMono.subscribe(
value -> System.out.println("결과값: " + value),
error -> System.err.println("Mono에서 오류 발생: " + error.getMessage()),
() -> System.out.println("오류 발생 Mono 성공")
);
}
}
Flux는 0개에서 N개(무한대 포함)의 항목을 발행하는 리액티브 시퀸스를 나타낸다. 마치 "비동기적인 값들의 스트림" 또는 "시간대에 따라 변화하는 데이터의 흐름"과 같다고 생각할 수 있다.
Flux 0개, 1개 또는 여러 개의 값을 발행하고 완료되거나 무한히 값을 발행할 수도 있다. (ex: 실시간 이벤트 스트림)Mono와 마찬가지로 Flux도 subscribe()가 호출될 때까지 실제 데이터 발행을 시작하지 않는다.public class FluxExample {
public static void main(String[] args) throws InterruptedException {
// 1. 고정된 수의 항목 발행 Flux
Flux<String> wordsFlux = Flux.just("A", "B", "C");
// 2. 컬렉션에서 항목 발행 Flux
Flux<Integer> numbersFlux = Flux.range(1, 5); // 1부터 5까지 발행
// 3. 지연 실행 및 구독
wordsFlux.subscribe(
word -> System.out.println("발행한 단어: " + word),
error -> System.err.println("에러: " + error),
() -> System.out.println("단어 Flux 성공")
);
numbersFlux.subscribe(
number -> System.out.println("발행한 숫자: " + number),
error -> System.err.println("에러: " + error),
() -> System.out.println("숫자 Flux 성공")
);
// 4. 무한 스트림 (예: 타이머)
System.out.println("3초간 무한한 Flux 발생");
Flux<Long> infiniteFlux = Flux.interval(Duration.ofSeconds(1)); // 1초마다 0부터 증가하는 값 발행
infiniteFlux.subscribe(
tick -> System.out.println("Tick: " + tick)
);
// 실제 애플리케이션에서는 메인 스레드가 즉시 종료되지 않도록 대기
Thread.sleep(3000); // 3초 동안 대기하여 타이머 작동 확인
System.out.println("무한 Flux 종료");
// 5. 오류 발행 Flux
Flux<String> errorFlux = Flux.just("A", "B")
.concatWith(Flux.error(new RuntimeException("중간에 예외 삽입")))
.concatWith(Flux.just("C")); // C는 발행되지 않음
errorFlux.subscribe(
item -> System.out.println("발행한 단어: " + item),
error -> System.err.println("에러: " + error.getMessage()),
() -> System.out.println("Error Flux 성공")
);
}
}
Publisher 구현: 둘 다 리액티브 스트림즈의 Publisher인터페이스를 구현한다. 즉 데이터 스트림을 발행하는 역할을 한다.subscribe()가 호출될 때까지 실제 작업이 시작되지 않는다.map, filter, flatMap, zip, combineLatest 등 다양한 연산자를 제공하여 스트림을 변환, 조합, 필터링 할 수 있다.Subscriber가 필요한 만큼 데이터만 요청하도록 Subscription을 통해 흐름을 제어한다.onErrorReturn, onErrorResume, retry 등 다양한 오류 처리 메커니즘을 제공한다.| 특성 | Mono | Flux |
|---|---|---|
| 발행 항목 수 | 0 or 1 | 0 ~ INF |
| 용도 | 단일 결과, 비동기 완료 작업 | 다중 결과, 스트리밍 데이터, 이벤트 |
| 성격 | 미래의 단일 값 | 값들의 스트림 |
| 비유 | Optional<T>, Future<T>와 유사 | Iterable<T>, <Stream<T>와 유사 |
Project Reactor는 Mono와 Flux를 서로 변환할 수 있는 편리한 연산자들을 제공한다.
Mono.flux()로 Mono를 Flux로 변환한다. Mono가 값을 발행하면 Flux가 그 값을 발행하고 완료된다. Mono가 비어있으면 Flux도 비어있는다.
Flux.single(): Flux가 정확히 하나의 항목을 발행할 것으로 예상될 때 Mono로 변환된다. 여러 개를 발행하면 IllegalArgumentException, 없으면 NoSuchExcpetion을 발생시킨다.Flux.next(): Flux에서 첫 번째 항목을 발행하는 Mono를 생성한다. Flux가 비어있으면 Mono도 비어있다.Flux.collectList(), Flux.collectMap(), Flux.collectSortedList() 등: Flux의 모든 항목을 수집하여 List, Map 등의 형태로 Mono로 발행한다.public class ConvertExample {
public static void main(String[] args) {
// Mono -> Flux
Mono<String> monoString = Mono.just("A");
Flux<String> fluxFromMono = monoString.flux();
fluxFromMono.subscribe(System.out::println); // Prints: A
// Flux -> Mono (single)
Flux<Integer> singleElementFlux = Flux.just(1);
Mono<Integer> monoFromFlux = singleElementFlux.single();
monoFromFlux.subscribe(System.out::println); // Prints: 1
// Flux -> Mono (collectList)
Flux<String> multiElementFlux = Flux.just("A", "B", "C");
Mono<List<String>> listMono = multiElementFlux.collectList();
listMono.subscribe(list -> System.out.println("List: " + list)); // Prints: List: [A, B, C]
}
}
Mono와 Flux는 Project Reactor의 핵심 구성 요소이며 비동기적이고 논블로킹 방식으로 데이터 스트림을 처리하기 위한 강력한 도구이다. Mono는 단일 결과에 Flux는 다중 결과 스트림에 적합하며 이 둘을 적절히 사용하여 복잡한 비동기 로직을 간결하고 선언적으로 구성할 수 있다.
Spring WebFlux와 함께 사용될 때 더 효율적이며 현대적인 고성능 리액티브 애플리케이션을 구축하는 데 필수적인 개념이다.