Reactive Streams

Reactive Streams란?

  • 리액티브 프로그래밍을 위한 자바 진영의 표준 명세(Specification)즉 인터페이스의 모음
  • 논 블로킹 백 프레셔를 가진 비동기 스트리밍 프로세싱에 대한 표준이라고 할 수도 있다.

Stream Processing

  • 스트림 프로세싱은 데이터 스트림이나 이벤트 시퀀스를 컴퓨팅의 중심으로 보는 프로그래밍 패러다임을 말한다.
  • 스트림 프로세싱은 Out Of Memory 에러와 대량의 Garbage Collection에서 애플리케이션을 보호하는 역할을 한다. (기존 방식처럼 요청과 그에 따른 데이터를 전부 메모리에 올리는 대신 스트림을 이용해 지금 당장 처리할 데이터만 메모리에 올리는 식으로 이루어진다)
  • 병렬처리에 강점을 가지고 있다

Back Pressure

  • 이벤트를 보내는 쪽에서가 아니라 받고 처리하는 쪽에서 먼저 요청(Pull)하는 방식
  • 퍼블리셔와 구독자의 연산속도 차이에 의한 CPU, 메모리, 네트워크의 낭비와 Out Of Memory 에러 발생을 방지 ⇒ Input의 양이 Output 생성속도를 크게 웃도는 환경을 방지해 병렬 처리에 유리한 상황을 만든다
  • 이와같은 작업을 논 블로킹비동기 방식으로 처리하는 것이 Reactive Streams의 목적

스트림 프로세싱과 Back Pressure의 공통점은 스트림을 다룰 때

Input의 양이 Output 생성속도를 크게 웃도는 환경을 방지해 병렬 처리에 유리한 상황을 만드는 데 있다.

따라서 이와같은 작업을 지난 글에서 알아본 대로 논 블로킹비동기 방식으로 처리하는 것이

리액티브 스트림즈의 목적이라 할 수 있다.

계속해서 리액티브 스트림즈를 구성하고 있는 명세, 인터페이스에 대해서 살펴보자.

Components of Reactive Streams

Pubilsher

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

퍼블리셔 인터페이스는 데이터 소스로부터 데이터를 내보내는(Emit) 역할을 한다.

  • subscribe(Subscriber<? super T> s) :  해당 메서드의 매개변수가 데이터를 요청 및 소비하는 역할을 한다.
  • Subscriber(매개변수)가 이벤트를 요청하지 않는 이상 먼저 데이터를 Emit하지 않는다.
  • My Implementation
    public class MyPublisher <T> implements Publisher<T> {
    
        Iterable<T> iterable;
    
        public MyPublisher(Iterable<T> iterable) {
            System.out.println("Publisher Created");
            this.iterable = iterable;
        }
    
        @Override
        public void subscribe(Subscriber<? super T> subscriber) {
            MySubscription<T> subscription = new MySubscription<>(subscriber, iterable);
            subscriber.onSubscribe(subscription);
        }
    }

Subscriber

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

네 개의 추상 메서드를 가진 Subscriber 인터페이스는 데이터를 요청하고 소비하는 역할을 한다.

  • onSubscribe(Subscription s) : 구독 시작되는 시점에 호출. 요청할 데이터의 개수 지정 및 구독 해지 처리 가능
  • onNext(T t) : 퍼블리셔가 데이터를 Emit할 때마다 호출. 데이터를 전달받아 소비
  • onError(Throwable t) : 데이터가 Subscriber에게 전달되는 과정에서 에러가 발생할 경우 호출
  • onComplete() : Emit 과정이 종료되는 시점에 호출. Emit 이후의 처리작업 수행
  • My Implementation
    public class MySubscriber<R> implements Subscriber<R> {
    
        private final int backPressure;
        private int bufferSize;
        private Subscription subscription;
    
        public MySubscriber(int backPressure) {
            System.out.println("Subscriber Created");
            System.out.println("Backpressure : " + backPressure);
            this.backPressure = backPressure;
            bufferSize = backPressure;
        }
    
        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            System.out.println("Subscriber initialize Subscription");
            this.subscription.request(backPressure);
        }
    
        @Override
        public void onNext(R data) {
            System.out.println("onNext() : " + data);
            bufferSize--;
            if (bufferSize == 0) {
                System.out.println("Buffer Cleared\n");
                bufferSize = backPressure;
                subscription.request(backPressure);
            }
    
        }
    
        @Override
        public void onError(Throwable t) {
            System.out.println("Exception Invoked on Subscription");
        }
    
        @Override
        public void onComplete() {
            System.out.println("Complete Subscription");
        }
    }

