[스프링 리액티브] Observer Pattern과 Pub/Sub Pattern

June Lee·2021년 6월 21일
2

Spring WebFlux

목록 보기
1/6
post-custom-banner

🌱 토비의 봄 TV 스프링 리액티브 프로그래밍을 시청한 후 학습한 내용을 정리하고 기록하기 위해 작성하는 포스팅입니다.

Reactive Programming

리액티브 프로그래밍이란 기존 프로그래밍에서 다양하게 사용하던, 필요한 데이터가 있으면 데이터를 consume하는 쪽에서 함수를 call해서 데이터를 pull해오던 방식을 어떤 변화(이벤트)가 있었을 때 데이터를 produce하는 쪽에서 push하는 방식으로 변화시킨 프로그래밍 기법이다. 여기까지만 이야기하면, 그게 흔히 다양한 언어에서 지원하는 이벤트 드리븐 아키텍처랑 같은 것으로 인식될 수 있지만, 리액티브 프로그래밍은 그것보다 좀 더 많은 개념을 담고 있다.

Observer Pattern

리액티브 프로그래밍을 이야기할 때 빼놓을 수 없는 선제 개념이 이 옵저버 패턴이다. 옵저버 패턴은, 이름 그대로 어떤 변화가 있는지를 Observe(관찰)하는 패턴입이다. Oberserver Pattern에서는 Observer가 관심을 가지는 데이터(Source = Observable)가 있고, 이 Observable에 Observer를 추가해준 후, Observable이 변화가 생겼을 때 이를 감지하고 Obeserver에서 notify하는 형식으로 동작한다.

import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ObserverPattern {

    static class IntObservable extends Observable implements Runnable {

        @Override
        public void run() {
            for(int i = 1; i <= 10; i++){
                setChanged();
                //push
                notifyObservers(i);
            }
        }
    }
    
	public static void main(String[] args) {
		Observer ob = new Observer() {
            @Override
            public void update(Observable o, Object arg) {
                System.out.println(Thread.currentThread().getName() + " " + arg);
            }
        };
        
        IntObservable io = new IntObservable();
        io.addObserver(ob);

        // iterable과 달리 별개의 쓰레드에서 동작하는 코드를 손쉽게 작성 가능
        ExecutorService es = Executors.newSingleThreadExecutor();
        es.execute(io);

        System.out.println(Thread.currentThread().getName() + " EXIT");
        es.shutdown();

    }
}

그러나 이런 기존의 Observer-Observable 패턴의 경우 1) 데이터를 다 줬을 때 Complete의 개념이 없고, 2) 에러 핸들링에 대한 고민이 없다는 문제점이 있었다. 이런 문제를 해결하기 위해 Publisher/Subscriber(Pub-Sub) 패턴이 등장했다.

Pub/Sub Pattern

Pub/Sub Pattern도 이름만 Observer -> Subscriber, Observable -> Publisher로 바꼈을 뿐, 기본적인 컨셉은 동일하다.

1. Publisher에 Subscriber를 등록한다.
2. 1을 위해 Publisher 인터페이스는 기본적으로 subscribe 메서드를 구현하도록 정의되어있다. 그리고 이 subscribe 메서드에서 Subscriber 객체를 인자로 받아서 onSubscribe 메서드를 통해 구독 객체를 넘겨준다.
3. 이 구독객체가 결국 그림에서 보듯, pub/sub 간의 매개 역할을 하는 객체이다. 그런데 Subscription도 인터페이스이기 때문에, request와 cancel 메서드를 직접 구현해서 어떤 동작을 할지 정의해준 후 익명 클래스의 인스턴스 형태로 넘겨준다.
4. Subscriber 인터페이스도 구현해서 onSubscribe, onNext, onError, onComplete 메서드를 오버라이딩해준다.

이를 코드로 나타내면 아래와 같다.

import static java.util.concurrent.Flow.*;

public class PubSub {
    public static void main(String[] args) throws InterruptedException {

	// DB에서 가져온 Collection 데이터
        Iterable<Integer> itr = Arrays.asList(1,2,3,4,5); 
        ExecutorService es = Executors.newSingleThreadExecutor();

        Publisher p = new Publisher() {
            @Override
            public void subscribe(Subscriber subscriber) {
                Iterator<Integer> it = itr.iterator();

                subscriber.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        // Future<?> f = es.submit()으로도 가능
                        // 결과를 받아서 거기에 따라 cancel하거나 할 수 있음
                        es.execute(() -> {
                            int i = 0;
                            try {
                                while(i++ < n){
                                    if (it.hasNext()){
                                        subscriber.onNext(it.next());
                                    } else {
                                        subscriber.onComplete();
                                        break;
                                    }
                                }
                            } catch(RuntimeException e){
                                subscriber.onError(e);
                            }
                        });
                    }

                    @Override
                    public void cancel() {

                    }
                });
            }
        };

        Subscriber<Integer> s = new Subscriber<Integer>() {
            Subscription subscription;
            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                System.out.println(Thread.currentThread().getName() + " onSubscribe");
                subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                System.out.println(Thread.currentThread().getName() + " onNext: " + item);
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println(Thread.currentThread().getName() + " onComplete");
            }
        };

        p.subscribe(s);

        es.awaitTermination(1, TimeUnit.MINUTES);
        es.shutdown();
    }
}

Subscription 객체를 통해 가능한 것
: subscription 객체는 pub-sub를 중계해주는 역할을 한다. 그리고 이 객체를 통해 subscriber가 publisher에게 요청을 하는 것도 가능하다. 이 요청을 back pressure(역압)이라고 한다.
역압은 pub-sub의 속도 차이가 발생할 때, 받는 측의 능력치에 맞게 보내는 양을 조절하기 위해 메세지를 보내는 것을 의미한다. (위의 코드에서는 long n이라는 변수를 통해 한 번에 얼만큼의 데이터를 보냈으면 하는지 sub 쪽에서 정해줄 수 있다.)

Back Pressure가 없다면?
받는 측의 상황에 따라 보내는 속도를 개선해줄 수 없다면, 버퍼가 매우매우 커야한다. 보내는 메세지들이 버퍼에 쌓이는데, 이게 넘쳐버리면 데이터가 유실될 수 있기 때문이다. 실제로 Webflux를 사용하는 것으로 유명한 기업인 넷플릭스에서도 이걸 적용하기 전에는 사용률에 따라 메모리가 peek를 찍었다가 확 감소하는 현상이 매우 잦았는데, 리액티브 프로그래밍을 도입하며 이런 문제를 해결할 수 있었다고 한다.

onComplete이 있기 때문에, pub가 sub에게 더 이상 줄 데이터가 없을 때 이를 알려줄 수 있다. 또한 onError가 있기 때문에, subscriber 쪽에서는 try/catch를 할 필요가 없고, subscribe 이후 발생하는 에러는 onError를 타고 넘어와서 우아하게(?) 처리해줄 수 있다.

이런 리액티브 패턴은 비단 한 서버 속의 쓰레드 사이에서만 쓰이는 것이 아니다. 서버와 서버 사이에서, 혹은 서버와 디비 사이에서 데이터를 가져올 때에도 리액티브를 적용하면 상황에 따라 성능을 개선해줄 수 있다.
그렇지만 Subscriber는 데이터를 sequential하게 전달받는다. 여러 쓰레드가 동작하고 있지만, 한 순간에는 한 쓰레드로부터의 데이터만 받아오게 된다.

profile
📝 dev wiki
post-custom-banner

0개의 댓글