[Android] RxJava (4) - Flowable, Single, Maybe, Completable

Oxong·2021년 7월 23일
0

21.07.22
공부한 것을 정리하는 용도의 글이므로 100% 정확하지 않을 수 있습니다.
참고용으로만 봐주시고, 내용이 부족하다고 느끼신다면 다른 글도 보시는 것이 좋습니다.
+ 틀린 부분, 수정해야 할 부분은 언제든지 피드백 주세요. 😊
                                            by. ryalya




들어가기 전

Observable

  • rx의 기본적인 단위.
  • 데이터의 변화가 발생하는 데이터 소스로서, 데이터 흐름에 맞게 Consumer에게 알림을 보내는 class.

Observable은 지속적으로 발생하는 이벤트뿐만 아니라 비동기적인 네트워크 요청, 리스트의 스트림 처리 등 다양한 상황에 활용할 수 있다.

하지만 다양한 스트림을 처리하기에 Observable하나만으로는 부족하다는 의견이 있어 Rx2에서는 다양한 데이터 스트림에 따라 사용할 수 있는 Observable들이 늘어나게 되었다.


오늘은 RxJava2부터 추가된 Observable인 Flowable, Single, Maybe, Completable을 이해해보려 한다.

(※ Flowable Class는 RxJava (3) - Cold Observable, Hot Observable + Example에서 예제를 Test할 때 이미 사용했지만 Class 자체에 대한 설명은 하지 않았으므로 함께 다뤄보겠다.)



Flowable

RxJava에는 Backpressure라는 개념과 이를 처리하는 Flowable class가 존재한다.

먼저 Backpressure에 대해 간단하게 이해하자.

배압(BackPressure)이란?

배압이란 데이터 생산과 소비가 불균형적일 때 일어나는 현상이다.

만약 데이터를 1초마다 발행하는데 소비는 10초마다 한다면 소비와 관계없이 데이터는 스트림에 계속 쌓이게 된다.
(=Observable이 데이터를 발행하는 속도를 Observer의 소비 속도가 따라가지 못하고 있음.)

이 상태가 지속되면 결국 메모리가 overflow되고 OutOfMemoryError로 이어져 앱이 터질 것이다.

이러한 현상을 배압(Backpressure)이라고 하며 RxJava에서는 배압 현상을 제어할 수 있는 방법을 제공한다.


Flowable은

연속적으로 데이터가 흐르는 스트림을 생성한다.

한번 구독하면, 데이터가 흐름에 따라 화면을 갱신시키는 코드에 사용될 수 있다.

RxJava 공식사이트에서는 통신, DB, 대량의 데이터 처리에는 Flowable을 사용하는 것을 권장한다.

어떤 다수의 데이터를 처리하기 위한 비동기적 작업을 진행하려고 할 때, 작업양과 제한된 실행 환경에서 발생이 예상되는 예외 사항을 처리 하기 위해 사용하는 것으로 정리해 볼 수 있다.

배압 전략

Flowable에도 배압을 제어하지 못해 MissingBackpressureException이 발생할 수 있는 예외상황1이 존재한다. 따라서 Flowable에 배압 전략을 명시함으로써 배압을 제어할 수 있다. 5가지의 배압 전략이 존재하며 각각의 내용은 다음과 같다.

Flowable과 interval()을 같이 사용하는 경우. interval 연산자는 스케줄러와 관계없이 시간에 의존해 데이터를 발행하므로 에러가 발생한다.

이미지 출처


배압 제어 연산자

onBackPressureBuffer()

BackpressureStrategy.BUFFER 전략을 적용한다. 매개변수로 버퍼의 용량, 버퍼 overflow 발생 시의 동작 등을 함께 전달할 수 있다.

<예제>

Flowable.range(1, 50_000_000)
        .onBackpressureBuffer(128, () -> {}, BackpressureOverflowStrategy.DROP_OLDEST)
        .observeOn(Schedulers.computation())
        .subscribe(data -> {
            Thread.sleep(100);
            System.out.println(data);
        }, error -> System.out.println(error.getMessage()));

<결과>

Observable과는 다르게 오버플로우가 발생했을때 처리를 진행할 수 있다.


Observable과의 차이점

가장 큰 차이점은 backpressure buffer를 기본 기능으로 제공하고 있는지의 차이인 것 같다.

Observable도 backpressure buffer 를 직접 생성해 주면 사용이 가능하지만, Flowable은 자체적으로 사용이 가능하다.

RxJava 공식사이트와 타 블로그(Refrence참고)를 참고하여 차이점은 보기 쉽게 표로 정리해보았다.

Flowable과 Observable은 대량의 데이터를 처리하지만 아래부터 설명할 3개의 Class는 데이터를 최대 1건만 발행하는 생산자들이다.


Single

