Reactive Streams #3 - Scheduler

soon world·2022년 1월 24일
0

Reactive

목록 보기
3/3

Scheduler

오늘은 스케쥴러에 대해 알아보겠습니다.

reactive stream 에서의 스케쥴러는 operator 를 이용해 별도의 스레드에서 Pub/Sub 을 동작하게 하는 기능을 말합니다.
(webflux 에서는 PublishOn / SubscribeOn 오퍼레이터가 스케쥴러 기능 담당 합니다.)

1. SubscribeOn

먼저 Operator 를 이용해 Subscribe 를 별도의 스레드에서 동작 케 해보겠습니다.

Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    log.info("request()");
                    sub.onNext(1);
                    sub.onNext(2);
                    sub.onNext(3);
                    sub.onNext(4);
                    sub.onNext(5);
                    sub.onComplete();
                }

                @Override
                public void cancel() {

                }
            });
        };

위와 같은 간단한 Publisher 가 있을 때, Subscribe 하기 전에 Operator 를 중간에 끼워 보겠습니다.

Publisher<Integer> subOnPub = sub -> {
            ExecutorService es = Executors.newSingleThreadExecutor();
            es.execute(()-> pub.subscribe(sub));
        };

#2장에서 봤었던 평범한 Operator 인데 ExecutorService 를 이용해 별도의 스레드가 pub을 subscribe 하도록 하였습니다.

마지막엔 아래처럼 Subscriber 를 구현 하여 Publisher 메시지를 받을 수 있게 됩니다.

subOnPub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.info("onSubscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                log.info("onNext:{}" , integer);
            }

            @Override
            public void onError(Throwable t) {
                log.info("onError:{}", t);
            }

            @Override
            public void onComplete() {
                log.info("onComplete");
            }
        });

이렇게 되면 결국, 오퍼레이터를 통해 별도의 스레드에서 메시지를 subscribe 하게 됩니다.

exit 를 찍은 뒤 main 메서드가 종료되고 별도의 스레드로 pub과 sub 이 진행 되는 모습을 보실 수 있습니다.

SubscribeOn : Publising을 별도의 스레드에서 실행 하게하여 subscribe 하게 해달라!
Publish 가 Subscribe 보다 느릴 경우 사용 합니다.

2. PublishOn

이번엔 반대로 Publish 를 별도의 스레드에서 실행 해 보겠습니다.

위와 마찬가지로 아래 평범한? publisher 가 있습니다.

 Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    log.info("request()");
                    sub.onNext(1);
                    sub.onNext(2);
                    sub.onNext(3);
                    sub.onNext(4);
                    sub.onNext(5);
                    sub.onComplete();
                }

                @Override
                public void cancel() {

                }
            });
        };

오퍼레이터를 구현하여 pub 을 subscribe 합니다. 여기서 1번과 다른 점은
subscribe 한 뒤 다시 publising 하는겁니다. (오퍼레이터니까 얼마든지 가능합니다)
마찬가지로 executorService 를 활용하여 별도의 스레드를 사용 하였습니다.

Publisher<Integer> pubOnPub = sub -> {
            pub.subscribe(new Subscriber<Integer>() {
                //실제 구현체들도 Publish 할 때도 single thread 로 처리 함
                ExecutorService es = Executors.newSingleThreadExecutor();

                @Override
                public void onSubscribe(Subscription s) {
                    sub.onSubscribe(s);
                }

                @Override
                public void onNext(Integer integer) {
                    es.execute(()->sub.onNext(integer));
                }

                @Override
                public void onError(Throwable t) {
                    es.execute(()->sub.onError(t));
                }

                @Override
                public void onComplete() {
                    es.execute(()->sub.onComplete());
                }
            });
        };

마지막으로는 메시지를 구독할 기본 Subscriber 를 만들어 줍니다.
(오퍼레이터를 subscribe 해줘야겠죠?)

pubOnPub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.info("onSubscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                log.info("onNext:{}" , integer);
            }

            @Override
            public void onError(Throwable t) {
                log.info("onError:{}", t);
            }

            @Override
            public void onComplete() {
                log.info("onComplete");
            }
        });

아래 결과는 subscribe 하는 부분만 별도의 스레드를 사용했음을 알 수 있습니다.

PublishOn : Pub 은 너무빠르니 메인에서 돌아가게끔하고, Sub 만 별도의 스레드로 처리하겠음!
Publish 가 Subscribe 보다 빠를 때 사용 합니다.

잠깐!!! PublishOn 의 경우 Publish 가 Subscribe 보다 빠를 때 사용한다고 하는데,
그럼 Publish를 너무 빠르게 해서 Subscribe 할때 순서 보장이 안되는거 아님?

위 주석내용 처럼 실제 Reactor 구현체도 singlethread 로 구현하기 떄문에 순서가 보장됨
executorService 의 경우 대기걸리면 블로킹 큐에 담겨서 기다려 집니다.

3. PublishOn , SubscribeOn 둘 다 사용 가능?

네, 가능합니다. 위 예제들처럼 직접 구현을 한다면 오퍼레이터가 하나 더 추가 하면 됩니다.

이번엔, 실제 reactor 구현체에서 둘 다 쓰는 예제를 보겠습니다.

public static void main(String[] args) {
        Flux.range(1,10)
                .publishOn(Schedulers.newSingle("pub"))
                .log()
                .subscribeOn(Schedulers.newSingle("sub"))
                .subscribe(System.out::println);
        System.out.println("exit");
    }

이런 식으로 둘 다 사용할 수도 있습니다.

Flux 의 경우 was 나 netty 같은 컨테이너를 사용했으면 그 안에서 관리가 되었을텐데 main 함수에서 돌리면 별도의 스레드가 자동으로 꺼지지 않는다 이럴경우엔 pub이 끝나면 강제로 종료시켜줘야 할텐데 어떻게 할 수 있을까?

이 문제를 해결하기 위해 위에서 구현했던 연습이 빛을 발합니다...

방법은 스케쥴러를 변수처리 한 뒤, subscirber 구현체를 구현 해 주면 됩니다.
@OnComplete 될 때 스케쥴러들을 dispose (shutdown 임) 시켜 주면 됩니다!

	Scheduler pub = Schedulers.newSingle("pub");
        Scheduler sub = Schedulers.newSingle("sub");
        Flux.range(1,10)
                .publishOn(pub)
                .log()
                .subscribeOn(sub)
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        log.info("onSubscribe");
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        log.info("onNext :" , integer);
                    }

                    @Override
                    public void onError(Throwable t) {

                    }

                    @Override
                    public void onComplete() {
                        pub.dispose();
                        sub.dispose();
                    }
                });

이상으로 스케쥴러에 대해 알아봤습니다.

profile
Hello SoonWorld

0개의 댓글