HotPublisher & Cold Publisher (4/16)

세젤게으름뱅이·2025년 5월 1일

Spring Webflux

목록 보기
13/16

HotPublisher & Cold Publisher

HotPublisher

  • subscribe가 없더라도 데이터를 생성하고, stream에 push하는 publisher
  • 트위터 게시글 읽기, 공유 리소스 변화 등
  • 여러 subscriber에게 동일한 데이터 전달

ColdPublisher

  • subscribe가 시작되는 순간ㅂ누터 데이터를 생성하고 전송
  • 파일 읽기, 웹 API 요청 등
  • subscriber에 따라 독립적인 데이터 스트림 제공

HotPublisherMain

@Slf4j
public class SimpleHotPublisherMain {

    /**
     * 예상 시나리오
     *
     * subscriber1 = 1~50
     * subscriber2,3 = 50 ~ 100
     * subscriber4 = 110 ~ 150(대략)
     *
     * 비동기이기 때문에, subscribe한 시점부터 main스레드는 5초 대기.
     * 데이터 생성중인 publisher는 !Thread.interrupted() 할 때까지 생성
     * 5초 후에 돌아가던 싱글스레드가 종료되면서 데이터 생성 중단. (sleep 시점에 데이터 생산 중단)
     *
     * subscriber4는 subscriber2,3 cancel후 바로 subscribe되지 않고, 1000ms간 멈추기 때문에 대략 10개의 데이터가 비게 된다. (데이터 생산은 100ms 기준)
     *
     * @param args
     */
    @SneakyThrows
    public static void main(String[] args) {
        // prepare publisher
        var publisher = new SimpleHotPublisher();

        // prepare subscriber1
        var subscriber = new SimpleNamedSubscriber<>("subscriber1");
        publisher.subscribe(subscriber);

        // cancel after 5s
        Thread.sleep(5000);
        subscriber.cancel();

        // prepare subscriber2,3
        var subscriber2 = new SimpleNamedSubscriber<>("subscriber2");
        var subscriber3 = new SimpleNamedSubscriber<>("subscriber3");
        publisher.subscribe(subscriber2);
        publisher.subscribe(subscriber3);

        // cancel after 5s
        Thread.sleep(5000);
        subscriber2.cancel();
        subscriber3.cancel();


        Thread.sleep(1000);

        var subscriber4 = new SimpleNamedSubscriber<>("subscriber4");
        publisher.subscribe(subscriber4);

        // cancel after 5s
        Thread.sleep(5000);
        subscriber4.cancel();

        // shutdown publisher
        publisher.shutdown();
    }
}

HotPublisher

@Slf4j
public class HotPublisher implements Flow.Publisher<Integer>{
    private final ExecutorService publisherExecutor = Executors.newSingleThreadExecutor();      // publisher 초기화시 주기적으로 데이터 생성 목적
    private final Future<Void> task;
    private List<Integer> numbers = new ArrayList<>();
    private List<HotSubscription> subscriptions = new ArrayList<>();


    public HotPublisher(){              // publisher 생성시, 100ms 마다 데이터 무한 생성
        numbers.add(1);
        task = publisherExecutor.submit(() ->{
            for ( int i = 2 ; Thread.interrupted(); i++){
                numbers.add(i);         // 값 추가기 subsriber들에게 전파 필요
                log.info("numbers: {}", numbers);
                subscriptions.forEach(HotSubscription::wakeup);     //subscriptions들 브로드캐스팅하며, 추가된 값에 대해서는 전달?, 100개의 subscrbier가 있다면, 100개의 요청을 보낼것
                Thread.sleep(100);
            }
            return null;
        });
    }

    /**
     * 스레드 작업 종료
     */
    public void shutdown(){
        this.task.cancel(true);         // future cancel
        publisherExecutor.shutdown();
    }


    /**
     * 데이터 구독 (subscribe)
     * 
     * subscriber의 정보로 subscription 생성
     *
     * onSubscribe - 데이터를 통지할 준비가 되었음을 알림
     * @param subscriber the subscriber
     */
    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        HotSubscription hotSubscription = new HotSubscription(subscriber);
        subscriber.onSubscribe(hotSubscription);
        subscriptions.add(hotSubscription);
    }


    /**
     * Subscription 클래스
     * numbers가 계속 쌓이고 있고, 타 subscriber가 subscribe한다면 현시점에 대한 offset 관리가 필요
     *
     * offset - 마지막 데이터
     * requiredOffset - 필요한 데이터수
     */
    private  class HotSubscription implements Flow.Subscription{
        private int offset;
        private int requiredOffset;
        private final Flow.Subscriber<? super Integer> subscriber;
        private final ExecutorService subscriptionExecutorService = Executors.newSingleThreadExecutor();

        public HotSubscription(Flow.Subscriber<? super Integer> subscriber){
            int lastIndex = numbers.size()-1;
            this.offset = lastIndex;
            this.requiredOffset = lastIndex;
            this.subscriber = subscriber;
        }


        /**
         * n = 요청수, 즉 backPressure
         *
         * @param n the increment of demand; a value of {@code
         * Long.MAX_VALUE} may be considered as effectively unbounded
         */
        @Override
        public void request(long n) {
            requiredOffset += n;        // 기존 offset + n(요청수)  --> [offset ~ requiredOffset] 데이터 구간 변수

            onNextWhilePossible();
        }
        /**
         * 값이 생겼으니, subscriber에게 값을 전달할 수 있으면 전달
         */
        public void wakeup(){
            onNextWhilePossible();
        }
        private void onNextWhilePossible(){
            subscriptionExecutorService.submit(() ->{
                while(offset < requiredOffset && offset < numbers.size()){               // offset은 requiredOffset까지의 데이터 구간도 넘어서 안되고, numbers 사이즈를 넘쳐도 안됨
                    int item = numbers.get(offset);
                    subscriber.onNext(item);          // subscriber에 데이터를 전달
                    offset++;                         // 다음 offset 포인트 조정
                }
            });
        }

        /**
         * 더이상 데이터를 안 받으니, subscriptions에서 remove & 스레드 shutdown
         * subscriptions에는 HotSubscription 인스턴스들 존재
         */
        @Override
        public void cancel() {
            this.subscriber.onComplete();
            if(subscriptions.contains(this)){
                subscriptions.remove(this);
            }
            subscriptionExecutorService.shutdown();
        }
    }
}

profile
🤦🏻‍♂️

0개의 댓글