[Android] RxJava (3) - Cold Observable, Hot Observable + Example

Oxong·2021년 7월 13일
0

21.07.13
공부한 것을 정리하는 용도의 글이므로 100% 정확하지 않을 수 있습니다.
참고용으로만 봐주시고, 내용이 부족하다고 느끼신다면 다른 글도 보시는 것이 좋습니다.
+ 틀린 부분, 수정해야 할 부분은 언제든지 피드백 주세요. 😊
                                            by. ryalya



들어가기 전

지난 글인 [Android]RxJava (2) - Observable, Scheduler 에서 Observable, Scheduler, subscribeOn과 ObserveOn 연산자 등을 정리했다.

개인적으로 한 번에 정리해서 보고 싶었는데 양이 적은 편이 아니라서 나중에 찾아보기 쉽게 카테고리를 좀 나누는 것이 낫겠다는 생각도 들었다.

오늘은 Hot Observable & Cold Observable 차이점과 사용법 등에 대해 정리해보려 한다.

개념으로 들어가기 전에 RxJava Cold Observable, Hot Observable에서 Hot Observable과 Cold Observable을 Youtube에 비유하여 설명해준 부분이 어떤 느낌인지 이해하기 쉬워서 가져와봤다.

Cold Observable :
Youtube 동영상을 클릭하면 재생되는 것처럼,
구독을 요청(동영상 클릭)하면 아이템을 발행(영상이 시작)한다.

Hot Observable :
라이브 방송을 시청하는 것처럼, 아이템 발행이 시작된 이후로 모든 구독자에게 동시 같은 아이템을 발행한다.




Cold Observable


Observable에는 Cold Observable과 Hot Observable이 있다.

Cold와 Hot의 차이점은 Observer의 존재와 구독여부이다.

지난 글에서 Observable을 설명할 때, Observer가 Subscribe(구독)해야 데이터를 발행한다고 했다.

여기에서 일반적으로 우리가 사용하는 Observable이 Cold Observable임을 알 수 있다.

왜냐하면 Cold Observable Observable을 생성하고 Observer가 subscribe를 호출할 때까지 데이터 발행을 하지 않기 때문이다.

하지만 Observable에 subscribe를 요청하면 아이템이 처음부터 끝까지 발행되고, 임의로 종료시키지 않는 이상 여러 번의 요청에도처음부터 끝까지 발행하는 것을 보장한다.

Cold Observable의 특징을 정리하면 다음과 같다.

  • 일반적인 옵저버형태.
  • 구독하지 않으면 데이터를 발행하지 않는다.
  • 원하는 시점에 데이터를 요청하고 처음부터 끝까지 결과를 받아온다.
  • ex) 데이터베이스 쿼리, 파일 읽기, API 요청

<Example>
   String[] strings = new String[]{"스테이크","타코","오마카세"};
   
        Flowable<String> source  = Flowable.fromArray(strings)
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single());

        // # 1번째 구독자
        source.subscribe(next -> {
            System.out.println(Thread.currentThread().getName() + " Subscriber # 1: " + next);
//            Log.e("테스트","1번쨰 구독자" +Thread.currentThread().getName() + " Subscriber # 1: " + next);
        });

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // # 2번째 구독자
        source.subscribe(next -> {
            System.out.println(Thread.currentThread().getName() + " Subscriber # 2: " + next);
//            Log.e("테스트","2번쨰 구독자" +Thread.currentThread().getName() + " Subscriber # 2: " + next);
        });

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

결과값

fromArray()로 배열을 받아 Flowable을 생성하면, Flowable은 데이터 흐름에 따라 onNext(), onComplete(), onError()이벤트를 발생시킨다.

여기서 1번째 구독자와 2번째 구독자는 Thread.sleep()함수를 이용하여 시간 차를 두어 구독을 시켰다.

Cold Observable은 구독이 늦건 빠르건 상관없이 모든 데이터를 받아오는 것을 확인할 수 있다.



Hot Observable


Hot Observable은 observer의 존재 여부와 관계없이 데이터를 발행한다.

데이터는 발행이 시작된 이후, 모든 구독자들에게 동시에 같은 아이템을 발행한다.

따라서 늦게 구독한 observer는 구독 이전에 발행한 아이템(데이터)를 받지 못하므로 Observable이 발행하는 데이터 전부를 받는 것을 보장하지 못한다

Hot Observable의 특징을 정리하면 다음과 같다.

  • observer존재 여부와 상관없이 데이터를 발행하는 Observable 이다.
  • 그래서 여러 구독자에 선택적으로 고려가능하다.
  • 구독시점으로부터 발행하는 값을 받는 걸 기본으로 한다.
  • 마우스 이벤트, 키보드 이벤트, 시스템 이벤트 등 UI이벤트에 주로 사용된다. +멀티캐스팅도 포함된다.


Observable 변환

위에서 Cold Observable은 Test해봤으면서 Hot Observable은 왜 Test하지 않았는지 궁금할 수 있다.

기본 Class들은 모두 Cold Observable이므로 우리는 Hot Observable로 변환하여 사용해야 한다.

변환하는 방법에는 SubjectConnectableObservable 2가지가 있다.


1. Subject

Subject Class에는 AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject등이 있다.

AsyncSubjct

AsyncSubjct Class에서는 onComplete 호출 이전에 발행한 마지막 Data만 받아올 수 있다.
완료되기 전 마지막 데이터만 남고 이전 데이터들은 무시된다.

+) 즉, onComplete 호출이 되면 구독자의 dispose() 함수가 호출되면서 연결이 끊긴다.


BehaviorSubject

BehaviorSubject Class에서는 구독자가 구독을 하면 가장 최근 값 혹은 기본값부터 받아온다.
그래서 기본값을 처음에 설정해 줄 수 있다.

