🌱 토비의 봄 TV 스프링 리액티브 프로그래밍을 시청한 후 학습한 내용을 정리하고 기록하기 위해 작성하는 포스팅입니다.
리액티브 프로그래밍이란 기존 프로그래밍에서 다양하게 사용하던, 필요한 데이터가 있으면 데이터를 consume하는 쪽에서 함수를 call해서 데이터를 pull
해오던 방식을 어떤 변화(이벤트)가 있었을 때 데이터를 produce하는 쪽에서 push
하는 방식으로 변화시킨 프로그래밍 기법이다. 여기까지만 이야기하면, 그게 흔히 다양한 언어에서 지원하는 이벤트 드리븐 아키텍처
랑 같은 것으로 인식될 수 있지만, 리액티브 프로그래밍은 그것보다 좀 더 많은 개념을 담고 있다.
리액티브 프로그래밍을 이야기할 때 빼놓을 수 없는 선제 개념이 이 옵저버 패턴이다. 옵저버 패턴은, 이름 그대로 어떤 변화가 있는지를 Observe(관찰)하는 패턴입이다. Oberserver Pattern에서는 Observer가 관심을 가지는 데이터(Source = Observable)가 있고, 이 Observable에 Observer를 추가해준 후, Observable이 변화가 생겼을 때 이를 감지하고 Obeserver에서 notify하는 형식으로 동작한다.
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ObserverPattern {
static class IntObservable extends Observable implements Runnable {
@Override
public void run() {
for(int i = 1; i <= 10; i++){
setChanged();
//push
notifyObservers(i);
}
}
}
public static void main(String[] args) {
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println(Thread.currentThread().getName() + " " + arg);
}
};
IntObservable io = new IntObservable();
io.addObserver(ob);
// iterable과 달리 별개의 쓰레드에서 동작하는 코드를 손쉽게 작성 가능
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(io);
System.out.println(Thread.currentThread().getName() + " EXIT");
es.shutdown();
}
}
그러나 이런 기존의 Observer-Observable 패턴의 경우 1) 데이터를 다 줬을 때 Complete의 개념이 없고, 2) 에러 핸들링에 대한 고민이 없다는 문제점이 있었다. 이런 문제를 해결하기 위해 Publisher/Subscriber(Pub-Sub) 패턴이 등장했다.
Pub/Sub Pattern도 이름만 Observer -> Subscriber, Observable -> Publisher로 바꼈을 뿐, 기본적인 컨셉은 동일하다.
1. Publisher에 Subscriber를 등록한다.
2. 1을 위해 Publisher 인터페이스는 기본적으로 subscribe 메서드를 구현하도록 정의되어있다. 그리고 이 subscribe 메서드에서 Subscriber 객체를 인자로 받아서 onSubscribe 메서드를 통해 구독 객체를 넘겨준다.
3. 이 구독객체가 결국 그림에서 보듯, pub/sub 간의 매개 역할을 하는 객체이다. 그런데 Subscription도 인터페이스이기 때문에, request와 cancel 메서드를 직접 구현해서 어떤 동작을 할지 정의해준 후 익명 클래스의 인스턴스 형태로 넘겨준다.
4. Subscriber 인터페이스도 구현해서 onSubscribe, onNext, onError, onComplete 메서드를 오버라이딩해준다.
이를 코드로 나타내면 아래와 같다.
import static java.util.concurrent.Flow.*;
public class PubSub {
public static void main(String[] args) throws InterruptedException {
// DB에서 가져온 Collection 데이터
Iterable<Integer> itr = Arrays.asList(1,2,3,4,5);
ExecutorService es = Executors.newSingleThreadExecutor();
Publisher p = new Publisher() {
@Override
public void subscribe(Subscriber subscriber) {
Iterator<Integer> it = itr.iterator();
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
// Future<?> f = es.submit()으로도 가능
// 결과를 받아서 거기에 따라 cancel하거나 할 수 있음
es.execute(() -> {
int i = 0;
try {
while(i++ < n){
if (it.hasNext()){
subscriber.onNext(it.next());
} else {
subscriber.onComplete();
break;
}
}
} catch(RuntimeException e){
subscriber.onError(e);
}
});
}
@Override
public void cancel() {
}
});
}
};
Subscriber<Integer> s = new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
System.out.println(Thread.currentThread().getName() + " onSubscribe");
subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println(Thread.currentThread().getName() + " onNext: " + item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + " onComplete");
}
};
p.subscribe(s);
es.awaitTermination(1, TimeUnit.MINUTES);
es.shutdown();
}
}
Subscription 객체를 통해 가능한 것
: subscription 객체는 pub-sub를 중계해주는 역할을 한다. 그리고 이 객체를 통해 subscriber가 publisher에게 요청을 하는 것도 가능하다. 이 요청을back pressure(역압)
이라고 한다.
역압은 pub-sub의 속도 차이가 발생할 때, 받는 측의 능력치에 맞게 보내는 양을 조절하기 위해 메세지를 보내는 것을 의미한다. (위의 코드에서는 long n이라는 변수를 통해 한 번에 얼만큼의 데이터를 보냈으면 하는지 sub 쪽에서 정해줄 수 있다.)
Back Pressure가 없다면?
받는 측의 상황에 따라 보내는 속도를 개선해줄 수 없다면, 버퍼가 매우매우 커야한다. 보내는 메세지들이 버퍼에 쌓이는데, 이게 넘쳐버리면 데이터가 유실될 수 있기 때문이다. 실제로 Webflux를 사용하는 것으로 유명한 기업인 넷플릭스에서도 이걸 적용하기 전에는 사용률에 따라 메모리가 peek를 찍었다가 확 감소하는 현상이 매우 잦았는데, 리액티브 프로그래밍을 도입하며 이런 문제를 해결할 수 있었다고 한다.
onComplete이 있기 때문에, pub가 sub에게 더 이상 줄 데이터가 없을 때 이를 알려줄 수 있다. 또한 onError가 있기 때문에, subscriber 쪽에서는 try/catch를 할 필요가 없고, subscribe 이후 발생하는 에러는 onError를 타고 넘어와서 우아하게(?) 처리해줄 수 있다.
이런 리액티브 패턴은 비단 한 서버 속의 쓰레드 사이에서만 쓰이는 것이 아니다. 서버와 서버 사이에서, 혹은 서버와 디비 사이에서 데이터를 가져올 때에도 리액티브를 적용하면 상황에 따라 성능을 개선해줄 수 있다.
그렇지만 Subscriber는 데이터를 sequential하게 전달받는다. 여러 쓰레드가 동작하고 있지만, 한 순간에는 한 쓰레드로부터의 데이터만 받아오게 된다.