Chatper02. Reactive Streams

김신영·2023년 7월 26일
0

Spring WebFlux

목록 보기
2/13
post-thumbnail
post-custom-banner

Reactive Streams?

데이터 스트림을 Non-Blocking이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리의 표준 사양

리액티브 스트림즈 구현체

  • RxJava
  • Reactor
  • Akka Streams
  • Java 9 Flow API

⭐ 리액티브 스트림즈 구성요소

컴포넌트설명
Publisher데이터를 생성하고 통지(발행, 게시, 방출)하는 역할을 한다.
Subscriber구독한 Publisher로부터 통지(발행, 게시, 방출)된 데이터를 전달받아서 처리하는 역할을 한다.
SubscriptionPublisher에 요청할 데이터의 개수를 지정하고, 데이터 구독을 취소하는 역할을 한다.
ProcessorPublisher와 Subscriber의 기능을 모두 가지고 있다. 즉, Subscriber로서 다른 Publisher를 구독할 수 있고, Publisher로서 다른 Subscriber가 구독할 수 있다.

image

Subscriber가 Subscription.request를 통해 왜 굳이 데이터의 요청 개수를 지정하는 걸까?

  • Publisher, Subscriber 실제로는 각각 다른 스레드에서 비동기적으로 상호작용
  • 만약 Publisher가 통지하는 속도가 Subscriber가 수신받은 데이터 처리하는 속도보다 빠르다면
    • 처리를 기다리는 데이터는 쌓이게 됨
    • 결과적으로 시스템 부하가 걸림
  • 위 문제를 방지하기 위해, Subscription.request를 통해 데이터 요청 개수를 제어한다.

Publisher

public interface Publisher<T> {
	void subscribe(Subscriber<? super T> subscriber);
}
  • Kafka에서의 Pub/Sub 모델리액티브 스트림즈에서의 Pub/Sub 모델은 의미가 조금 다르다.
  • Kafka의 경우 Publisher, Subscriber 중간에 Message Broker가 있다.

Subscriber

public interface Subscriber<T> {
	void onSubscribe(Subscription subscription);
	void onNext(T t);
	void onError(Throwable e);
	void onComplete();
}
  • onSubscribe
    • 구독 시작 시점에 Publisher로부터 Subscription 인터페이스 구현체를 전달받는 역할
  • onNext
    • Publisher가 통지한 데이터를 처리하는 역할
  • onError
    • Publisher가 데이터 통지를 위한 처리 과정에서 에러가 발생했을 때 해당 에러를 처리하는 역할
  • onComplete
    • Publisher가 데이터 통지를 완료했을음 알릴 때 호출되는 메서드

Subscription

public interface Subscription {
  void request(long n);
  void cancel();
}

⭐ Publisher, Subscribe 동작과정

  1. Publisher가 Subscriber 인터페이스 구현 객체를 subscribe 메서드의 파라미터로 전달
  2. Publisher 내부에서는 전달받은 Subscriber 인터페이스 구현 객체의 onSubscribe 메서드를 호출
    • Subscriber의 구독을 의미하는 Subscription 인터페이스 구현 객체를 Subscriber에게 전달
  3. 호출된 Subscriber 인터페이스 구현 객체의 onSubscribe 메서드에서 전달받은 Subscription 객체를 통해 전달받을 데이터의 개수를 Publisher에게 요청 (request 메서드)
  4. Publisher는 Subscriber로부터 전달받은 요청 개수만큼의 데이터를 onNext 메서드를 호출해서 Subscriber에게 전달
  5. Publisher는 통지할 데이터가 더 이상 없을 경우, onComplete 메서드를 호출해서 Subscriber에게 데이터 처리 종료를 알린다.

Processor

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
  • Publisher, Subscriber 기능을 모두 가지고 있음

리액티브 스트림즈 관련 용어

Signal

  • Publisher와 Subscriber간에 주고받는 상호작용
  • Publisher가 Subscriber에게 보내는 Signal
    • onSubscribe
    • onNext
    • onComplete
    • onError
  • Subscriber가 Publisher에게 보내는 Signal
    • request
    • cancel

Demand

  • Subscriber가 Publisher에게 요청하는 데이터를 의미
  • Publisher가 아직 Subscriber에게 전달하지 않은 데이터

Emit

  • 데이터를 내보내다.
  • 통지, 발행, 게시, 방출

Upstream, Downstream

Sequence

  • 다양한 Operator로 데이터의 연속적인 흐름
  • 예시
    Flux
    		.just(1, 2, 3, 4, 5, 6)
    		.filter(n -> n % 2 == 0)
    		.map(n -> n * 2)
    		.subscribe(System.out::println)
    1. Flux를 통해서 데이터를 생성
    2. emit
    3. filter 메서드를 통해서 필터링
    4. map 메서드를 통해 변환

Operator

  • just
  • filet
  • map
  • etc…

Source

  • 최초에 가장 먼저 생성된 무언가

리액티브 스트림즈의 구현 규칙

reactive-streams-jvm

Reactive Streams

