Reactive streams inspection, with two design patterns

Lofri·2024년 6월 19일

WebFlux

목록 보기
2/3

Webflux 의 기반으로 사용되는 project reactor 는 Reactive Streams Spec 을 준수한다.
Project reactor 를 잘 사용하기 위해선 그 근본이 되는 Reactive streams 에 대해 이해하는 것이 중요하다.

나는 reactive streams 의 명세를 보며 가장 먼저 observer 패턴이 생각났고 back-pressure 라는 용어를 보며 정확히 어떤 것을 의미하는지 이해하기 어려웠다.

그렇기에 비슷한 동작을 하는 observer pattern 과 backpressure 를 이해하게 도와줄 수 있는 iterator pattern 을 기본으로 reactive streams 의 기본을 확인해보자 한다.

이 글은 아래 내용을 다룬다.

  • Iterator 와 Observer 패턴
  • Reactive Streams 기본

Iterator Pattern

Iterator<Integer> iterator = someIntegerList.iterator();
while (iterator.hasNext()) {
	iterator.next();
}

보통 우리가 여러개의 아이템이 제공되고 이를 핸들링하기 위해 사용하는 대표적인 방법 중 하나는 이터레이터 패턴을 사용하는 것이다.

이터레이터 패턴은 다음과 같은 특징을 가지고 있다.

  • 사용자는 데이터를 받기 위해서 next() 를 통해 데이터를 요청해야한다.
  • 사용자는 데이터를 원할 때 가져올 수 있다.
  • 컬렉션의 요소를 단 방향으로 순회할 수 있다.

이터레이터 패턴을 통해, 사용자는 주어진 아이템을 자기가 원하는 상황에 데이터를 받아올 수 있다. 만약 내가 데이터를 200개만 가져오고 싶을 때, 단순히 조건문안에 그 조건을 기술하면 된다.

데이터가 있다는 가정 하에 언제 어떻게 데이터를 가져올 지 스스로 결정 가능한 것이다.
즉 내가 원할 때에 데이터를 Pulling 할 수 있다는 특징을 가지고 있다.


Observer Pattern

someSubject.addObserver(someObserver);
someSubject.notify();
someSubject.update();

이터레이터 패턴은 사용자가 직접 아이템을 얻고 조작하는데 사용된다.
그렇다면 내가 제어할 수 없는 시점에 아이템 또는 상태의 변화를 확인하고 처리를 진행하고 싶다면 어떻게 할 수 있을까?

Observer Pattern 이 대표적인 예이다.
이터레이터 패턴과 달리 데이터를 직접 가져오지 않고 subject 가 상태의 변화를 비동기적으로 알려주는 것에 대한 처리가 가능하다.

Observer pattern 은 아래 특징을 가지고 있다.

  • subject 는 상태 변화가 발생했을 때 등록된 observer 에게 변화를 알릴 수 있다.
  • 객체는 subject 에 observer 로 등록됨으로 변화를 추후에 비동기적으로 제공받을 수 있다.

Observer Pattern 을 통해, subject 는 원하는 시점에 subscriber 에게 변화를 알릴 수 있다. 즉 데이터를 주고 싶은 시점에 데이터를 Pushing 할 수 있다는 것이다.


Reactive Streams

Iterator, Observer 패턴 모두 데이터를 제공하기 위해 사용된다는 관점에서 본다면 매우 비슷하다. 단순히 그 시점이 다를 뿐이지 아이템을 제공하기 위해 사용된다는 점은 동일하다.

Reactive Streams 은 이 두 가지 패턴을 기반으로 바라본다면 이해하기 쉽다.
리액티브 스트림은 크게 네가지 구성요소로 이뤄져있다.

  1. Publisher : 데이터 제공자
  2. Subscriber : 데이터 수취자
  3. Subscription : pub, sub 의 관계 (one to one)
  4. Processor : pub, sub 모두 지켜야할 처리 단계

이 글에선 Publisher, Subscriber 의 관계에 대해 다루려 한다.
아래 플로우 차트를 통해 pub, sub 의 동작에 대해 자세히 알아보자.


