callbacks
), 프로미스(promises
) 또는 다른 비동기 프로그래밍 구성 요소를 사용하여 이루어진다자바에서는 리액티브 스트림 API가 java.util.concurrent.Flow
패키지의 일부로 구현되어 있으며, 네 가지 주요 인터페이스가 포함되어 있다:
Publisher
: 데이터의 원천으로, 구독자에게 항목을 발행Subscriber
: 데이터 소비자로, 발행자에게 구독하고 발행된 항목을 처리Subscription
: 발행자와 구독자 간의 연결을 나타내며, 구독자가 더 많은 항목을 요청하거나 구독을 취소할 수 있게 함Processor
: 발행자와 구독자의 기능을 모두 수행할 수 있다. 주로 첫 발행자와 최종 구독자 사이에서 중간 로직을 처리하기 위해 사용된다 자바에서 리액티브 스트림을 사용하려면 Project Reactor
또는 RxJava
와 같은 인기 라이브러리를 사용할 수 있으며, 이러한 라이브러리는 리액티브 데이터 스트림을 처리하는 데 사용되는 고수준 추상화 및 유틸리티 메소드를 제공한다
Reactor
의 Flux
를 사용한 간단한 예시:
public class ReactiveExample {
public static void main(String[] args) {
Flux<Integer> numbers = Flux.range(1, 10); // 1부터 10까지의 숫자 스트림 생성
numbers.subscribe(System.out::println); // 스트림에 구독하고 각 숫자를 출력
}
}
numbers.subscribe()
는 구독자 또는 콜백 집합(예: System.out::println
)을 사용하여 데이터 스트림 수명 주기의 다양한 측면을 처리하는 메서드이다.
numbers.subscribe(System.out::println)
를 호출하면 System.out.println
에 대한 메서드 참조를 전달한다. 이는 onNext()
메서드를 System.out.println으로 설정한 구독자를 정의하는 것과 동일하다. onNext()
메서드는 발행자가 새 항목을 내보낼 때마다 호출된다.
다음은 내부에서 발생하는 동작이다:
subscribe
를 호출하면 제공된 콜백(이 경우 onNext
를 위한 System.out::println
)을 사용하여 구독자 객체가 생성된다.Flux
인스턴스)에 구독한다.onSubscribe
메서드에 전달된다.request(n)
을 호출하여 발행자로부터 항목을 요청한다. 여기서 n
은 처리할 준비가 된 항목 수이다. 기본적으로 Reactor
의 Flux
는 제한 없는 요청 전략을 사용하여 Long.MAX_VALUE
항목을 요청한다.onNext
메서드(이 경우 System.out::println
)를 호출한다. 이로 인해 숫자가 콘솔에 출력된다.onComplete
메서드를 호출하여 스트림이 끝났음을 알린다. 처리 중 오류가 발생하면 발행자는 관련 예외와 함께 구독자의 onError
메서드를 호출한다.import java.util.concurrent.Flow.*;
import java.util.stream.IntStream;
public class ReactiveExample {
public static void main(String[] args) {
// 발행자 생성
CustomPublisher publisher = new CustomPublisher(IntStream.range(1, 11));
// 구독자 생성
CustomSubscriber subscriber = new CustomSubscriber();
// 구독자를 발행자에 구독
publisher.subscribe(subscriber);
}
}
class CustomPublisher implements Publisher<Integer> {
private final IntStream data;
CustomPublisher(IntStream data) {
this.data = data;
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
// 구독을 생성해서 구독자에 전달
CustomSubscription subscription = new CustomSubscription(subscriber, data.iterator());
subscriber.onSubscribe(subscription);
}
}
class CustomSubscription implements Subscription {
private final Subscriber<? super Integer> subscriber;
private final IntStream.IntIterator dataIterator;
CustomSubscription(Subscriber<? super Integer> subscriber, IntStream.IntIterator dataIterator) {
this.subscriber = subscriber;
this.dataIterator = dataIterator;
}
@Override
public void request(long n) {
try {
while (n-- > 0 && dataIterator.hasNext()) {
subscriber.onNext(dataIterator.nextInt());
}
if (!dataIterator.hasNext()) {
subscriber.onComplete();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
@Override
public void cancel() {
// 취소 로직
}
}
class CustomSubscriber implements Subscriber<Integer> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // 첫 아이템 요청
}
@Override
public void onNext(Integer item) {
System.out.println(item);
subscription.request(1); // 다음 아이템 요청
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error occurred: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Stream completed");
}
}
이 예제에서는: