
Reactive Streams는 비동기 및 반응형 스트림을 처리하기 위한 표준을 제공하는 라이브러리입니다.
여기서는 Reactive Streams를 사용하여 Publisher, Subscriber, Subscription을 직접 구현하고, 데이터를 발행하고 소비하는 과정에 대해 살펴보겠습니다.
이 과정에서는 백프레셔(BackPressure) 관리와 데이터 요청/발행에 대한 제어도 다룹니다.
Publisher는 데이터를 발행하는 역할을 합니다.
Publisher는 데이터를 제공하고, Subscriber로부터 구독 요청을 받습니다.
Publisher는 subscribe() 메서드를 통해 Subscriber에게 데이터를 구독할 수 있게 합니다.
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.util.Arrays;
import java.util.List;
public class MyPub implements Publisher<Integer> {
private final List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
MySubscription subscription = new MySubscription(subscriber, data);
subscriber.onSubscribe(subscription); // 구독을 시작
}
}
Publisher는 subscribe() 메서드를 통해 Subscriber에게 구독 요청을 처리합니다.
이 메서드는 Subscription 객체를 생성하고, 이를 Subscriber에 전달하여 구독을 시작합니다.
Subscriber는 Publisher로부터 데이터를 받아 처리하는 역할을 합니다.
Subscriber는 onSubscribe(), onNext(), onError(), onComplete() 메서드를 구현해야 합니다
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class MySub implements Subscriber<Integer> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // 데이터를 1개 요청
}
@Override
public void onNext(Integer integer) {
System.out.println("Data: " + integer); // 데이터를 처리
subscription.request(1); // 다음 데이터를 요청
}
@Override
public void onError(Throwable t) {
System.err.println("Error: " + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("Done"); // 데이터 스트림이 끝났을 때
}
}
Subscription은 Publisher와 Subscriber 간의 연결을 관리합니다.
데이터 발행 및 요청을 처리하며, 구독 취소도 가능합니다.
import org.reactivestreams.Subscription;
import java.util.Iterator;
import java.util.List;
public class MySubscription implements Subscription {
private final Subscriber<? super Integer> subscriber;
private final Iterator<Integer> iterator;
public MySubscription(Subscriber<? super Integer> subscriber, List<Integer> data) {
this.subscriber = subscriber;
this.iterator = data.iterator();
}
@Override
public void request(long n) {
while (n > 0 && iterator.hasNext()) {
subscriber.onNext(iterator.next()); // 데이터를 발행
n--;
}
if (!iterator.hasNext()) {
subscriber.onComplete(); // 데이터가 더 없으면 완료 처리
}
}
@Override
public void cancel() {
System.out.println("Subscription cancelled"); // 구독 취소 처리
}
}
Publisher와 Subscriber를 연결하고, 데이터를 발행하고 소비하는 실행 예시입니다.
public class WebfluxApplication {
public static void main(String[] args) {
MyPub publisher = new MyPub();
MySub subscriber = new MySub();
publisher.subscribe(subscriber); // 구독 시작
}
}
위의 WebfluxApplication 클래스에서는 MyPub과 MySub를 생성하고, subscribe() 메서드를 호출하여 구독을 시작합니다.
데이터를 하나씩 요청하고 처리하는 방식으로 실행됩니다.
백프레셔는 Subscriber가 데이터를 얼마나 처리할 수 있는지에 대한 제어를 제공합니다.
데이터 요청을 할 때, 한 번에 처리할 수 있는 양을 조절할 수 있습니다. 이를 통해 시스템이 과부하되지 않도록 합니다.
다음은 백프레셔를 고려하여 데이터 버퍼 크기를 설정하고, 버퍼가 차면 추가 데이터를 요청하는 방식입니다.
public class MySub implements Subscriber<Integer> {
private Subscription subscription;
private int bufferSize = 2; // 버퍼 크기
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(bufferSize); // 처음에는 버퍼 크기만큼 요청
}
@Override
public void onNext(Integer integer) {
System.out.println("Data: " + integer);
if (--bufferSize <= 0) {
bufferSize = 2; // 버퍼 크기 초기화
subscription.request(bufferSize); // 추가 데이터를 요청
}
}
@Override
public void onError(Throwable t) {
System.err.println("Error: " + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
onNext() 메서드에서는 데이터를 받을 때마다 버퍼의 크기를 감소시키고, 버퍼 크기가 0이 되면 새로운 데이터를 요청합니다.
이번 예제를 통해 Reactive Streams의 핵심 개념인 Publisher, Subscriber, Subscription의 역할과 데이터를 처리하는 방식에 대해 이해할 수 있었습니다.
또한, 백프레셔를 사용하여 데이터 처리 흐름을 관리하는 방법도 배웠습니다.
이와 같은 방식으로 비동기 데이터 스트림을 관리하며 효율적으로 처리할 수 있습니다.
Reactive Streams는 비동기 스트리밍 처리와 백프레셔 관리, 데이터 흐름 제어를 제공하여 성능과 효율성을 개선할 수 있습니다.
다양한 응용 프로그램에서 이를 활용하면 보다 확장성 있는 시스템을 구현할 수 있습니다.