Reactive Streams #2 - Operator

soon world·2022년 1월 2일
0

Reactive

목록 보기
2/3

Operator

Publisher 와 Subscriber 사이에는 JAVA 의 stream 처럼 Operator 를 둘 수가 있는데요.
Operator 는 Pub /Sub 간 데이터 흐름에서 데이터를 중간에 가공할 수 있도록 해주는 역할을 하고 있습니다. JAVA 의 stream 과 같은 개념 입니다.

  • 참고
    downStream : Pub -> Sub 으로 데이터가 흘러가는 형태 (위->아래)
    upStream : Sub -> PUB 으로 데이터가 흘러가는 형태 (아래 -> 위)

예제를 보면 이해가 훨씬 쉽기에 토비님 유뷰브에 있는 예제를 살짝 가공 하여 가져왔습니다.

    @DisplayName("지난 챕터 내용 + 확장")
    @Test
    void operatorTest() {
        Publisher<Integer> pub = iterPub(Stream.iterate(1,a->a+1).limit(10).collect(Collectors.toList()));
        Publisher<Integer> mapPub = mapPub(pub, s->s*10);  //이러한 걸 Operator 라고 한다.
        Publisher<Integer> map2Pub = mapPub(mapPub, s->-s); //이러한 걸 Operator 라고 한다.
        map2Pub.subscribe(logSub());
    }
    
private Publisher<Integer> iterPub(Iterable<Integer> iter) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            iter.forEach(sub::onNext);
                            sub.onComplete();
                        }catch(Throwable t) {
                            sub.onError(t);
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                });
            }
        };
    }
private <T> Subscriber<T> logSub() {
        return new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.info("onSubscribe:");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T integer) {
                log.info("onNext:{}",integer);
            }

            @Override
            public void onError(Throwable t) {
                log.info("onError:{}",t);
            }

            @Override
            public void onComplete() {
                log.info("onComplete");
            }
        };
    }

위 테스트코드인 operatorTest() 내용을 보시면 mapPub 이라는 메서드 안에서 최초 선언한 Publisher 인 pub 와 function 인터페이스 구현체를 받아 Publisher 로 다시 리턴 하는 모습을 볼 수 있는데요.
그 다음 바로 map2Pub 메서드 안에서 다시 mapPub 과 function 인터페이스 구현체를 받아 다시 Publisher로 리턴하는 모습을 볼 수 있습니다.
이렇듯 Publisher 와 Subscriber 사이에는 Operator 가 여러 개 생길 수가 있습니다.
그리고 마지막 Publisher 에서 subscribe() 메서드를 통해 logSub 이라는 Subscriber 를 통해 Publisher 를 Subscribe 하는 형태 입니다.

전체 해석을 해본다면, IterPub() 를 통해 Publisher 를 생성 한 뒤
[ logPub() 내용을 보면 s.request(Long.MAX_VALUE);
라는 구문을 통해 Subscriber 는 Publisher 에게 가지고 있는 데이터를 그냥 몽땅 다 달라는 요청을 하는 것이며,
Publiser 는 iter.forEach(sub::onNext); 구문을 통해 iterator 를 모두 순회 한 뒤
sub.onComplete() 메서드를 통해 pub/sub 을 완료 처리 합니다.
]
mapPub operator 와 map2Pub operator 2개를 거친 뒤 subscriber 에게 데이터가 전달 되
는 형태 입니다.

결국 자바 스트림에 있는 Operator와 마찬가지로 Publisher 와 Subscriber 간 Operator 도 이와 같은 형태의 모습을 취하고 있다고 볼 수 있겠습니다.

Operator 개념 자체는 자바 스트림을 알고 있다면 쉽게 이해할 수 있는 내용입니다. 하지만 위 예제를 통해 Publisher 와 Subscriber 간 Operator 가 어떻게 작동 하는지에 대한 내용을 알 수 있기에 내용을 기록 해 봅니다.

profile
Hello SoonWorld

0개의 댓글