2.1 리액티브 스트림즈란?
- 리액티브 스트림즈: 데이터 스트림을 Non-Blocking이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리의 표준 사양
2.2 리액티브 스트림즈 구성요소
리액티브 스트림즈를 통해 구현해야 되는 API 컴포넌트는 아래와 같이 존재한다.
- Publisher: 데이터를 생성하고 통지하는 역할
- Subscriber: 구독한 Publisher로부터 통지된 데이터를 전달받아 처리
- Subscription: Publisher에 요청할 데이터의 개수 지정, 데이터의 구독을 취소하는 역할
- Processor: Publisher, Subscriber 기능 모두 가짐
Subscription.request
로 시스템 부하가 커지는 것을 막기 위해 데이터 개수를 제어한다.
그 이유는 Publisher와 Subscriber는 실제로 다른 스레드에서 비동기적으로 상호작용하는데, Publisher가 통지하는 속도가 Subscriber가 처리하는 속도보다 빠르면 처리를 기다리는 데이터가 쌓이기 때문이다.!!
2.3 코드로 보는 리액티브 스트림즈 컴포넌트
2.3.1 Publisher
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
- Publisher에 Subscriber 를 파라미터로 등록하는 형태로 구독이 이루어진다.
2.3.2 Subscriber
- onSubscribe: Publisher에게 요청할 데이터 개수 지정 or 구독 해지 등 구독 시작 시점에 처리하는 역할
- onNext: Publisher가 통지한 데이터를 처리하는 역할
2.3.3 Subscription
Subscirber가 구독한 데이터 개수를 요청하거나, 데이터 요청의 취소(구독 해지) 역할을 한다.
2.3.4 Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Processor가 Subscriber, Publisher 역할을 하기에 인터페이스를 상속한다.
2.4 리액티브 스트림즈 관련 용어 정의
Signal
Publisher와 Subscriber 간에 주고받는 상호작용, 인터페이스 코드에서 볼 수 있는 onSubscribe, onNext 등과 같은 메서드를 시그널이라고 한다.
Damand
Subscriber 가 Publisher에게 요청하는 데이터, 아직 전달하지 않은 데이터 (말 그대로 수요)
Emit
Publisher -> Subscriber 에게 데이터 전달
Upstream/Downstream
public class Example2_5 {
public static void main(String[] args) {
Flux
.just(1, 2, 3, 4, 5, 6)
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.subscribe(System.out::println);
}
}
- Upstream: 현재 호출한 메서드에서 반환된 Flux의 위치에서 자신보다 더 상위에 있는 Flux
- Downstream: 하위에 있는 Flux
예를 들어 just
로 반환된 Flux 는 filter
로 반환된 Flux가 자신보다 하위에 있기에 Downstream
Sequence
Publisher 가 emit하는 데이터의 연속적인 흐름을 정의해 놓은 것 자체
Example2_5 에서의 다양한 Operator로 데이터의 연속적인 흐름을 정의한 것
Operator
just, filter, map 등 연산자
Source
대부분 '최초의' 라는 의미로 사용
2.5 리액티브 스트림즈의 구현 규칙
Publisher 구현을 위한 주요 기본 규칙
- Publisher가 Subscriber에게 보내는 onNext Signal의 총 개수는 항상 해당 Subscriber의 구독을 통해 요청된 데이터의 총 개수보다 더 작거나 같아야 한다.
- Publisher는 요청된 것보다 적은 수의 onNext signal을 보내고 onComplete 또는 onError를 호출해 구독을 종료할 수 있다. (마지막 요청)
- Publisher의 데이터 처리가 실패하면 onError signal을 보내야 한다.
- Publisher의 데이터 처리가 성공적으로 종료되면 onComplete signal을 보내야 한다.
- Publisher가 Subscriber에게 onError 또는 onComplete signal을 보내는 경우 해당 Subscriber의 구독은 취소된 것으로 간주되어야 한다.
- 일단 종료 상태(Error, Complete)를 받으면 더 이상 Signal이 발생되지 않아야 한다.
- 구독이 취소되면 Subscriber는 결국 signal을 받는 것을 중지해야 한다.
Subscriber 구현을 위한 주요 기본 규칙
- Subscriber는 Publisher로부터 onNext signal을 수신하기 위해
Subscription.request(n)
를 통해 Demand signal을 Publisher에게 보내야 한다.
- onComplete, onError는 Subscriptuon 또는 Publisher의 메서드를 호출해서는 안된다. (race condition 방지를 위함)
- onComplete, onError 는 signal 을 수신한 후 구독이 취소된 것으로 간주해야 한다.
- 구독이 더 이상 필요하지 않은 경우 cancel을 호출해야 한다.
- onSubscribe()는 최대 한 번만 호출되어야 한다.
Subscription 구현을 위한 주요 기본 규칙
- 구독은 Subscriber가 onNext 또는 onSubscribe 내에서 동기적으로
Subscription.request
를 호출하도록 허용해야 한다.
- request와 onNext 사이의 상호 재귀로 인해 발생할 수 있는 stack overflow 피하기 위해 request가 다시 호출되는 것을 분명히 하는 것
- 구독이 취소되지 않은 동안
Subscription.cancel()
은 Publisher에게 해당 구독자에 대한 참조를 결국 삭제하도록 요청해야 한다.
- publisher 가 signal 전송을 중지할 뿐아니라 참조까지 삭제한다. GC가 유효하지 않은 구독자 수집해 메모리 확보할 수 있도록 해준다.
- cancel, request 호출에 대한 응답으로 예외를 던지는 것을 허용하지 않는다.
- 메서드 내부로 예외를 던지지 않고 onError 시그널과 함께 보낸다.
- 무한 스트림 지원, 최대 2^63-1개의 Demand 지원
2.6 리액티브 스트림즈 구현체
- RxJava
- Project Reactor: spring fw 팀에 의해 개발된 구현체
- Akka Streams: Actor 모델
- Java Flow API: Java9부터 지원, SPI