Mono 와 Flux 그리고 Operator

Lofri·2024년 6월 26일

WebFlux

목록 보기
3/3

Reactive Program 에 처음 발을 들인 당신은 일단 처음부터 당황스러울 것이다.
Mono? Flux? 이게 뭐지...?

다행히도 이 시리즈의 전 글을 읽었다면 아주아주 쉽게 이해할 수 있다.
안읽었다면 읽고오자
Mono 와 Flux 는 Publisher 다.
단지 방출하는 아이템의 갯수가 다를 뿐, 결국엔 Publisher 다.

앞으로의 설명은 전 글을 읽었다는 가정하에 진행하겠다.


Mono

Mono is a specialized Publisher that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).

설명이 어려워 보이지만 전 글에서 봤던 내용을 떠올려 본다면 간단하다.
Reactive Streams 에서 publisher 가 subscriber 에게 아이템을 제공하던 것을 떠올려보자.

  • request(int) 요청에 해당하는 데이터가 제공 가능하다면 subscriber 의 onNext 를 통해 데이터를 제공한다.
  • 데이터 방출이 완료되었을 경우 onComplete 호출을 통해 완료 상태를 subscriber 에게 전달한다.
  • 에러 발생 시 onError 를 통해 subscriber 에게 에러 시그널을 전파한다.

Mono 는 Publisher 다. 최대 하나의 아이템을 제공하는.
당연하게도 Reactive Streams 에서 제공한 스펙을 따르기 때문에 위에 적은 내용과 동일한 일을 한다.

0-1 개의 아이템을 방출하는 publisher 인 것이다.


Flux

A Flux is a standard Publisher that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.

Flux 는 Mono 와 단 하나의 내용을 제외하곤 동일하다.
Flux 는 0개 이상의 아이템을 제공하는 Publisher 이다.

0-N 개의 아이템을 방출하는 Publisher 인 것이다.

와 우리는 Mono 와 Flux 의 개념을 정복했다!
이것을 기억한다면 Mono 와 Flux. 별거 없다.


Operators

하지만 문제는 여기서부터 발생한다.

단순한 데이터는 의미를 가지고 있지 않다. 데이터가 가공이 되었을 때 그 데이터는 비로소 의미를 가지고 사용자에게 기능을 제공할 수 있다.

Mono 와 Flux 만으로는 이러한 가공을 진행할 수 없다.
그렇기 때문에 아이템의 가공을 진행하기 위해 operator 가 사용된다.

리액티브 라이브러리에서, 아이템의 흐름은 operator 들을 통해 조작 가능하다. 단순한 데이터의 방출에서 operator 를 이용한 method chaining 을 통해 비로서 원하는 동작을 우리는 프로그램에 구현할 수 있다.

// operator 를 이용한 데이터 가공 example
Mono.just(1)
	.map(v -> v * v)
    .map(v -> v + v)
    .subscribe(System.out::println);

먼저 Project Reactor 에서 정의한 operator 에 대해 봐보자

In Reactor, operators are the workstations in our assembly analogy. Each operator adds behavior to a Publisher and wraps the previous step’s Publisher into a new instance. The whole chain is thus linked, such that data originates from the first Publisher and moves down the chain, transformed by each link. Eventually, a Subscriber finishes the process. Remember that nothing happens until a Subscriber subscribes to a Publisher, as we will see shortly.

operator 는 Publisher 에 새로운 행동을 추가하고, 전 단계의 Publisher 를 새 인스턴스로 감싸서 전달한다.

이러한 operator 는 체인에 연결되어있으며 각각의 operator 를 거치면서 단계별로 아이템이 변환된다.

이렇게 변환된 아이템은 최종적으로 subscriber 를 통해 소비된다.

크게 오퍼레이터의 종류를 나누자면 아래로 나눌 수 있다.

  1. 생성 (Creation)
  2. 변환 (Transformation)
  3. 필터링 (Filtering)
  4. 결합 (Combining)
  5. 오류 처리 (Error handling)
  6. 버퍼링 및 윈도잉 (Buffering and Windowing)

생성

무엇을 생성하는가 한다면 바로 Mono 와 Flux 를 생성한다.

Mono.just("Hello world");

와. HelloWorld 를 방출하는 Mono 가 하나 생성되었다!

위와 비슷한 방식으로 우리는 Publisher 를 생성할 수 있다!

Flux.just("Hello", "World");
Flux.fromIterable(List.of("Hello", "World"));
Mono.fromCallable(() -> "Callable result");

변환

그리고 operator 를 통해 데이터를 변환하거나

// 1~5 까지 아이템을 방출하며 그 데이터에 2를 곱하는 코드
Flux.range(1, 5)
	.map(v -> v * 2) // i.e. 1, 4, 9, 16, 25
    .flatMap(v -> Flux.range(3, 10).filter(i -> i == v))

필터링

필터링하여 원하는 특정 데이터만 얻거나

Flux.range(1, 10)
	.filter(i -> i % 2 == 0) // i.e. 2, 4, 6, 8, 10
Flux.just(1, 2, 2, 3, 3, 3, 4, 4, 4, 4)
	.distinct()	// i.e. 1, 2, 3, 4

결합

여러 Publisher 를 결합하여 사용하거나

Flux.merge(
		Flux.interval(Duration.ofMillis(500)),
    	Flux.interval(Duration.ofMillis(700))
	)

오류 처리

발생한 오류를 핸들하고

Flux.just(1, 2, 0)
    .map(i -> 10 / i)
    .onErrorReturn(-1)
Flux.just("A", "B")
    .flatMap(this::unstableOperation)
    .retry(3)

버퍼링 및 윈도잉

원하는 만큼 조절하여 사용할 수 있다.

Flux.range(1, 10)
    .buffer(3)
Flux.range(1, 10)
    .window(3)

이런식으로 우리는 Publisher 와 Operator 를 이용, 다양한 데이터를 얻고 조작하여 subscriber 를 통해 소비할 수 있다.
어떤 조합을 가지냐에 방출된 데이터에 대한 로직 처리를 다양하게 가져갈 수 있고,
그를 통해 원하는 비즈니스적 목표를 획득할 수 있다.


마무리

이번 글을 통해 아주 Mono 와 Flux 의 정의를 알아보고, 아주 간단한 예외 코드들을 통해
operator 를 통해 할 수 있는 일들에 대해 간략하게 알아봤다.

다음 글에서는 조금 더 자세하게 사용하는 방법에 대해 알아보고자 한다.


Reference

Flux, an Asynchronous Sequence of 0-N Items (Project Reactor)
Mono, an Asynchronous 0-1 Result (Project Reactor)

profile
Java BE

0개의 댓글