토비님의 유튜브 '토비의 봄TV' - 스프링 리액티브 프로그래밍을 정리한 글 입니다.
완성된 Reactive Stream의 그림
- Reactive Stream의 3가지 핵심 요소 중 하나인 Ovserver를 직접 구현해보며 어떤식으로 데이터를 주고 받는지 설명합니다.
- 해당 예제에서는 Observable 대신 Java9의 표준이자 확장된 Observable인 Reactive Stream(Pub,Sub) 방식을 사용해 설명합니다.
- 해당 예제에서는 연속된 Integer 요소들을 전달해주는 Pub, Sub을 만듭니다.
Iterable<Integer> iter = Arrays.asList(1,2,3,4,5);
- 먼저 예제로 전달할 데이터를 정의합니다.
연속된 요소들을 정의하기 위해 Iterable을 사용하며 이를 DB에서 가져온 데이터라고 생각합니다.
Publisher pub = new Publisher() { @Override public void subscribe(Subscriber subscriber) { } };
- Publisher는 발생한 데이터, 이벤트를 Subscriber에게 전달하는 인터페이스 입니다.
- subscribe 메서드는 파라미터로 받은 Subscriber에게 Subscription(구독정보)를 전달해 구독을 시작합니다.
Subscriber s = new Subscriber() { @Override public void onSubscribe(Subscription subscription) { } @Override public void onNext(Object item) { } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { } };
- Subscriber는 Publisher가 전달(Push)한 데이터, 이벤트를 받는 인터페이스 입니다.
- 총 4개의 메서드를 구현해야 하며 아래와 같습니다.
- onSubscribe : Pub으로부터 Subscription(구독정보)를 받아 구독을 시작하며 Sub은 Pub에게 직접 요청을 보내는 것이 아닌 Subscription을 통해 Flow를 조절합니다.(BackPressure)
onSubscribe는 꼭 한 번 호출되어야 합니다.
- onNext : Pub이 데이터를 전달해주면 Subs은 onNext로 데이터를 처리합니다.
Pull방식처럼 데이터를 끌어오는것이 아니기 때문에 onNext를 했을 때 데이터가 없을 수도 있습니다.
- onError : 어떤 에러던지 onSubscribe가 발생한 이후라면 Exception을 Object로 만들어 onError를 통해 처리할 수 있습니다.
이 점이 Observable에서 향상된 점 입니다.(onComplete 포함)
- onComplete : Stream은 끝나지 않을 수 있지만 끝났다면 onComplete를 통해 종료합니다.
public class Main { public static void main(String[] args) { ... pub.subscribe(sub); } }
- Pub에게 전달한 Sub은 Pub에서 onSubscribe 메서드를 통해 구독이 시작됩니다.
Pub은 Sub에게 Subscription(구독정보)를 전달해 구독을 시작합니다.
Publisher p = new Publisher() { @Override public void subscribe(Subscriber subscriber) { subscriber.onSubscribe(new Subscription() { @Override public void request(long n) { } @Override public void cancel() { } }); } };
- 구독 요청을 받은 Pub은 Sub에게 Subscription(구독정보)를 전달해줍니다.
- Sub은 Pub에게 직접 데이터를 요청하는 것이 아닌 Subscription의 request 메서드를 통해 다음 데이터를 받을수 있는 상태임을 알립니다.
- Sub은 Pull 방식처럼 데이터를 요청하는 것이 아니라 Pub이 전달한 데이터를 받는 것 입니다.
- Subcription을 만들어보자! 를 다시 보겠습니다.
Pub은 Sub의 onSubscribe 메서드를 통해 Subscription을 전달 했습니다.
이제 Sub은 전달받은 Subscription의 request 메서드를 통해 Pub에게 데이터를 받을 수 있는지를 전달합니다.
직접 데이터를 요청하는 것이 아닙니다.@Override public void onSubscribe(Subscription subscription) { subscription.request(Long.MAX_VALUE); }
onNext 사용
Iterable<Integer> iter = Arrays.asList(1,2,3,4,5); ... Publisher p = new Publisher() { @Override public void subscribe(Subscriber subscriber) { //Iterable -> Iterator Iterator<Integer> it = iter.iterator(); subscriber.onSubscribe(new Subscription() { int i = 0; @Override public void request(long n) { //n은 Sub이 데이터를 처리할 수 있는 만큼을 의미함 while(i < n--) { if (it.hasNext()) { subscriber.onNext(it.next()); } } } @Override public void cancel() { } }); } };
- Subscription의 request 메서드 입니다.
Sub이 request에 데이터를 받을 만큼(n)을 알립니다. 이에따라 빠른 Sub(Consumer)이 될 수도 있고 느린 Sub이 될 수 도 있습니다.
- 데이터 처리를 받는 쪽에서 유연하게 처리할 수 있는 것이 Reactive Streams의 장점입니다.(나중에 자세하게 알아봅시다.)
onComplete 정의int i = 0; @Override public void request(long n) { while(i < n--) { if (it.hasNext()) { subscriber.onNext(it.next()); } else { subscriber.onComplete(); break; } }
- 데이터가 모두 넘어가면 onComplete 메서드로 구독을 중단할 수 있습니다.
onError 정의int i = 0; @Override public void request(long n) { try { while(i < n--) { if (it.hasNext()) { subscriber.onNext(it.next()); } else { subscriber.onComplete(); break; } } catch (Throwable t) { subscriber.onError(t); } }
- 데이터가 발생한 쪽에서 생긴 오류는 onError를 통해 Sub에게 넘겨 유연하게 처리할 수 있습니다.
- 이제 Pub은 세가지 메서드를 통해 Sub에게 데이터, 이벤트를 전달합니다.(Push 방식)
- 지금까지 완성한 모습은 Reactive Streams의 3대 핵심인 Observable이 내부적으로 동작하는 모습 입니다.
- 실제 비지니스에서는 더 잘만들어진 구현체를 사용하겠지만 Push 방식이 내부적으로 동작하는 원리를 이해해야 Reactive Streams를 사용하는 구조를 더 잘 쓸 수 있습니다.
- PubSub 구조의 장점은 오래 걸리는 로직을 blocking 없이 별도의 스레드에서 준비해 데이터를 전달 받음으로서 대기시간을 줄일 수 있는 점 입니다.(이 부분은 아직 난해할테니 이후에 더 자세히 알아보겠습니다.