Reactive Streams?
데이터 스트림을 Non-Blocking이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리의 표준 사양
리액티브 스트림즈 구현체
- RxJava
- Reactor
- Akka Streams
- Java 9 Flow API
⭐ 리액티브 스트림즈 구성요소
컴포넌트 | 설명 |
---|
Publisher | 데이터를 생성하고 통지(발행, 게시, 방출)하는 역할을 한다. |
Subscriber | 구독한 Publisher로부터 통지(발행, 게시, 방출)된 데이터를 전달받아서 처리하는 역할을 한다. |
Subscription | Publisher에 요청할 데이터의 개수를 지정하고, 데이터 구독을 취소하는 역할을 한다. |
Processor | Publisher와 Subscriber의 기능을 모두 가지고 있다. 즉, Subscriber로서 다른 Publisher를 구독할 수 있고, Publisher로서 다른 Subscriber가 구독할 수 있다. |
Subscriber가 Subscription.request를 통해 왜 굳이 데이터의 요청 개수를 지정하는 걸까?
Publisher
, Subscriber
실제로는 각각 다른 스레드에서 비동기적으로 상호작용
- 만약
Publisher
가 통지하는 속도가 Subscriber
가 수신받은 데이터 처리하는 속도보다 빠르다면
- 처리를 기다리는 데이터는 쌓이게 됨
- 결과적으로 시스템 부하가 걸림
- 위 문제를 방지하기 위해,
Subscription.request
를 통해 데이터 요청 개수를 제어한다.
Publisher
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
Kafka에서의 Pub/Sub 모델
과 리액티브 스트림즈에서의 Pub/Sub 모델
은 의미가 조금 다르다.
- Kafka의 경우
Publisher
, Subscriber
중간에 Message Broker
가 있다.
Subscriber
public interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
- onSubscribe
- 구독 시작 시점에 Publisher로부터 Subscription 인터페이스 구현체를 전달받는 역할
- onNext
- Publisher가 통지한 데이터를 처리하는 역할
- onError
- Publisher가 데이터 통지를 위한 처리 과정에서 에러가 발생했을 때 해당 에러를 처리하는 역할
- onComplete
- Publisher가 데이터 통지를 완료했을음 알릴 때 호출되는 메서드
Subscription
public interface Subscription {
void request(long n);
void cancel();
}
⭐ Publisher, Subscribe 동작과정
- Publisher가 Subscriber 인터페이스 구현 객체를
subscribe
메서드의 파라미터로 전달
- Publisher 내부에서는 전달받은 Subscriber 인터페이스 구현 객체의
onSubscribe
메서드를 호출
- Subscriber의 구독을 의미하는 Subscription 인터페이스 구현 객체를 Subscriber에게 전달
- 호출된 Subscriber 인터페이스 구현 객체의 onSubscribe 메서드에서 전달받은 Subscription 객체를 통해 전달받을 데이터의 개수를 Publisher에게 요청 (
request
메서드)
- Publisher는 Subscriber로부터 전달받은 요청 개수만큼의 데이터를
onNext
메서드를 호출해서 Subscriber에게 전달
- Publisher는 통지할 데이터가 더 이상 없을 경우,
onComplete
메서드를 호출해서 Subscriber에게 데이터 처리 종료를 알린다.
Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
- Publisher, Subscriber 기능을 모두 가지고 있음
리액티브 스트림즈 관련 용어
Signal
- Publisher와 Subscriber간에 주고받는 상호작용
- Publisher가 Subscriber에게 보내는 Signal
- onSubscribe
- onNext
- onComplete
- onError
- Subscriber가 Publisher에게 보내는 Signal
Demand
- Subscriber가 Publisher에게 요청하는 데이터를 의미
- Publisher가 아직 Subscriber에게 전달하지 않은 데이터
Emit
- 데이터를 내보내다.
- 통지, 발행, 게시, 방출
Upstream, Downstream
Sequence
- 다양한 Operator로 데이터의 연속적인 흐름
- 예시
Flux
.just(1, 2, 3, 4, 5, 6)
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.subscribe(System.out::println)
- Flux를 통해서 데이터를 생성
- emit
- filter 메서드를 통해서 필터링
- map 메서드를 통해 변환
Operator
Source
리액티브 스트림즈의 구현 규칙
reactive-streams-jvm
Reactive Streams
Publisher
- Publisher가 Subscriber의 onNext 메소드를 호출한 총 횟수는 Subscriber가 Subscription을 통해 요청한 총 수보다 작거나 같아야 한다.
- Subscriber의 요청 개수보다 더 적거나 같은 개수의 onNext signal을 통해 데이터를 보낼 수 있다.
- Publisher는 요청받은 것보다 더 적게 onNext를 호출할 수 있다. 그리고 onComplete 또는 onError를 호출해서 Subscription를 종료해야한다.
- Publisher는 실패시에 반드시 onError를 호출해야한다.
- Publisher가 성공적으로 완료된 경우에는 onComplete를 호출해야 한다.
- Publisher가 onComplete 혹은 onError를 호출한 경우, Subscriber의 Subcription은 취소된 것으로 간주되어야 한다.
- onError, onComplete 메서드가 한번 호출됬다면, 더이상 호출하면 안된다.
- Subscription 객체의 cancel 메서드를 통해 Subscription이 취소된 경우, Subscriber에 추가적인 호출을 하면 안된다
Subscriber
- Subscription.request(n) 메서드를 통해 데이터를 요청해야 한다. 그리고 데이터는 onNext 메소드를 통해 받는다.
- 데이터를 언제, 얼마나 수신할 수 있는지를 결정하는 책임이 Subscriber에게 있다.
- 리액티브 스트림즈에서는 한 번에 하나의 데이터를 요청하기 보다는, Subscriber가 처리할 수 있는 적절한 상한선만큼의 데이터 개수 요청을 권장한다.
- Subscriber.onComplete() 메소드나 onError() 메서드에서는 Subscription 또는 Publisher의 어떠한 메서드도 호출해서는 안된다.
- Subscriber가 완료 / 에러 Signal을 처리하는 동안 Publisher / Subscription 과 Subscriber 간의 순환 및 경쟁 조건 (Race Condition)을 방지하기 위함이다.
- Subscriber.onComplete(), onError() 메소드가 호출되면 Subscription이 취소된 것으로 간주해야 한다.
- Subscription이 더이상 필요하지 않다면, 반드시 Subscription.cancel()를 호출해야 한다.
- Subscriber.onSubscribe 메서드는 한번만 호출되어야 한다.
- 즉 Subscriber는 최대 한번만 구독할 수 있다.
Subscription
- Subscriber는 onNext 또는 onSubscribe 메소드 내에서 Subscription.request를 동기적으로 호출할 수 있다.
- Subscription이 취소된 이후에 추가적인 Subscription.request(long n)은 비작동(non-operation)해야한다.
- Subscription이 취소된 이후에 추가적인 Subscription.cancel()은 비작동(non-operation)해야한다.
- Subscription.request(long n) 메소드는 전달된 파라미터가 0보다 작거나 같은 경우 IllegalArgumentException으로 onError를 호출해야한다.
- Subscription.cancel()은 Publisher에게 Subscriber 메서드 호출을 중단하도록 요청 해야한다.
- Publisher가 Subscriber에게 보내는 signal 중지하도록 요청
- Subscription.cancel()은 결국 해당 Subscriber에 대한 참조를 삭제하도록 Publisher에 요청 해야한다.
- Subscription.cancel, Subscription.request 호출에 대한 응답으로 예외를 던지는 것을 허용하지 않는다.
- Subscription의 request 메서드는 무한대로 호출될 수 있어야 한다.
- 최대 Long.MAX_VALUE개의 Demand를 지원해야 한다.
- 무한 스트림 (Unbounded Stream)
리액티브 스트림즈 구현체
RxJava
- Reactive Extensions
- .NET 환경의 리액티브 확장 라이브러리를 넷플릭스에서 Java 언어로 포팅하여 만든 JVM 기반의 대표적인 리액티브 확장 라이브러리
- 2.0부터 리액티브 스트림즈 사양을 지원
- 1.x 버전과 2.0 이후 버전의 차이점
Reactor
- Spring Framework 팀에 의해 주도적으로 개발된 리액티브 스트림즈의 구현체
Akka Streams
- JVM상에서의 동시성과 분산 애플리케이션을 단순화해 주는 오픈소스 Toolkit
- Actor Model을 적극적으로 사용하는 대표적인 기술
- Actor들 간의 통신은 메시지를 통해서만 이루어짐
- Actor들은 서로 독립적
- 리액티브 스트림즈 구현체
- 최종 사용자에게 제공하는 API
Java Flow API
- ❌ 리액티브 스트림즈 구현체가 아님
- 리액티브 스트림즈의 표준 사양이 정의된 **SPI
- Service Provider Interface
기타 Reactive Extension
ReactiveX