Reactive Streams와 백프레셔 관리: Publisher, Subscriber, Subscription 이해하기

궁금하면 500원·2024년 11월 5일

미생의 스프링

목록 보기
20/48

Reactive Streams는 비동기 및 반응형 스트림을 처리하기 위한 표준을 제공하는 라이브러리입니다.

여기서는 Reactive Streams를 사용하여 Publisher, Subscriber, Subscription을 직접 구현하고, 데이터를 발행하고 소비하는 과정에 대해 살펴보겠습니다.

이 과정에서는 백프레셔(BackPressure) 관리와 데이터 요청/발행에 대한 제어도 다룹니다.

1. Publisher(발행자) 만들기

Publisher는 데이터를 발행하는 역할을 합니다.

Publisher는 데이터를 제공하고, Subscriber로부터 구독 요청을 받습니다.

Publisher는 subscribe() 메서드를 통해 Subscriber에게 데이터를 구독할 수 있게 합니다.

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import java.util.Arrays;
import java.util.List;

public class MyPub implements Publisher<Integer> {

    private final List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        MySubscription subscription = new MySubscription(subscriber, data);
        subscriber.onSubscribe(subscription); // 구독을 시작
    }
}

Publisher는 subscribe() 메서드를 통해 Subscriber에게 구독 요청을 처리합니다.

이 메서드는 Subscription 객체를 생성하고, 이를 Subscriber에 전달하여 구독을 시작합니다.

2. Subscriber(구독자) 만들기

Subscriber는 Publisher로부터 데이터를 받아 처리하는 역할을 합니다.

Subscriber는 onSubscribe(), onNext(), onError(), onComplete() 메서드를 구현해야 합니다

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class MySub implements Subscriber<Integer> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // 데이터를 1개 요청
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("Data: " + integer); // 데이터를 처리
        subscription.request(1); // 다음 데이터를 요청
    }

    @Override
    public void onError(Throwable t) {
        System.err.println("Error: " + t.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Done"); // 데이터 스트림이 끝났을 때
    }
}
  • onSubscribe() 메서드에서 Subscription 객체를 받으면, 데이터를 요청할 수 있습니다.
  • onNext() 메서드에서는 데이터를 처리하고, 처리된 후에는 더 많은 데이터를 요청합니다.
  • onComplete()는 모든 데이터의 발행이 완료되었을 때 호출됩니다.
  • onError()는 오류가 발생했을 때 호출됩니다.

3. Subscription(구독) 만들기

SubscriptionPublisherSubscriber 간의 연결을 관리합니다.
데이터 발행 및 요청을 처리하며, 구독 취소도 가능합니다.

import org.reactivestreams.Subscription;

import java.util.Iterator;
import java.util.List;

public class MySubscription implements Subscription {

    private final Subscriber<? super Integer> subscriber;
    private final Iterator<Integer> iterator;

    public MySubscription(Subscriber<? super Integer> subscriber, List<Integer> data) {
        this.subscriber = subscriber;
        this.iterator = data.iterator();
    }

    @Override
    public void request(long n) {
        while (n > 0 && iterator.hasNext()) {
            subscriber.onNext(iterator.next()); // 데이터를 발행
            n--;
        }
        if (!iterator.hasNext()) {
            subscriber.onComplete(); // 데이터가 더 없으면 완료 처리
        }
    }

    @Override
    public void cancel() {
        System.out.println("Subscription cancelled"); // 구독 취소 처리
    }
}
  • request()Subscriber가 요청한 데이터 수 만큼 데이터를 발행합니다.
  • cancel()은 구독을 취소할 때 호출됩니다.
  • request() 메서드에서 데이터를 발행하고, 데이터가 없으면 onComplete()를 호출하여 스트림 종료를 알립니다.

4. 예시 실행

Publisher와 Subscriber를 연결하고, 데이터를 발행하고 소비하는 실행 예시입니다.

public class WebfluxApplication {
    public static void main(String[] args) {
        MyPub publisher = new MyPub();
        MySub subscriber = new MySub();
        publisher.subscribe(subscriber); // 구독 시작
    }
}

위의 WebfluxApplication 클래스에서는 MyPubMySub를 생성하고, subscribe() 메서드를 호출하여 구독을 시작합니다.
데이터를 하나씩 요청하고 처리하는 방식으로 실행됩니다.

5. 백프레셔(BackPressure) 관리

백프레셔는 Subscriber가 데이터를 얼마나 처리할 수 있는지에 대한 제어를 제공합니다.

데이터 요청을 할 때, 한 번에 처리할 수 있는 양을 조절할 수 있습니다. 이를 통해 시스템이 과부하되지 않도록 합니다.

다음은 백프레셔를 고려하여 데이터 버퍼 크기를 설정하고, 버퍼가 차면 추가 데이터를 요청하는 방식입니다.

public class MySub implements Subscriber<Integer> {

    private Subscription subscription;
    private int bufferSize = 2; // 버퍼 크기

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(bufferSize); // 처음에는 버퍼 크기만큼 요청
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("Data: " + integer);
        if (--bufferSize <= 0) {
            bufferSize = 2; // 버퍼 크기 초기화
            subscription.request(bufferSize); // 추가 데이터를 요청
        }
    }

    @Override
    public void onError(Throwable t) {
        System.err.println("Error: " + t.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
}

onNext() 메서드에서는 데이터를 받을 때마다 버퍼의 크기를 감소시키고, 버퍼 크기가 0이 되면 새로운 데이터를 요청합니다.

결론

이번 예제를 통해 Reactive Streams의 핵심 개념인 Publisher, Subscriber, Subscription의 역할과 데이터를 처리하는 방식에 대해 이해할 수 있었습니다.

또한, 백프레셔를 사용하여 데이터 처리 흐름을 관리하는 방법도 배웠습니다.

이와 같은 방식으로 비동기 데이터 스트림을 관리하며 효율적으로 처리할 수 있습니다.

Reactive Streams는 비동기 스트리밍 처리와 백프레셔 관리, 데이터 흐름 제어를 제공하여 성능과 효율성을 개선할 수 있습니다.

다양한 응용 프로그램에서 이를 활용하면 보다 확장성 있는 시스템을 구현할 수 있습니다.

profile
에러가 나도 괜찮아 — 그건 내가 배우고 있다는 증거야.

0개의 댓글