Reactive streams 구조

- 데이터 혹은 이벤트를 제공하는 Publisher
- 데이터 혹은 이벤트를 제공받는 Subscriber
- 데이터 흐름을 조절하는 Subscription
Publisher
- subscribe 함수를 제공.
- publisher에 다수의 subscriber 등록을 지원
- subscription을 포함하고, Subscriber가 추가되면 subscription 제공
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
Subscriber
- subscriber하는 시점에 publisher로부터 subscription을 받을 수 있는 인자 제공 (onSubscribe)
- onNext, onError, onComplete를 통해서 값이나 이벤트를 받을 수 있음
- onNext는 여러번, onError나 onComplete는 딱 한번만 호출된다.
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
Subscription
- back-pressure를 조절할 수 있는 request 함수
- Publisher가 onNext를 통해서 값을 전달하는 것을 취소할 수 있는 cancel 함수
public static interface Subscription {
public void request(long n);
public void cancel();
}
Publiser, Subscriber 연동
FixedIntPublisher
- Flow.Publisher를 구현
- 고정된 숫자의 integer를 전달하는 publisher
- 8개의 integer를 전달 후 complete
- iterator를 생성해서 subsciption을 생성하고, subscriber에게 전달
- requestCount를 세기 위해서 Result 객체 사용
public class FixedIntPublisher implements Flow.Publisher<FixedIntPublisher.Result> {
@Data
public static class Result {
private final Integer value;
private final Integer requestCount;
}
@Override
public void subscribe(Flow.Subscriber<? super Result> subscriber) {
var numbers = Collections.synchronizedList(
new ArrayList<>(List.of(1, 2, 3, 4, 5, 6, 7))
);
Iterator<Integer> iterator = numbers.iterator();
var subscription = new IntSubscription(subscriber, iterator);
subscriber.onSubscribe(subscription);
}
IntSubscription
- Flow.Subscription을 구현
- subscriber의 onNext와 subscription의 request가 동기적으로 동작하면 안 되기 때문에, executor를 이용해서 별도의 thread에서 실행
- 요청 횟수를 count에 저장하고 결과에 함께 전달
- 더이상 iterator 값이 없으면, onComplete 호출
private final Flow.Subscriber<? super Result> subscriber;
private final Iterator<Integer> numbers;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final AtomicInteger count = new AtomicInteger(1);
private final AtomicBoolean isCompleted = new AtomicBoolean(false);
@Override
public void request(long n) {
executor.submit(() -> {
for (int i = 0; i < n; i++) {
if (numbers.hasNext()) {
int number = numbers.next();
numbers.remove();
subscriber.onNext(new Result(number, count.get()));
} else {
var isChanged = isCompleted.compareAndSet(false, true);
if (isChanged) {
executor.shutdown();
subscriber.onComplete();
isCompleted.set(true);
}
break;
}
}
count.incrementAndGet();
});
}
@Override
public void cancel() {
subscriber.onComplete();
}
RequestSubscriber
- Flow.Subscribe를 구현
- 최초 연결시 1개를 고정적으료 요청 (onSubscribe 구현 확인)
- onNext에서 count를 세고, n번째 onNext마다 request
- onNext, onComplete, onError를 받으면 로그
public class RequestNSubscriber<T> implements Flow.Subscriber<T>{
private final Integer n;
private Flow.Subscription subscription;
private int count = 0;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(T item) {
log.info("item: {}", item);
if (count++ % n == 0) {
log.info("send request");
this.subscription.request(n);
}
}
@Override
public void onError(Throwable throwable) {
log.error("error: {}", throwable.getMessage());
}
@Override
public void onComplete() {
log.info("complete");
}
}
Publisher, Suscriber 연동
- Publisher와 Subscriber를 생성하고, publisher에 subscriber를 subscribe()
public static void main(String[] args) {
Flow.Publisher publisher = new FixedIntPublisher();
Flow.Subscriber subscriber = new RequestNSubscriber<>(1);
publisher.subscribe(subscriber);
Thread.sleep(100);
}
n이 3인 경우
- 1개씩 처리한 후 3개를 요청
- 3개 처리하고 다시 3개 요청을 complete까지 반복

- requestCount를 확인
n이 Inter.MAX_VALUE인 경우
- 1개씩 처리하고, MAX_VALUE개 요청을 complete할 때까지 반복
- 현재는 값이 8개까지임
