Reactive Program 에 처음 발을 들인 당신은 일단 처음부터 당황스러울 것이다.
Mono? Flux? 이게 뭐지...?
다행히도 이 시리즈의 전 글을 읽었다면 아주아주 쉽게 이해할 수 있다.
안읽었다면 읽고오자
Mono 와 Flux 는 Publisher 다.
단지 방출하는 아이템의 갯수가 다를 뿐, 결국엔 Publisher 다.
앞으로의 설명은 전 글을 읽었다는 가정하에 진행하겠다.
Mono is a specialized Publisher that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).
설명이 어려워 보이지만 전 글에서 봤던 내용을 떠올려 본다면 간단하다.
Reactive Streams 에서 publisher 가 subscriber 에게 아이템을 제공하던 것을 떠올려보자.
Mono 는 Publisher 다. 최대 하나의 아이템을 제공하는.
당연하게도 Reactive Streams 에서 제공한 스펙을 따르기 때문에 위에 적은 내용과 동일한 일을 한다.
0-1 개의 아이템을 방출하는 publisher 인 것이다.
A Flux is a standard Publisher that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.
Flux 는 Mono 와 단 하나의 내용을 제외하곤 동일하다.
Flux 는 0개 이상의 아이템을 제공하는 Publisher 이다.
0-N 개의 아이템을 방출하는 Publisher 인 것이다.
와 우리는 Mono 와 Flux 의 개념을 정복했다!
이것을 기억한다면 Mono 와 Flux. 별거 없다.
하지만 문제는 여기서부터 발생한다.
단순한 데이터는 의미를 가지고 있지 않다. 데이터가 가공이 되었을 때 그 데이터는 비로소 의미를 가지고 사용자에게 기능을 제공할 수 있다.
Mono 와 Flux 만으로는 이러한 가공을 진행할 수 없다.
그렇기 때문에 아이템의 가공을 진행하기 위해 operator 가 사용된다.
리액티브 라이브러리에서, 아이템의 흐름은 operator 들을 통해 조작 가능하다. 단순한 데이터의 방출에서 operator 를 이용한 method chaining 을 통해 비로서 원하는 동작을 우리는 프로그램에 구현할 수 있다.
// operator 를 이용한 데이터 가공 example
Mono.just(1)
.map(v -> v * v)
.map(v -> v + v)
.subscribe(System.out::println);
먼저 Project Reactor 에서 정의한 operator 에 대해 봐보자
In Reactor, operators are the workstations in our assembly analogy. Each operator adds behavior to a Publisher and wraps the previous step’s Publisher into a new instance. The whole chain is thus linked, such that data originates from the first Publisher and moves down the chain, transformed by each link. Eventually, a Subscriber finishes the process. Remember that nothing happens until a Subscriber subscribes to a Publisher, as we will see shortly.
operator 는 Publisher 에 새로운 행동을 추가하고, 전 단계의 Publisher 를 새 인스턴스로 감싸서 전달한다.
이러한 operator 는 체인에 연결되어있으며 각각의 operator 를 거치면서 단계별로 아이템이 변환된다.
이렇게 변환된 아이템은 최종적으로 subscriber 를 통해 소비된다.
크게 오퍼레이터의 종류를 나누자면 아래로 나눌 수 있다.
무엇을 생성하는가 한다면 바로 Mono 와 Flux 를 생성한다.
Mono.just("Hello world");
와. HelloWorld 를 방출하는 Mono 가 하나 생성되었다!
위와 비슷한 방식으로 우리는 Publisher 를 생성할 수 있다!
Flux.just("Hello", "World");
Flux.fromIterable(List.of("Hello", "World"));
Mono.fromCallable(() -> "Callable result");
그리고 operator 를 통해 데이터를 변환하거나
// 1~5 까지 아이템을 방출하며 그 데이터에 2를 곱하는 코드
Flux.range(1, 5)
.map(v -> v * 2) // i.e. 1, 4, 9, 16, 25
.flatMap(v -> Flux.range(3, 10).filter(i -> i == v))
필터링하여 원하는 특정 데이터만 얻거나
Flux.range(1, 10)
.filter(i -> i % 2 == 0) // i.e. 2, 4, 6, 8, 10
Flux.just(1, 2, 2, 3, 3, 3, 4, 4, 4, 4)
.distinct() // i.e. 1, 2, 3, 4
여러 Publisher 를 결합하여 사용하거나
Flux.merge(
Flux.interval(Duration.ofMillis(500)),
Flux.interval(Duration.ofMillis(700))
)
발생한 오류를 핸들하고
Flux.just(1, 2, 0)
.map(i -> 10 / i)
.onErrorReturn(-1)
Flux.just("A", "B")
.flatMap(this::unstableOperation)
.retry(3)
원하는 만큼 조절하여 사용할 수 있다.
Flux.range(1, 10)
.buffer(3)
Flux.range(1, 10)
.window(3)
이런식으로 우리는 Publisher 와 Operator 를 이용, 다양한 데이터를 얻고 조작하여 subscriber 를 통해 소비할 수 있다.
어떤 조합을 가지냐에 방출된 데이터에 대한 로직 처리를 다양하게 가져갈 수 있고,
그를 통해 원하는 비즈니스적 목표를 획득할 수 있다.
이번 글을 통해 아주 Mono 와 Flux 의 정의를 알아보고, 아주 간단한 예외 코드들을 통해
operator 를 통해 할 수 있는 일들에 대해 간략하게 알아봤다.
다음 글에서는 조금 더 자세하게 사용하는 방법에 대해 알아보고자 한다.
Flux, an Asynchronous Sequence of 0-N Items (Project Reactor)
Mono, an Asynchronous 0-1 Result (Project Reactor)