Publisher - Processor - Subscriber

김카레·2019년 3월 22일
1

기본 로직

publisher (inner 클래스로 subscription 있음)에 subscriber가 등록되면
subscriber는 몇 개를 요청할지 정해서 subscription.request(n)를 호출한다.
subscription은 n개 만큼 subscriber.onNext(v)를 호출한다.

Processor의 역할

하나의 subscriber의 결과물이 다른 subscriber에 영향을 줄수 있다. 그래서 publisher와 subscriber를 둘 다 상속받아, 새로운 publisher처럼 행동하며 체인처럼 다음 subscriber를 호출한다. (예, 전체 윈도우가 리사이징되면 내부 컴포넌트들도 연쇄적으로 영향을 받는 것)

image.png

@FunctionalInterface
public static interface Publisher<T> {
 public void subscribe(Subscriber<? super T> subscriber);
}
 
public static interface Subscriber<T> {
 public void onSubscribe(Subscription subscription);
 public void onNext(T item);
 public void onError(Throwable throwable);
 public void onComplete();
}
 
public static interface Subscription {
 public void request(long n);
 public void cancel();
}
 
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}

image.png

Back-Pressure

Publisher(수신자)와 Subcriber(송신자)가 있을때 Publisher가 매우 빠른 속도로 데이터를 무조건 전송하면 Subcriber는 처리못한 데이터를 메모리에 쌓아두다가 뻗는다.

image.png
그렇다고 해서 Publisher가 Subscriber가 다 처리할때까지 메모리에 들고 기다리면 Publisher가 뻗는다.

image.png
Subscriber가 요청한 수 만큼 전달하는 pull-based-backpressure

image.png

그 외 참고로 일부를 Drop하거나 마지막 것만 가져오는 등의 backpressure 처리 방법들이 있음.


참고

Java 9 Flow API example - Processor - grokonez

profile
김카레

0개의 댓글