아래 예시에서는 createDefault를 통해 기본값을 설정한 것을 확인할 수 있다.

BehaviorSubject<String> subject = 
   BehaviorSubject.createDefault("라면");
   subject.subscribe(data -> 
   
   Log.e("테스트", "1번째 구독자" +  Thread.currentThread().getName() + " Subscriber # 1 = > " + data));
 
   subject.onNext("쫄면");
   subject.onNext("떡볶이");
   subject.subscribe(data -> 
     Log.e("테스트", "2번째 구독자" +  Thread.currentThread().getName() + " Subscriber # 2 = > " + data));
 
   subject.onNext("김치만두");
   subject.onComplete();

<결과값>

결과를 확인하면 이해가 쉬운데,
1번째 구독자는 createDefault로 설정한 기본값인 "라면"부터 받아온다.

2번째 구독자는 구독하는 시점에서 가장 최근 발행된 값부터 받아오기 때문에 "떡볶이"부터 받아왔다.


PublishSubject

PublishSubject Class에서는 구독자가 subscribe() 함수를 호출하면(구독한 시점부터) 값을 발행하기 시작한다.

위의 Subject들과 다르게 옵션이 없고 가장 평범하게 동작하며, 구독한 순간(이후)부터의 데이터를 받을 수 있다.

PublishSubject<String> subject = PublishSubject.create();

subject.onNext("무뚝뚝감자칩");
subject.subscribe(data ->
       Log.e("테스트", "1번째 구독자" + Thread.currentThread().getName() + " Subscriber # 1 = > " + data));

subject.onNext("허니버터칩");
subject.onNext("꼬북칩 초코");

subject.subscribe(data ->
        Log.e("테스트", "1번째 구독자" +Thread.currentThread().getName() + " Subscriber # 2 = > " + data));

subject.onNext("쿠크다스");
subject.onComplete();

<결과값>

결과를 확인하면 구독 시점 이후의 데이터만 받아올 수 있기 때문에 1번째 구독자는 무뚝뚝 감자칩을 받아올 수 없고, 허니버터칩부터 받아왔다.

2번째 구독자 역시 구독 시점 이후에 발행된 데이터인 쿠크다스를 받은 것을 알 수 있다.


ReplaySubject

ReplaySubject Class는 Cold Observable과 비슷하다.

구독자가 생기면 구독 시점과 상관없이 항상 데이터의 처음부터 끝까지 발행한다.(보장한다) (이름 그대로 Replay 해준다고 생각하면 이해하기 쉽다.)

그러나 모든 데이터를 저장하는 만큼 메모리 누수(out of Memory, leak of Memory)가 발생할 가능성이 있기 때문에 주의해야한다. (Backpressure 필요)

ReplaySubject<String> subject = ReplaySubject.create();

subject.onNext("여우");

subject.subscribe(data ->
        Log.e("테스트", "1번째 구독자" + Thread.currentThread().getName() + " Subscriber # 1 = > " + data));

subject.onNext("고양이");
subject.onNext("강아지");

subject.subscribe(data ->
        Log.e("테스트", "2번째 구독자" + Thread.currentThread().getName() + " Subscriber # 2 = > " + data));

subject.onNext("토끼");
subject.onComplete();

<결과값>



2. ConnectableObservable

ConnectableObservable은 구독을 요청해도 바로 데이터를 발행하지 않는다.

그런데, ConnectableObservable은 subscribe()를 사용하지 않는다.

대신 publish()connect()를 사용하는데, publish()는 connect()가 호출되기 전까지 데이터 발행을 유예한다.

publsh()를 통해 Observable을 Hot Observable로 변환하지만, 여기까지만 해서는 데이터를 발행하지 않고
connect()연산자를 호출할 때에 비로소 아이템을 발행하기 시작한다.

ConnectableObservable src =
        Observable.interval(1, TimeUnit.SECONDS)
                .publish();
src.connect();


src.subscribe(value -> Log.e("테스트","1번째 구독자: " + value));
try {
    Thread.sleep(3000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

src.subscribe(value -> Log.e("테스트","2번째 구독자: " + value));
try {
    Thread.sleep(3000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

<결과값>

1번째 구독 이후에 Thread.sleep()함수를 3초로 지정하여 3초 이후에 2번째 구독을 하게 하였다.

1번째 구독자가 구독하여 3초 동안 발행한 아이템인 0,1,2를 받아온 후, 2번째 구독자가 구독하여 3부터 받아오는 것을 알 수 있다.


❗❓ autoConnect

하지만 구독 시점과 상관없이 모든 아이템을 받아오고 싶다면?

autoConnect연산자를 사용하면 connect()를 호출하지 않더라도 구독 즉시 아이템을 발행할 수 있도록 해준다.

autoConnect의 매개변수는 아이템을 발행하는 구독자의 수이다.
만약 autoConnect(2)라고 한다면 구독자가 2개 이상 붙어야 아이템을 발행하기 시작한다.

Observable<Long> src =
        Observable.interval(1, TimeUnit.SECONDS)
                .publish()
                .autoConnect(2);

src.subscribe(value -> Log.e("테스트", "1번째 구독자: " + value));

src.subscribe(value -> Log.e("테스트", "2번째 구독자: " + value));
try {
    Thread.sleep(3000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

<결과값>

만약 src.subscribe()를 한 번만 호출했다면 아이템을 발행하지 않아 코드는 실행되지 않는다.(아이템을 받아오지 않음)

하지만 autoConnect()의 파라미터에 0 이하의 수를 입력하면 구독자 수와 관계 없이 곧바로 아이템을 발행한다.




Reference

RxJava Cold Observable, Hot Observable

RxJava 이해하기 - 4. Hot Observable, Cold Observable

0개의 댓글