해당 내용을 풀어서 써본다면 다음과 같다.

  1. Subscriber 는 subscribe(Subscriber) 를 통해 구독을 시작한다.
  2. Publisher 는 구독한 Subscriber 에게 생성한 Subscription 객체를 onSubscribe(Subscription) 을 통해 전달한다.
  3. Subscriber 는 데이터가 필요한 경우 Subscription 에 request(int) 를 통해 데이터를 요청한다.
  4. Subscription 은 요청을 Publisher 에게 전달한다.
  5. Publisher 는 오류가 없을 경우, onNext(T) 를 통해 데이터를 전달한다.
  6. Publisher 는 오류가 있을 경우, onError(Throwable) 을 통해 에러 메시지를 전달한다.
  7. Publisher 는 모든 데이터 전송이 완료되었을 경우, onComplete() 를 통해 완료를 알린다.

그림과 설명만으로는 잘 이해가 되지 않을 수 있으니
아래와 같은 동작을 하는 예시 코드를 통해 좀 더 자세하게 알아보자.

Main

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class ExampleMain {
    public static void main(String[] args) {
        Publisher<Integer> pub = new ExamplePublisher();
        Subscriber<Integer> sub = new ExampleSubscriber();

        pub.subscribe(sub);
    }
}
/**

[Result] :
[ExamplePublisher] Create Subscription
[ExampleSubscriber] Retrieve subscription
[Subscription] 3 items requested
[ExampleSubscriber] Retrieve emitted value: 0
[ExampleSubscriber] Retrieve emitted value: 1
[ExampleSubscriber] Retrieve emitted value: 2
[Subscription] 1 items requested
[ExampleSubscriber] Retrieve emitted value: 3
[Subscription] 1 items requested
[ExampleSubscriber] Retrieve emitted value: 4
[Subscription] 1 items requested
[ExampleSubscriber] Retrieve emitted value: 5
[Subscription] 1 items requested
[Subscription] No items, call Subscriber.onComplete()
[ExampleSubscriber] onComplete called
[ExampleSubscriber] buf: [0, 1, 2, 3, 4, 5]

**/

Publisher

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExamplePublisher implements Publisher<Integer> {
    Iterable<Integer> items = Arrays.asList(0,1,2,3,4,5);
    
    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {					
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Iterator<Integer> iterator = items.iterator();
        
        subscriber.onSubscribe(new Subscription() {		// (1)
            Future<?> f;
            @Override
            public void request(long n) {		// (2)
                this.f = executorService.submit(() -> {
                    long left = n;
                    try {
                        while (left > 0) {
                            if (!iterator.hasNext()) {
                                subscriber.onComplete();		// (3)
                                executorService.shutdown();
                                break;
                            }
                            subscriber.onNext(iterator.next());		// (3)
                            left--;
                        }
                    } catch (Exception e) {
                        subscriber.onError(e);		// (4)
                    }
                });
            }

            @Override
            public void cancel() {
                f.cancel(true);
            }
        });
    }
}
  1. Subscriber 가 subscribe 요청을 진행하면, Publisher 는 연결체인 Subscription 객체를 만들어 전달한다.
  2. Subscription 은 request() 요청이 올 경우 그 수에 해당하는 데이터를 제공한다.
  3. Subscription 은 Publisher 의 데이터 제공 또는 완료를 onNext(), onComplete()를 통해 Subscriber 에게 전달한다.
  4. Subscription 은 Publisher 문제에 따른 예외 처리를 onError() 를 통해 Subscriber 에게 위임한다. Delegate failures as messages

어디서 많이 본 것 같지 않은가.
3, 4 번을 보면 pub 가 sub 에게 상태 변화를 원하는 시점에 알리는 것을 확인할 수 있다. Observer pattern 이 떠오른다.

위 코드를 통해 우리는 다음과 같은 사실을 알 수 있다.

  • pub 는 원하는 시점에 데이터를 Subscriber 에서 pushing 한다.
  • pub 와 sub 는 subscription 을 통해 상호 소통한다.

Subscriber

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.List;

public class ExampleSubscriber implements Subscriber<Integer> {
    private static final String PREFIX = "[ExampleSubscriber] ";
    private static final int AVAIL_SIZE = 3;
    private Subscription subscription;
    private List<Integer> buf = new ArrayList<>();

