Flow API의 Publisher, Subscriber 연동 (4/15)

세젤게으름뱅이·2025년 5월 1일

Spring Webflux

목록 보기
12/16

Reactive streams 구조

  • 데이터 혹은 이벤트를 제공하는 Publisher
  • 데이터 혹은 이벤트를 제공받는 Subscriber
  • 데이터 흐름을 조절하는 Subscription

Publisher

  • subscribe 함수를 제공.
  • publisher에 다수의 subscriber 등록을 지원
  • subscription을 포함하고, Subscriber가 추가되면 subscription 제공
@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
} 

Subscriber

  • subscriber하는 시점에 publisher로부터 subscription을 받을 수 있는 인자 제공 (onSubscribe)
  • onNext, onError, onComplete를 통해서 값이나 이벤트를 받을 수 있음
  • onNext는 여러번, onError나 onComplete는 딱 한번만 호출된다.
public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}

Subscription

  • back-pressure를 조절할 수 있는 request 함수
  • Publisher가 onNext를 통해서 값을 전달하는 것을 취소할 수 있는 cancel 함수
public static interface Subscription { 
	public void request(long n); 
	public void cancel(); 
}

Publiser, Subscriber 연동

FixedIntPublisher

  • Flow.Publisher를 구현
  • 고정된 숫자의 integer를 전달하는 publisher
  • 8개의 integer를 전달 후 complete
  • iterator를 생성해서 subsciption을 생성하고, subscriber에게 전달
  • requestCount를 세기 위해서 Result 객체 사용
public class FixedIntPublisher implements Flow.Publisher<FixedIntPublisher.Result> {
    @Data
    public static class Result {
        private final Integer value;
        private final Integer requestCount;
    }
    /**
    1. subscriber를 가지고 publisher.subscribe를 호출
    2. subscription 생성
    3. subscriber의 onSubscriber를 통해 생성한 subscription 전달
    */
    @Override
    public void subscribe(Flow.Subscriber<? super Result> subscriber) {
        var numbers = Collections.synchronizedList(
                new ArrayList<>(List.of(1, 2, 3, 4, 5, 6, 7))
        );
        Iterator<Integer> iterator = numbers.iterator();
        var subscription = new IntSubscription(subscriber, iterator);  // subscription 생성
        subscriber.onSubscribe(subscription); 		// 구독자에게 subscription 제공
    }

IntSubscription

  • Flow.Subscription을 구현
  • subscriber의 onNext와 subscription의 request가 동기적으로 동작하면 안 되기 때문에, executor를 이용해서 별도의 thread에서 실행
  • 요청 횟수를 count에 저장하고 결과에 함께 전달
  • 더이상 iterator 값이 없으면, onComplete 호출
private final Flow.Subscriber<? super Result> subscriber;
private final Iterator<Integer> numbers;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final AtomicInteger count = new AtomicInteger(1);			// lock없이 증가
private final AtomicBoolean isCompleted = new AtomicBoolean(false);
/**
	1. 발행정보는 값 + 요청횟수
    2. iterator의 값이 있다면, onNext()를 통해 Result 객체에 담긴 정보로 발행 요청
    3. count 증가
    4. 값이 더이상 없으면 onComplete() 호출
*/
@Override
public void request(long n) {
    executor.submit(() -> {
        for (int i = 0; i < n; i++) {
            if (numbers.hasNext()) {
                int number = numbers.next();
                numbers.remove();
                subscriber.onNext(new Result(number, count.get()));		// 발행 신청
            } else {
                var isChanged = isCompleted.compareAndSet(false, true);
                if (isChanged) {
                    executor.shutdown();
                    subscriber.onComplete();
                    isCompleted.set(true);
                }
                break;
            }
        }
        count.incrementAndGet();			 // count 증가
    });
}
@Override
public void cancel() {
    subscriber.onComplete();			
}

RequestSubscriber

  • Flow.Subscribe를 구현
  • 최초 연결시 1개를 고정적으료 요청 (onSubscribe 구현 확인)
  • onNext에서 count를 세고, n번째 onNext마다 request
  • onNext, onComplete, onError를 받으면 로그
public class RequestNSubscriber<T> implements Flow.Subscriber<T>{
    private final Integer n;
    private Flow.Subscription subscription;
    private int count = 0;
    /**
    	1. 최초 연결시,  onSubscribe에서 1개를 고정적으로 요청
        2. reqeust()에서 처리 후, onNext()에 발행요청
        3. onNext()에서 발행완료
        4. 조건 비교 후 다시 request(n) 요청
    */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);				// 최초 연결시 1개 요청
    }
    @Override
    public void onNext(T item) {
        log.info("item: {}", item);					// 발행완료 간주
        if (count++ % n == 0) {						// 배수 확인 후 발행 요청
            log.info("send request");
            this.subscription.request(n);
        }
    }
    @Override
    public void onError(Throwable throwable) {
        log.error("error: {}", throwable.getMessage());
    }
    @Override
    public void onComplete() {
        log.info("complete");
    }
} 

Publisher, Suscriber 연동

  • Publisher와 Subscriber를 생성하고, publisher에 subscriber를 subscribe()
public static void main(String[] args) { 
    Flow.Publisher publisher = new FixedIntPublisher(); 
    Flow.Subscriber subscriber = new RequestNSubscriber<>(1); 
    publisher.subscribe(subscriber); 
    Thread.sleep(100); 
} 

n이 3인 경우

  • 1개씩 처리한 후 3개를 요청
  • 3개 처리하고 다시 3개 요청을 complete까지 반복
  • requestCount를 확인

n이 Inter.MAX_VALUE인 경우

  • 1개씩 처리하고, MAX_VALUE개 요청을 complete할 때까지 반복
  • 현재는 값이 8개까지임
profile
🤦🏻‍♂️

0개의 댓글