[Reactive Java #2] Pub - Sub 이용해보기

YoungHo-Cha·2022년 10월 2일
1

Web Flux

목록 보기
3/6
post-thumbnail

오늘은 Java에서 제공해주는 concurrent.Flow 에서 pub sub 모델을 사용해볼 예정이다.

해당 라이브러리는 이전에 다루었던 옵저버 패턴의 상향된 버전이라고 생각하면 된다.

목차

  • 선수지식
  • V1(pub-sub으로 로직구현)
  • V2(pub-sub에 프록시 느낌 추가하기)

선수지식

먼저 알아야할 내용이 있다.

Pub-Sub 모델에선 다음의 개념이 존재한다.

  • Publisher
  • Subscriber
  • Subscription

옵저버 패턴과 매우 흡사하다.

해당 내용에는 "구독"이라는 개념이 존재한다.

먼저 이해하기 쉽게 순서를 살펴보자.

  1. publisher가 존재한다.

  2. publisher는 subscription이라는 정보를 가지고 있다.

  3. subscriber는 publisher를 구독한다.

  4. 일련의 스트림이 존재하면 publisher는 subscriber에게 스트림의 내용을 전달한다.

  5. subscriber는 publisher가 전해주는 스트림들을 처리한다.

잘 이해가 안될 것이다. 실생활로 비교해서 다시 생각해보자.

유튜브에서 채널은 publisher이다.
사용자는 subscriber이다.
채널이 게시한 동영상들은 스트림 내용이다.

  1. 유튜브에서 채널을 생성한다.

  2. 사용자들은 채널을 구독한다.

  3. 채널에서 동영상을 게시하면 구독한 모든 사용자에게 일련의 동영상을 제공한다.

  4. 사용자들은 일련의 동영상을 보거나 안보거나 혹은 다운받거나 다운받지않거나 사용자나름 하고싶은 동작을 수행한다.

Publisher

위에서 언급한 유튜브 채널이라고 생각하면 된다.

일련의 스트림(데이터, 이벤트)들을 구독자에게 전달한다.

Publisher는 Subscription이라는 구독 정보를 가지고 있다.

구현 내용

자바에서 제공하는 Publisher는 "subscribe"라는 메소드를 구현해야 한다.

해당 메소드는 구현이 들어왔을 때 어떻게 동작시킬지 정의해야하는 메소드이다.

Subscriber

위에서 언급한 구독자라고 생각하면 된다.

구독자는 Publisher가 제공해주는 일련의 스트림(데이터, 이벤트)들을 구독자 나름 자유롭게 사용한다.

Subscriber를 구현하기 위해서는 다음의 4가지 메소드를 오버라이딩 해야한다.

  1. onSubscribe : 구독을 수행하는 메서드이다. 이 부분에서는 구독을 수행할 지, 수행하지 않을 지를 판단하는 메서드이다.

  2. onNext : 일련의 스트림을 처리하는 부분이다.

  3. onError : 일련의 스트림을 처리하다가 예외 혹은 에러가 나타났을 때 처리하는 부분이다.

  4. onComplete : 모든 스트림을 처리했을 때, 수행되는 메서드이다.

Subscription

Publisher가 가지고 있는 정보이다. 일련의 스트림이 쌓였을 때, 구독자들에게 어떻게 전달할 지를 정하는 정보이다.

Subscription을 구현하기 위해서는 다음의 메서드를 구현해야한다.

  1. request : 스트림을 구독자에게 어떤식으로 전달할 지 결정하는 곳이다.

  2. cancel : 구독 정보를 끊을 때 동작시키는 메서드이다.

V1

이전에 다루었던 1부터 10까지 출력하는 내용을 작성할 계획이다.

public class PubSub {
    public static void main(String[] args) {
        Iterable<Integer> iter = Stream.iterate(1, v-> v+1).limit(10).collect(Collectors.toList());

        Publisher<Integer> pub = new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            iter.forEach(v -> sub.onNext(v));
                            sub.onComplete();
                        }
                        catch (Throwable t){
                            sub.onError(t);
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                });
            }
        };

        Subscriber<Integer> sub = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("onSubscribe() 실행");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("onNext() 실행 ---" +" value = " + item);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {
                System.out.println("onComplete() 실행");
            }
        };

        pub.subscribe(sub);
    }
}
  1. Publisher를 생성한다.

  2. Publisher를 구현하기 위해서는 "subscribe"를 구현해야한다. 이 때 "Subscription"을 정의한다. 위 코드에서는 1~10까지 제공하는 내용이다.

여기서는 간편하게 Subscription을 동작시키기 위해서 내부 구현으로 수행했다.

  1. Subscriber를 구현한다.

  2. publisher에 구독자를 등록한다.


V2

이번엔 위의 코드를 프록시 패턴처럼 적용해볼 예정이다.

원하는 상황

구현하려는 상황은 다음과 같다.

  1. 1 ~ 10까지 스트림을 보낸다.

  2. 1 ~ 10까지 스트림을 log로 찍는 구독자를 실행한다.

  3. log 구독자 다음은 각 스트림에 + 10을 해주는 구독자를 실행한다.

코드를 보자.

public class PubSubV2 {

    public static void main(String[] args) {

        // publisher 생성
        Publisher pub = new Publisher();

        // 계산 Sub 생성
        CalcSubscriber calcSubscriber = new CalcSubscriber();

        // Log Sub 생성
        LogSubscriber logSubscriber = new LogSubscriber(calcSubscriber);

        // publish -> log sub -> calc sub
        pub.subscribe(logSubscriber);
    }

    public static class Publisher implements Flow.Publisher<Integer> {
        Iterable<Integer> iter = Stream.iterate(1, v -> v + 1).limit(10).collect(Collectors.toList());

        @Override
        public void subscribe(Subscriber<? super Integer> sub) {
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    try {
                        iter.forEach(v -> sub.onNext(v));
                        sub.onComplete();
                    } catch (Throwable t) {
                        sub.onError(t);
                    }
                }

                @Override
                public void cancel() {

                }
            });
        }
    }

        public static class LogSubscriber implements Subscriber<Integer> {
            private final CalcSubscriber calcSubscriber;

            public LogSubscriber(CalcSubscriber calcSubscriber) {
                this.calcSubscriber = calcSubscriber;
            }

            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("LogSubscribe onNext() 실행 --- value = " + item);
                calcSubscriber.onNext(item);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {

            }
        }

        public static class CalcSubscriber implements Subscriber<Integer> {

            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("Calc Sub OnNext() 실행  ---- value = " + (item + 10));
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {

            }
        }
    }

약간의 리팩토링을 수행했다.

결과 창을 보면 다음과 같다.

완료.

조금 난해한 개념이라서 그런지, 글로 표현하기가 힘들다..

참고

profile
관심많은 영호입니다. 궁금한 거 있으시면 다음 익명 카톡으로 말씀해주시면 가능한 도와드리겠습니다! https://open.kakao.com/o/sE6T84kf

1개의 댓글

comment-user-thumbnail
2024년 4월 11일

pub sub 패턴에는 중간자 브로커가 있어서 Publisher와 Subscriber는 서로 존재를 알 필요 없는데,
위 내용은 옵저버 패턴에 가까운 것 같습니다

답글 달기