리액티브 스프링 #2. Reactive Stream을 직접 만들며 이해해보자!

홍지범·2023년 6월 25일
0

토비님의 유튜브 '토비의 봄TV' - 스프링 리액티브 프로그래밍을 정리한 글 입니다.

완성된 Reactive Stream의 그림

Reactive Stream을 직접 만들며 이해해보자!

  • Reactive Stream의 3가지 핵심 요소 중 하나인 Ovserver를 직접 구현해보며 어떤식으로 데이터를 주고 받는지 설명합니다.

  • 해당 예제에서는 Observable 대신 Java9의 표준이자 확장된 Observable인 Reactive Stream(Pub,Sub) 방식을 사용해 설명합니다.

  • 해당 예제에서는 연속된 Integer 요소들을 전달해주는 Pub, Sub을 만듭니다.

예제로 쓸 데이터를 정의하자!

Iterable<Integer> iter = Arrays.asList(1,2,3,4,5);
  • 먼저 예제로 전달할 데이터를 정의합니다.
    연속된 요소들을 정의하기 위해 Iterable을 사용하며 이를 DB에서 가져온 데이터라고 생각합니다.

Publisher를 만들어보자!

  Publisher pub = new Publisher() {
  	@Override
  	public void subscribe(Subscriber subscriber) {   
  	}
  };
  • Publisher는 발생한 데이터, 이벤트를 Subscriber에게 전달하는 인터페이스 입니다.

  • subscribe 메서드는 파라미터로 받은 Subscriber에게 Subscription(구독정보)를 전달해 구독을 시작합니다.

Subscriber를 만들어보자!

   Subscriber s = new Subscriber() {
       @Override
       public void onSubscribe(Subscription subscription) {
       }

       @Override
       public void onNext(Object item) {
       }

       @Override
       public void onError(Throwable throwable) {
       }

       @Override
       public void onComplete() {
       }
   };
  • Subscriber는 Publisher가 전달(Push)한 데이터, 이벤트를 받는 인터페이스 입니다.

  • 총 4개의 메서드를 구현해야 하며 아래와 같습니다.

  • onSubscribe : Pub으로부터 Subscription(구독정보)를 받아 구독을 시작하며 Sub은 Pub에게 직접 요청을 보내는 것이 아닌 Subscription을 통해 Flow를 조절합니다.(BackPressure)
    onSubscribe는 꼭 한 번 호출되어야 합니다.

  • onNext : Pub이 데이터를 전달해주면 Subs은 onNext로 데이터를 처리합니다.
    Pull방식처럼 데이터를 끌어오는것이 아니기 때문에 onNext를 했을 때 데이터가 없을 수도 있습니다.

  • onError : 어떤 에러던지 onSubscribe가 발생한 이후라면 Exception을 Object로 만들어 onError를 통해 처리할 수 있습니다.
    이 점이 Observable에서 향상된 점 입니다.(onComplete 포함)

  • onComplete : Stream은 끝나지 않을 수 있지만 끝났다면 onComplete를 통해 종료합니다.

구독을 등록해보자!

public class Main {
	public static void main(String[] args) {
	...
	pub.subscribe(sub);
	}
}
  • Pub에게 전달한 Sub은 Pub에서 onSubscribe 메서드를 통해 구독이 시작됩니다.
    Pub은 Sub에게 Subscription(구독정보)를 전달해 구독을 시작합니다.

Subcription을 만들어보자!

Publisher p = new Publisher() {
 @Override
 public void subscribe(Subscriber subscriber) {
 	subscriber.onSubscribe(new Subscription() {
     @Override
     public void request(long n) {
     }
     @Override
     public void cancel() {
     }
	 });
 }
};
  • 구독 요청을 받은 Pub은 Sub에게 Subscription(구독정보)를 전달해줍니다.

  • Sub은 Pub에게 직접 데이터를 요청하는 것이 아닌 Subscription의 request 메서드를 통해 다음 데이터를 받을수 있는 상태임을 알립니다.

  • Sub은 Pull 방식처럼 데이터를 요청하는 것이 아니라 Pub이 전달한 데이터를 받는 것 입니다.

onSubscribe를 정의해보자!

  • Subcription을 만들어보자! 를 다시 보겠습니다.
    Pub은 Sub의 onSubscribe 메서드를 통해 Subscription을 전달 했습니다.
    이제 Sub은 전달받은 Subscription의 request 메서드를 통해 Pub에게 데이터를 받을 수 있는지를 전달합니다.
    직접 데이터를 요청하는 것이 아닙니다.
    @Override
    public void onSubscribe(Subscription subscription) {
    	subscription.request(Long.MAX_VALUE);
    }

request를 정의해보자!

onNext 사용

Iterable<Integer> iter = Arrays.asList(1,2,3,4,5);
...
Publisher p = new Publisher() {
 @Override
 public void subscribe(Subscriber subscriber) {
 	//Iterable -> Iterator
 	Iterator<Integer> it = iter.iterator();
 	subscriber.onSubscribe(new Subscription() {
       int i = 0;
       @Override
       public void request(long n) {
       //n은 Sub이 데이터를 처리할 수 있는 만큼을 의미함
       		while(i < n--) {
             	if (it.hasNext()) {
                 	subscriber.onNext(it.next());
                 }
             }
       }
       @Override
       public void cancel() {
       }
	 });
 }
};
  • Subscription의 request 메서드 입니다.
    Sub이 request에 데이터를 받을 만큼(n)을 알립니다. 이에따라 빠른 Sub(Consumer)이 될 수도 있고 느린 Sub이 될 수 도 있습니다.

  • 데이터 처리를 받는 쪽에서 유연하게 처리할 수 있는 것이 Reactive Streams의 장점입니다.(나중에 자세하게 알아봅시다.)


    onComplete 정의
       int i = 0;
       @Override
       public void request(long n) {
       		while(i < n--) {
       			if (it.hasNext()) {
                	subscriber.onNext(it.next());
                } else {
                 	subscriber.onComplete();
                    break;
                }
          }
  • 데이터가 모두 넘어가면 onComplete 메서드로 구독을 중단할 수 있습니다.


    onError 정의
       int i = 0;
       @Override
       public void request(long n) {
       		try {
              while(i < n--) {
                  if (it.hasNext()) {
                      subscriber.onNext(it.next());
                  } else {
                      subscriber.onComplete();
                      break;
                  }
            } catch (Throwable t) {
            	subscriber.onError(t);
            }
          }
  • 데이터가 발생한 쪽에서 생긴 오류는 onError를 통해 Sub에게 넘겨 유연하게 처리할 수 있습니다.

정리

  • 이제 Pub은 세가지 메서드를 통해 Sub에게 데이터, 이벤트를 전달합니다.(Push 방식)

  • 지금까지 완성한 모습은 Reactive Streams의 3대 핵심인 Observable이 내부적으로 동작하는 모습 입니다.

  • 실제 비지니스에서는 더 잘만들어진 구현체를 사용하겠지만 Push 방식이 내부적으로 동작하는 원리를 이해해야 Reactive Streams를 사용하는 구조를 더 잘 쓸 수 있습니다.

  • PubSub 구조의 장점은 오래 걸리는 로직을 blocking 없이 별도의 스레드에서 준비해 데이터를 전달 받음으로서 대기시간을 줄일 수 있는 점 입니다.(이 부분은 아직 난해할테니 이후에 더 자세히 알아보겠습니다.
profile
왜? 다음 어떻게?

0개의 댓글