Webflux 의 기반으로 사용되는 project reactor 는 Reactive Streams Spec 을 준수한다.
Project reactor 를 잘 사용하기 위해선 그 근본이 되는 Reactive streams 에 대해 이해하는 것이 중요하다.
나는 reactive streams 의 명세를 보며 가장 먼저 observer 패턴이 생각났고 back-pressure 라는 용어를 보며 정확히 어떤 것을 의미하는지 이해하기 어려웠다.
그렇기에 비슷한 동작을 하는 observer pattern 과 backpressure 를 이해하게 도와줄 수 있는 iterator pattern 을 기본으로 reactive streams 의 기본을 확인해보자 한다.
이 글은 아래 내용을 다룬다.

Iterator<Integer> iterator = someIntegerList.iterator();
while (iterator.hasNext()) {
iterator.next();
}
보통 우리가 여러개의 아이템이 제공되고 이를 핸들링하기 위해 사용하는 대표적인 방법 중 하나는 이터레이터 패턴을 사용하는 것이다.
이터레이터 패턴은 다음과 같은 특징을 가지고 있다.
이터레이터 패턴을 통해, 사용자는 주어진 아이템을 자기가 원하는 상황에 데이터를 받아올 수 있다. 만약 내가 데이터를 200개만 가져오고 싶을 때, 단순히 조건문안에 그 조건을 기술하면 된다.
데이터가 있다는 가정 하에 언제 어떻게 데이터를 가져올 지 스스로 결정 가능한 것이다.
즉 내가 원할 때에 데이터를 Pulling 할 수 있다는 특징을 가지고 있다.

someSubject.addObserver(someObserver);
someSubject.notify();
someSubject.update();
이터레이터 패턴은 사용자가 직접 아이템을 얻고 조작하는데 사용된다.
그렇다면 내가 제어할 수 없는 시점에 아이템 또는 상태의 변화를 확인하고 처리를 진행하고 싶다면 어떻게 할 수 있을까?
Observer Pattern 이 대표적인 예이다.
이터레이터 패턴과 달리 데이터를 직접 가져오지 않고 subject 가 상태의 변화를 비동기적으로 알려주는 것에 대한 처리가 가능하다.
Observer pattern 은 아래 특징을 가지고 있다.
Observer Pattern 을 통해, subject 는 원하는 시점에 subscriber 에게 변화를 알릴 수 있다. 즉 데이터를 주고 싶은 시점에 데이터를 Pushing 할 수 있다는 것이다.
Iterator, Observer 패턴 모두 데이터를 제공하기 위해 사용된다는 관점에서 본다면 매우 비슷하다. 단순히 그 시점이 다를 뿐이지 아이템을 제공하기 위해 사용된다는 점은 동일하다.
Reactive Streams 은 이 두 가지 패턴을 기반으로 바라본다면 이해하기 쉽다.
리액티브 스트림은 크게 네가지 구성요소로 이뤄져있다.
이 글에선 Publisher, Subscriber 의 관계에 대해 다루려 한다.
아래 플로우 차트를 통해 pub, sub 의 동작에 대해 자세히 알아보자.