Subscription

public interface Subscription {
    public void request(long n);
    public void cancel();
}

이름 그대로 구독 자체를 관리하는 인터페이스이다.

  • request(long n) : 퍼블리셔에게 Emit할 데이터의 개수를 담아 요청
  • cancel() : 구독 해지, 발생시 퍼블리셔는 Emit을 멈춤
  • My Implementation
    public class MySubscription<T> implements Subscription {
        private final Subscriber<? super T> subscriber;
        private final Iterator<T> iterator;
    
        public MySubscription(Subscriber<? super T> subscriber, Iterable<T> iterable) {
            this.subscriber = subscriber;
            this.iterator = iterable.iterator();
            System.out.println("Subscription Created");
        }
    
        @Override
        public void request(long n) {
            while (n > 0){
                if (iterator.hasNext()) subscriber.onNext(iterator.next());
                else {
                    subscriber.onComplete();
                    break;
                }
                n--;
            }
        }
    
        @Override
        public void cancel() {
    
        }
    }

TestCode

public class Main {
    public static void main(String[] args) {
        MyPublisher<Integer> pub1 = new MyPublisher<>(List.of(1,2,3,4,5,6,7,8,9,10));
        MySubscriber<Integer> sub1 = new MySubscriber<>(3);
        pub1.subscribe(sub1);

        System.out.println("--------------------------------------");

        MyPublisher<String> pub2 = new MyPublisher<>(List.of("a","b","c","d","e","f","g","h","i","j"));
        MySubscriber<String> sub2 = new MySubscriber<>(3);
        pub2.subscribe(sub2);
    }
}

실행결과

Publisher Created
Subscriber Created
Backpressure : 3
Subscription Created
Subscriber initialize Subscription
onNext() : 1
onNext() : 2
onNext() : 3
Buffer Cleared

onNext() : 4
onNext() : 5
onNext() : 6
Buffer Cleared

onNext() : 7
onNext() : 8
onNext() : 9
Buffer Cleared

onNext() : 10
Complete Subscription
--------------------------------------
Publisher Created
Subscriber Created
Backpressure : 3
Subscription Created
Subscriber initialize Subscription
onNext() : a
onNext() : b
onNext() : c
Buffer Cleared

onNext() : d
onNext() : e
onNext() : f
Buffer Cleared

onNext() : g
onNext() : h
onNext() : i
Buffer Cleared

onNext() : j
Complete Subscription

Processor<T, R>

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Subscriber와 Publisher를 동시에 상속받고 있는 인터페이스이다.

두 역할을 동시에 수행하는 것이 가능하다.

publisher - processor - subscriber 간의 작업의 흐름을 표현한 그림은 다음과 같다.

Implementors of Reactive Streams

ImplementorDescription
Project ReactorSpring5의 리액티브 스택에 포함된 구현체. Spring과 가장 궁합이 잘 맞는다.
RxJava.NET 기반의 리액티브 라이브러리를 넷플릭스에서 Java로 포팅한 확장 라이브러리.
Java Flow API리액티브 스트림즈 표준 사양을 Java 안에 포함시킨 구현체. SPI(Service Provider Interface) 역할
Reactive ExtensionsRxJS, RxAndroid, RxKotlin, RxPython, RxScala 등이 있음

0개의 댓글