Single은 오직 1개의 데이터만 발행한다.
따라서 두가지 메소드(상태)를 구독자에게 전달한다.

  • onSuccess
    : 작업이 성공했을 때 결과값을 배출
    (Observable의 onComplete와 동일)

  • onError
    : 에러가 발생했을 때 에러를 배출
    (Observable의 onError와 동일)

데이터가 발행과 동시에 onSuccess로 종료되므로 주로 한 번의 데이터만 발행하는 API 통신, Client 요청에 대응하는 서버의 응답에 많이 사용한다.


가장 간단한 생성 방법은 정적 팩토리 함수(생성 연산자) just를 사용하는 것이다. Observable의 just를 사용할 때와 같은 방법으로 사용하면 된다.

<예제>

// Single.just()
Single.just("Single Test - Single.just()")
        .subscribe(System.out::println);

// Single.create()
Single<String> createdSingle = Single.create(new SingleOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull SingleEmitter<String> emitter) {
        emitter.onSuccess("Single Test - Single.create()");
    }
});
createdSingle.subscribe(System.out::println);

<결과>

위의 예제에서처럼 SingleEmitter를 이용해 데이터를 발행하거나(onSuccess), 에러를 발생시킬 수 있다(onError).

그리고 Observable을 Single로 변환할 수도 있다. 단, 여러 데이터를 발행하는 Observable을 Single로 변환하면 컴파일에러가 발생한다는 사실에 주의하자.

<예제>

// Observable → Single 변환
// fromObservable() 사용
Observable<String> observable = Observable.just("Single Test : from.observable()");
Single.fromObservable(observable)
        .subscribe(System.out::println);

// Observable → Single 변환
// single() 사용
Observable.just("Single Test : just()")
        .single("Single Test : default Value") //default value
        .subscribe(System.out::println);

<결과>



Maybe

Single 클래스와 마찬가지로 최대 데이터 하나를 가질수 있지만 데이터 발행 없이 바로 데이터 발생을 완료(Single 클래스는 1개 완료, Maybe 클래스는 0 혹은 1개 완료)할 수도 있다.

즉, Maybe 클래스는 Single 클래스에 onComplete 이벤트가 추가된 형태로 3가지 알림을 보낸다.

  • onSuccess
    : 데이터 하나를 발행함과 동시에 종료

  • onError
    : 에러가 발생했음을 알림

  • onComplete
    : 데이터 발행이 완료됐음을 알림


Maybe도 Observable이나 Single과 같이 just와 create 연산자가 존재한다. 예제에서는 create를 통해 Maybe를 생성하였다.

<예제>

Maybe.just("Hello World")
        .delay(10, TimeUnit.SECONDS, Schedulers.io())
        .subscribeWith(new DisposableMaybeObserver<String>() {
            @Override public void onSuccess(String value) {
                // 성공하여 값을 발생시켰음
                System.out.println("Maybe Test >>> Success: " + value);
            }
            @Override public void onError(Throwable error) {
                // 에러
                error.printStackTrace();
            }
            @Override public void onComplete() {
                // 성공했지만 값이 없을 때
                System.out.println("Maybe Test >>> complete : Null");
            }
        });

<결과>

delay를 10으로 설정해줬기때문에 해당 delay 이후에 결과값이 출력되었다.

onSuccess가 데이터를 발행하면 동시에 종료되므로 onComplete는 실행되지 않는다.



Completable

Completable은 데이터를 발행하는 Observable, Single, Maybe와 달리 데이터 발행의 완료/에러 신호만 보내는 특수한 형태이다.

즉, 데이터 생산자이지만 데이터를 1건도 통지하지 않는다.

이는 데이터 통지의 역할 대신에 Completable 내에서 특정 작업을 수행한 후에, 해당 작업을 끝났음을 통지하는 목적으로 사용되기 때문이다.

따라서 데이터 발행의 완료를 알리는 onComplete와 에러 발생을 알리는 onError 2가지 알림을 보낸다.

Room 데이터 베이스 사용에서와 같이 백그라운드에서 동작해야 되는 함수가 필요한 경우, 서버에 객체를 업데이트하는 요청을 보낸 경우나 로깅용도로 사용하는 경우 사용할 수 있다.

<예제>

// Completable
Completable.create(emitter ->{
    emitter.onComplete();
}).subscribe(()->System.out.println("Completable Test >>> completed 1"));

Completable.fromRunnable(() ->{
}).subscribe(() -> System.out.println("Completable Test >>> completed 2"));

<결과>


asd
asd

Reference

[RxJava] RxJava 이해하기 - 7. Backpressure와 Flowable
[RxJava] RxJava 에서 Observable 와 Flowable 선택하여 사용하는 기준
RxJava2의 5가지 Ovservable
[RxJava] RxJava 프로그래밍(1) - 리액티브 프로그래밍
Single, Maybe, Completable

0개의 댓글