해당 내용을 풀어서 써본다면 다음과 같다.
그림과 설명만으로는 잘 이해가 되지 않을 수 있으니
아래와 같은 동작을 하는 예시 코드를 통해 좀 더 자세하게 알아보자.
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
public class ExampleMain {
public static void main(String[] args) {
Publisher<Integer> pub = new ExamplePublisher();
Subscriber<Integer> sub = new ExampleSubscriber();
pub.subscribe(sub);
}
}
/**
[Result] :
[ExamplePublisher] Create Subscription
[ExampleSubscriber] Retrieve subscription
[Subscription] 3 items requested
[ExampleSubscriber] Retrieve emitted value: 0
[ExampleSubscriber] Retrieve emitted value: 1
[ExampleSubscriber] Retrieve emitted value: 2
[Subscription] 1 items requested
[ExampleSubscriber] Retrieve emitted value: 3
[Subscription] 1 items requested
[ExampleSubscriber] Retrieve emitted value: 4
[Subscription] 1 items requested
[ExampleSubscriber] Retrieve emitted value: 5
[Subscription] 1 items requested
[Subscription] No items, call Subscriber.onComplete()
[ExampleSubscriber] onComplete called
[ExampleSubscriber] buf: [0, 1, 2, 3, 4, 5]
**/
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExamplePublisher implements Publisher<Integer> {
Iterable<Integer> items = Arrays.asList(0,1,2,3,4,5);
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Iterator<Integer> iterator = items.iterator();
subscriber.onSubscribe(new Subscription() { // (1)
Future<?> f;
@Override
public void request(long n) { // (2)
this.f = executorService.submit(() -> {
long left = n;
try {
while (left > 0) {
if (!iterator.hasNext()) {
subscriber.onComplete(); // (3)
executorService.shutdown();
break;
}
subscriber.onNext(iterator.next()); // (3)
left--;
}
} catch (Exception e) {
subscriber.onError(e); // (4)
}
});
}
@Override
public void cancel() {
f.cancel(true);
}
});
}
}
request() 요청이 올 경우 그 수에 해당하는 데이터를 제공한다.onNext(), onComplete()를 통해 Subscriber 에게 전달한다.onError() 를 통해 Subscriber 에게 위임한다. Delegate failures as messages어디서 많이 본 것 같지 않은가.
3, 4 번을 보면 pub 가 sub 에게 상태 변화를 원하는 시점에 알리는 것을 확인할 수 있다. Observer pattern 이 떠오른다.
위 코드를 통해 우리는 다음과 같은 사실을 알 수 있다.
pushing 한다.import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.ArrayList;
import java.util.List;
public class ExampleSubscriber implements Subscriber<Integer> {
private static final String PREFIX = "[ExampleSubscriber] ";
private static final int AVAIL_SIZE = 3;
private Subscription subscription;
private List<Integer> buf = new ArrayList<>();
@Override
public void onSubscribe(Subscription subscription) { // (1)
System.out.println(PREFIX + "Retrieve subscription");
this.subscription = subscription;
this.subscription.request(AVAIL_SIZE); // (2)
}
@Override
public void onNext(Integer integer) {
buf.add(integer);
System.out.println(PREFIX + "Retrieve emitted value: " + integer);
if (buf.size() >= AVAIL_SIZE) {
this.subscription.request(1); // (2)
}
}
@Override
public void onError(Throwable t) {
System.out.println(t);
}
@Override
public void onComplete() {
System.out.println(PREFIX + "onComplete called");
System.out.println(PREFIX + "buf: " + buf);
}
}
onSubscribe 를 통해 pub 로 부터 Subscription 객체를 받고 이를 통해 pub 에게 요청을 진행한다.request 를 통해 pub 에게 데이터를 필요한 상황에, 필요한 양만큼 요청할 수 있다.onNext, onError, onComplete 를 통해 pub 의 상태 변화를 알림 받을 수 있다. 상황에 따른 행위를 진행할 수 있다.이것 또한 어디서 본 것 같지 않은가?
2번을 보면 iterator 패턴의 next 가 떠오른다.
sub 는 iterator 패턴과 마찬가지로 자신이 원하는 시점에 필요한 양만큼 pub 로 부터 데이터를 수취할 수 있다.
위 코드를 통해 우리는 다음과 같은 특징을 확인할 수 있다.
back-pressure앞의 내용을 통해 Reactive Streams 의 기본 구현과 특징을 확인했다.
그렇다고 이 내용들만을 가지고 실제로 Reactive 한 프로그램을 짜는 것은 어려운 일이다. Reactive Streams 라이브러리 코드를 확인해 보면 알겠지만, 실제 구현된 기능은 많지 않다. API 와 스펙에 대한 약속과 정의만을 구현해놨기 때문이다.
실제 코드 작업은 Flow, RxJava, project-reactor 등 reactive streams 의 spec 을 구현한 라이브러리를 이용해 도움 받을 수 있다.
project-reactor 로 예를 보자면
import reactor.core.publisher.Flux;
import java.util.List;
public class TMain {
public static void main(String[] args) {
Flux.fromIterable(List.of(0,1,2,3,4,5))
.doOnSubscribe(v -> System.out.println("Retrieve subscription"))
.doOnNext(v -> System.out.println("Retrieve emitted value: " + v))
.doOnRequest(v -> System.out.println(v + " items requested"))
.doOnComplete(() -> System.out.println("onComplete called"))
.collectList()
.subscribe(System.out::println);
}
}
코드를 통해 위에 작성한 코드와 동일한 행위를 하는 코드를 간단하게 작성할 수 있다.
사실은 좀 다르다
처음 이 구조를 이해하는데 많이 어려웠다.
Reactive Streams 에 대해 먼저 확인하지 않고 WebFlux 에 뛰어들며 많은 시행착오를 거쳤고, 이번 기회를 통해 기초적인 부분들에 대해서 이해하고 작성했던 코드들이 정확히 왜 그렇게 동작했는지 확인할 수 있는 기회였다 생각한다.
이 글에서 다루지 못한 내용이 너무 많다.
다음 글에서 project reactor 를 기반으로 reactive programming 에 대한 조금 더 자세한 설명 그리고 사용법에 대해 알아갈 예정이다.
Iterator pattern - Wikipedia
Obeserer pattern -Wiipedia
Reactive-streams-jvm