Publisher

  1. Publisher가 Subscriber의 onNext 메소드를 호출한 총 횟수는 Subscriber가 Subscription을 통해 요청한 총 수보다 작거나 같아야 한다.
    • Subscriber의 요청 개수보다 더 적거나 같은 개수의 onNext signal을 통해 데이터를 보낼 수 있다.
  2. Publisher는 요청받은 것보다 더 적게 onNext를 호출할 수 있다. 그리고 onComplete 또는 onError를 호출해서 Subscription를 종료해야한다.
    • 무한 스트림의 경우, 이 규칙의 예외
  3. Publisher는 실패시에 반드시 onError를 호출해야한다.
  4. Publisher가 성공적으로 완료된 경우에는 onComplete를 호출해야 한다.
  5. Publisher가 onComplete 혹은 onError를 호출한 경우, Subscriber의 Subcription은 취소된 것으로 간주되어야 한다.
  6. onError, onComplete 메서드가 한번 호출됬다면, 더이상 호출하면 안된다.
  7. Subscription 객체의 cancel 메서드를 통해 Subscription이 취소된 경우, Subscriber에 추가적인 호출을 하면 안된다

Subscriber

  1. Subscription.request(n) 메서드를 통해 데이터를 요청해야 한다. 그리고 데이터는 onNext 메소드를 통해 받는다.
    • 데이터를 언제, 얼마나 수신할 수 있는지를 결정하는 책임이 Subscriber에게 있다.
    • 리액티브 스트림즈에서는 한 번에 하나의 데이터를 요청하기 보다는, Subscriber가 처리할 수 있는 적절한 상한선만큼의 데이터 개수 요청을 권장한다.
  2. Subscriber.onComplete() 메소드나 onError() 메서드에서는 Subscription 또는 Publisher의 어떠한 메서드도 호출해서는 안된다.
    • Subscriber가 완료 / 에러 Signal을 처리하는 동안 Publisher / Subscription 과 Subscriber 간의 순환 및 경쟁 조건 (Race Condition)을 방지하기 위함이다.
  3. Subscriber.onComplete(), onError() 메소드가 호출되면 Subscription이 취소된 것으로 간주해야 한다.
  4. Subscription이 더이상 필요하지 않다면, 반드시 Subscription.cancel()를 호출해야 한다.
  5. Subscriber.onSubscribe 메서드는 한번만 호출되어야 한다.
    • 즉 Subscriber는 최대 한번만 구독할 수 있다.

Subscription

  1. Subscriber는 onNext 또는 onSubscribe 메소드 내에서 Subscription.request를 동기적으로 호출할 수 있다.
  2. Subscription이 취소된 이후에 추가적인 Subscription.request(long n)은 비작동(non-operation)해야한다.
  3. Subscription이 취소된 이후에 추가적인 Subscription.cancel()은 비작동(non-operation)해야한다.
  4. Subscription.request(long n) 메소드는 전달된 파라미터가 0보다 작거나 같은 경우 IllegalArgumentException으로 onError를 호출해야한다.
  5. Subscription.cancel()은 Publisher에게 Subscriber 메서드 호출을 중단하도록 요청 해야한다.
    • Publisher가 Subscriber에게 보내는 signal 중지하도록 요청
  6. Subscription.cancel()은 결국 해당 Subscriber에 대한 참조를 삭제하도록 Publisher에 요청 해야한다.
  7. Subscription.cancel, Subscription.request 호출에 대한 응답으로 예외를 던지는 것을 허용하지 않는다.
  8. Subscription의 request 메서드는 무한대로 호출될 수 있어야 한다.
    • 최대 Long.MAX_VALUE개의 Demand를 지원해야 한다.
      • 무한 스트림 (Unbounded Stream)

리액티브 스트림즈 구현체

RxJava

  • Reactive Extensions
  • .NET 환경의 리액티브 확장 라이브러리를 넷플릭스에서 Java 언어로 포팅하여 만든 JVM 기반의 대표적인 리액티브 확장 라이브러리
  • 2.0부터 리액티브 스트림즈 사양을 지원
  • 1.x 버전과 2.0 이후 버전의 차이점
    • Backpressure

Reactor

  • Spring Framework 팀에 의해 주도적으로 개발된 리액티브 스트림즈의 구현체

Akka Streams

  • JVM상에서의 동시성과 분산 애플리케이션을 단순화해 주는 오픈소스 Toolkit
  • Actor Model을 적극적으로 사용하는 대표적인 기술
  • Actor들 간의 통신은 메시지를 통해서만 이루어짐
  • Actor들은 서로 독립적
    • 느슨한 결합력과 높은 응집력이 보장됨
  • 리액티브 스트림즈 구현체
    • Akka Streams
  • 최종 사용자에게 제공하는 API
    • Akka Streams API

Java Flow API

  • ❌ 리액티브 스트림즈 구현체가 아님
  • 리액티브 스트림즈의 표준 사양이 정의된 **SPI
    • Service Provider Interface

기타 Reactive Extension

ReactiveX

  • RxAndroid
  • RxKotlin
  • RxJS
profile
Hello velog!
post-custom-banner

0개의 댓글