스트림 프로세싱과 Back Pressure의 공통점은 스트림을 다룰 때
Input의 양이 Output 생성속도를 크게 웃도는 환경을 방지해 병렬 처리에 유리한 상황을 만드는 데 있다.
따라서 이와같은 작업을 지난 글에서 알아본 대로 논 블로킹, 비동기 방식으로 처리하는 것이
리액티브 스트림즈의 목적이라 할 수 있다.
계속해서 리액티브 스트림즈를 구성하고 있는 명세, 인터페이스에 대해서 살펴보자.
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
퍼블리셔 인터페이스는 데이터 소스로부터 데이터를 내보내는(Emit) 역할을 한다.
subscribe(Subscriber<? super T> s)
: 해당 메서드의 매개변수가 데이터를 요청 및 소비하는 역할을 한다.public class MyPublisher <T> implements Publisher<T> {
Iterable<T> iterable;
public MyPublisher(Iterable<T> iterable) {
System.out.println("Publisher Created");
this.iterable = iterable;
}
@Override
public void subscribe(Subscriber<? super T> subscriber) {
MySubscription<T> subscription = new MySubscription<>(subscriber, iterable);
subscriber.onSubscribe(subscription);
}
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
네 개의 추상 메서드를 가진 Subscriber 인터페이스는 데이터를 요청하고 소비하는 역할을 한다.
onSubscribe(Subscription s)
: 구독 시작되는 시점에 호출. 요청할 데이터의 개수 지정 및 구독 해지 처리 가능onNext(T t)
: 퍼블리셔가 데이터를 Emit할 때마다 호출. 데이터를 전달받아 소비onError(Throwable t)
: 데이터가 Subscriber에게 전달되는 과정에서 에러가 발생할 경우 호출onComplete()
: Emit 과정이 종료되는 시점에 호출. Emit 이후의 처리작업 수행public class MySubscriber<R> implements Subscriber<R> {
private final int backPressure;
private int bufferSize;
private Subscription subscription;
public MySubscriber(int backPressure) {
System.out.println("Subscriber Created");
System.out.println("Backpressure : " + backPressure);
this.backPressure = backPressure;
bufferSize = backPressure;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
System.out.println("Subscriber initialize Subscription");
this.subscription.request(backPressure);
}
@Override
public void onNext(R data) {
System.out.println("onNext() : " + data);
bufferSize--;
if (bufferSize == 0) {
System.out.println("Buffer Cleared\n");
bufferSize = backPressure;
subscription.request(backPressure);
}
}
@Override
public void onError(Throwable t) {
System.out.println("Exception Invoked on Subscription");
}
@Override
public void onComplete() {
System.out.println("Complete Subscription");
}
}
public interface Subscription {
public void request(long n);
public void cancel();
}
이름 그대로 구독 자체를 관리하는 인터페이스이다.
request(long n)
: 퍼블리셔에게 Emit할 데이터의 개수를 담아 요청cancel()
: 구독 해지, 발생시 퍼블리셔는 Emit을 멈춤public class MySubscription<T> implements Subscription {
private final Subscriber<? super T> subscriber;
private final Iterator<T> iterator;
public MySubscription(Subscriber<? super T> subscriber, Iterable<T> iterable) {
this.subscriber = subscriber;
this.iterator = iterable.iterator();
System.out.println("Subscription Created");
}
@Override
public void request(long n) {
while (n > 0){
if (iterator.hasNext()) subscriber.onNext(iterator.next());
else {
subscriber.onComplete();
break;
}
n--;
}
}
@Override
public void cancel() {
}
}
public class Main {
public static void main(String[] args) {
MyPublisher<Integer> pub1 = new MyPublisher<>(List.of(1,2,3,4,5,6,7,8,9,10));
MySubscriber<Integer> sub1 = new MySubscriber<>(3);
pub1.subscribe(sub1);
System.out.println("--------------------------------------");
MyPublisher<String> pub2 = new MyPublisher<>(List.of("a","b","c","d","e","f","g","h","i","j"));
MySubscriber<String> sub2 = new MySubscriber<>(3);
pub2.subscribe(sub2);
}
}
Publisher Created
Subscriber Created
Backpressure : 3
Subscription Created
Subscriber initialize Subscription
onNext() : 1
onNext() : 2
onNext() : 3
Buffer Cleared
onNext() : 4
onNext() : 5
onNext() : 6
Buffer Cleared
onNext() : 7
onNext() : 8
onNext() : 9
Buffer Cleared
onNext() : 10
Complete Subscription
--------------------------------------
Publisher Created
Subscriber Created
Backpressure : 3
Subscription Created
Subscriber initialize Subscription
onNext() : a
onNext() : b
onNext() : c
Buffer Cleared
onNext() : d
onNext() : e
onNext() : f
Buffer Cleared
onNext() : g
onNext() : h
onNext() : i
Buffer Cleared
onNext() : j
Complete Subscription
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Subscriber와 Publisher를 동시에 상속받고 있는 인터페이스이다.
두 역할을 동시에 수행하는 것이 가능하다.
publisher - processor - subscriber 간의 작업의 흐름을 표현한 그림은 다음과 같다.
Implementor | Description |
---|---|
Project Reactor | Spring5의 리액티브 스택에 포함된 구현체. Spring과 가장 궁합이 잘 맞는다. |
RxJava | .NET 기반의 리액티브 라이브러리를 넷플릭스에서 Java로 포팅한 확장 라이브러리. |
Java Flow API | 리액티브 스트림즈 표준 사양을 Java 안에 포함시킨 구현체. SPI(Service Provider Interface) 역할 |
Reactive Extensions | RxJS, RxAndroid, RxKotlin, RxPython, RxScala 등이 있음 |