    @Override
    public void onSubscribe(Subscription subscription) {		// (1)
        System.out.println(PREFIX + "Retrieve subscription");
        this.subscription = subscription;
        this.subscription.request(AVAIL_SIZE);		// (2)
    }

    @Override
    public void onNext(Integer integer) {
        buf.add(integer);
        System.out.println(PREFIX + "Retrieve emitted value: " + integer);
        if (buf.size() >= AVAIL_SIZE) {
            this.subscription.request(1);		// (2)
        }
    }

    @Override
    public void onError(Throwable t) {
        System.out.println(t);
    }

    @Override
    public void onComplete() {
        System.out.println(PREFIX + "onComplete called");
        System.out.println(PREFIX + "buf: " + buf);
    }
}
  1. sub 는 onSubscribe 를 통해 pub 로 부터 Subscription 객체를 받고 이를 통해 pub 에게 요청을 진행한다.
  2. sub 는 request 를 통해 pub 에게 데이터를 필요한 상황에, 필요한 양만큼 요청할 수 있다.
  3. sub 는 onNext, onError, onComplete 를 통해 pub 의 상태 변화를 알림 받을 수 있다. 상황에 따른 행위를 진행할 수 있다.

이것 또한 어디서 본 것 같지 않은가?
2번을 보면 iterator 패턴의 next 가 떠오른다.
sub 는 iterator 패턴과 마찬가지로 자신이 원하는 시점에 필요한 양만큼 pub 로 부터 데이터를 수취할 수 있다.

위 코드를 통해 우리는 다음과 같은 특징을 확인할 수 있다.

  • sub 는 데이터 요청 타이밍과 갯수를 스스로 결정할 수 있다. back-pressure
  • sub 는 구독에 대한 관리를 pub 에게 직접 하는게 아닌, Subscription 을 통해 진행한다.
  • sub 는 pub 의 상태 변화 또는 에러에 대한 처리 권한을 가진다.

구현체들

앞의 내용을 통해 Reactive Streams 의 기본 구현과 특징을 확인했다.

그렇다고 이 내용들만을 가지고 실제로 Reactive 한 프로그램을 짜는 것은 어려운 일이다. Reactive Streams 라이브러리 코드를 확인해 보면 알겠지만, 실제 구현된 기능은 많지 않다. API 와 스펙에 대한 약속과 정의만을 구현해놨기 때문이다.

실제 코드 작업은 Flow, RxJava, project-reactor 등 reactive streams 의 spec 을 구현한 라이브러리를 이용해 도움 받을 수 있다.

project-reactor 로 예를 보자면

import reactor.core.publisher.Flux;

import java.util.List;

public class TMain {

    public static void main(String[] args) {
        Flux.fromIterable(List.of(0,1,2,3,4,5))
                .doOnSubscribe(v -> System.out.println("Retrieve subscription"))
                .doOnNext(v -> System.out.println("Retrieve emitted value: " + v))
                .doOnRequest(v -> System.out.println(v + " items requested"))
                .doOnComplete(() -> System.out.println("onComplete called"))
                .collectList()
                .subscribe(System.out::println);
    }
}

코드를 통해 위에 작성한 코드와 동일한 행위를 하는 코드를 간단하게 작성할 수 있다.
사실은 좀 다르다


마무리

처음 이 구조를 이해하는데 많이 어려웠다.
Reactive Streams 에 대해 먼저 확인하지 않고 WebFlux 에 뛰어들며 많은 시행착오를 거쳤고, 이번 기회를 통해 기초적인 부분들에 대해서 이해하고 작성했던 코드들이 정확히 왜 그렇게 동작했는지 확인할 수 있는 기회였다 생각한다.

이 글에서 다루지 못한 내용이 너무 많다.
다음 글에서 project reactor 를 기반으로 reactive programming 에 대한 조금 더 자세한 설명 그리고 사용법에 대해 알아갈 예정이다.


99. References

Iterator pattern - Wikipedia
Obeserer pattern -Wiipedia
Reactive-streams-jvm

profile
Java BE

0개의 댓글