1. RxJava의 기본(3)

안석주·2021년 11월 8일
0

RxJava

목록 보기
3/10

서론

1장의 마지막입니다. RxJava의 전체적인 구성을 살펴보겠습니다.

1. RxJava의 기본

1.6

RxJava의 기본 구성

Flowable/Observable

우리가 이전에 봤던 Flowable/Observable은 기본적으로 Reactive Streams의 규칙과 그 규칙에 영향을 준 Observable 규약을 따르지 않으면 데이터가 문제없이 통지되는 것을 보장받지 못합니다.
이전 포스트에 썼지만 한번 더 규약을 정리해보자면

  1. null을 통지하면 안된다. (RxJava 2.x 버전부터 생긴 규칙)
  2. 데이터 통지는 해도 되고 안 해도 된다.
  3. Flowable/Observable의 처리를 끝낼 때는 완료나 에러 통지를 해야하며, 둘다 통지하지 않는다.
  4. 완료나 에러 통지를 한 뒤 다른 통지를 해서는 안된다.
  5. 통지할 때는 1건씩 순차적으로 통지하며, 동시에 통지하면 안된다.
    (5번 규칙은 하나의 쓰레드로 실행하면 문제가 없으나 여러 쓰레드로 실행시에 데이터 통지, 완료, 에러 통지도 동기 방식으로 처리하지 않으면 전달받은 측에서 올바른 처리를 보장할 수 없습니다.)

하지만 이러한 동기 처리를 개발자가 직접 구현한다면 버그를 만들거나 비효율적인 동기처리로 성능이 떨어질 수 있습니다. 이러한 처리가 필요하다면 다시 설계를 검토하거나, 하나의 쓰레드에서 처리하는 Flowable/Observable을 여러 개 준비하고, 하나로 결합하는 메소드로 처리하는 것이 가장 안전합니다. 이는 여러개의 Flowable/Observable을 결합하는 merge 메소드를 통해 해결할 수 있습니다!

Subscriber/Observer

Subscriber과 Observer는 통지된 데이터를 전달받아 이 데이터를 처리하는 인터페이스 입니다. Flowable과 Observable의 통지를 받으며 둘은 배압 기능의 차이와 onSubscribe에서 Subscription과 Disposable을 받는다는 차이점 외에는 똑같습니다.

Subscriber/Observer에서 메소드가 호출되는 정상적인 순서(에러가 일어나지 않는다면)는 다음과 같습니다.

  • onSubscribe() 메소드
  • onNext() 메소드
  • onComplete 메소드

먼저 Subscriber과 Observer가 구독한 Flowable/Observable의 통지가 준비되면 onSubscribe 메소드가 호출되는데, onSubscribe 메소드는 1건의 구독에서 한 번만 호출됩니다.

다음으로 데이터가 통지될 때마다 onNext 메소드가 호출됩니다. 열건의 데이터가 있다면 onNext또한 열번 호출됩니다. 하지만 1건의 데이터도 없다면 onNext 메소드는 호출되지 않습니다.

그리고 이러한 모든 데이터를 통지한 뒤에는 onComplete 메소드를 호출해 완료를 통지합니다.
이것은 데이터 스트림을 처리할 때 중요한데, 이벤트 처리의 구독을 시작한 시점에서는 해당 Flowable/Observable이 끝나지 않고 계속해서 데이터를 통지하는지 판단할 수 없습니다. 그래서 모든 데이터를 통지한 뒤에 더이상 통지할 데이터가 없음을 알리고자 완료를 통지하며 완료를 통지할 때의 처리를 할 수 있습니다. onComplete 메소드 또한 구독당 한 번만 호출됩니다.

마지막으로 Flowable/Observable 처리 중에 에러가 발생하면 onError 메소드가 호출돼 바로 처리가 종료됩니다.

Subscription

Subscription은 Flowable과 Subscriber의 onSubscribe 메소드의 인자로 전달되는 객체이며, Reactive Streams에 정의된 인터페이스입니다. request 메소드와 cancel 메소드가 존재하고, 이를 통해 데이터의 개수를 요청할 수도, 구독을 해지할 수도 있습니다!

Disposable

Disposable은 Observable과 Observer 간 구독에서 Obersvable이 구독 준비가 되면 onSubscribe 메소드를 통해 Observer에 전달되는 객체이며, 구독을 해지할 수 있는 메소드를 포함하고 있는 인터페이스 입니다.

public interface Disposable {
    void dispose();
    boolean isDisposed();
}

그런데 Disposable은 Observable과 Observer 사이에 이루어지는 구독 이외에도 사용할 수 있습니다. Flowable#subscribe(Subscriber) 나 Observable#subscribe(Observer)가 아닌 구독하는 메소드(subscribe 메소드/ subscribeWith 메소드)의 반환값으로도 Disposable이 사용됩니다. 이를 이용해 반환된 Disposable을 이용하면 외부에서 구독을 해지하는 것도 가능합니다.

그런데 이렇게 외부에서 구독을 해지하는 것이 다른 스레드가 구독 해지를 실행할 수도 있기 때문에 위험할수도 있습니다. 그러므로 함수형 인터페이스를 인자로 받는 구독 메서드인 DisposableSubscriber/DisposableObserver와 ResourceSubscriber/ResourceObserver에서 반환하는 RxJava가 제공하는 Disposable의 인스턴스는 비동기로 구독 해지를 호출하더라도 문제가 발생하지 않게끔 작동합니다.

또한 Disposable은 구독 해지뿐만 아니라 자원을 해제하는 등의 처리에도 활용할 수 있습니다. 따라서 처리가 끝날 때 자원을 해제해야 한다면 해제 처리를 Disposable의 dispose 메소드에 구현합니다. 그리고 FlowableEmitter/ObservableEmitter의 setDisposable 메소드에 이 Disposable을 설정하면 완료나 에러, 구독 해지 시에 Disposable의 dispose 메소드가 호출되며, ResourceSubscriber/ResourceObserver의 add 메소드로 Disposable을 추가하면 해당 ResourceSubscriber/ResourceObserver의 dispose 메소드 호출 시 Disposable의 dispose 메소드가 호출됩니다.

FlowableProcessor/Subject

Processor는 Reactive Streams에 정의된 생산자(Publisher)와 소비자(Subscriber)의 기능이 모두 있는 인터페이스입니다. 이 인터페이스는 Publisher와 Subscriber를 모두 상속받고 있으며, 다른 메소드는 가지고 있지 않습니다.

이로 인해 Processor은 Publisher를 구독해 데이터를 받는 소비자가 될 수도 있고, 자신을 구독하는 Subscriber에 데이터를 통지하는 생산자가 될수도 있습니다.

RxJava에서는 이러한 Processor의 구현 클래스로 FlowableProcessor를 제공합니다. 또한 Observable/Observer의 관계에서 Processor와 같은 역할을 하는 Subject 또한 제공해줍니다.

FlowableProcessor와 Subject는 추상 클래스이며, 캐시할 수 있는 데이터 개수가 다르거나, 구독할 수 있는 Subscriber가 1개만 있는 것처럼 각각의 특징이 다른 여러 구현 클래스를 제공합니다.

종류설명
PublishProcessor/PublishSubject데이터를 받은 시점에만 소비자(Subscriber/Observer)에 데이터를 통지한다.
BehaviorProcessor/BehaviorSubject소비자가 구독하기 직전 데이터를 버퍼링해 해당 데이터부터 통지한다.
ReplayProcessor/ReplaySubject처리하는 도중 구독한 소비자에게도 받은 모든 데이터를 통지한다.
AsyncProcessor/AsyncSubject데이터 생성을 완료했을 때 마지막으로 받은 데이터만 소비자에게 통지한다.
UnicastProcessor/UnicastSubject1개의 소비자만 구독할 수 있다.

여기부터는 이전에 말했던 Disposable을 구현한 클래스들에 대해서 소개할텐데, 안드로이드에서 매우 자주 쓰이는 부분이니 꼭!!!!! 숙지해야 한다.

DisposableSubscriber/DisposableObserver

DisposableSubscriber/DisposableObserver는 Disposable을 구현한 Subscriber/Observer의 구현 클래스로, 외부에서 비동기로 구독 해지 메소드를 호출해도 안전하게 구독 해지를 하게 합니다. 이 클래스는 onSubscribe 메소드가 final로 구현되어 있으며, onSubscribe 메소드로 전달되는 Subscription/Disposable은 직접 접근하지 못하게 은닉되어있습니다.

그 대신 다음 메소드에서 Subscription/Disposable의 메소드를 호출할 수 있습니다.

DisposableSubscriber의 Subscription 메소드를 호출하는 메소드

  • request(long) : Subscription의 request 메소드를 호출
  • dispose() : Subscription의 cancel 메소드를 호출

DisposableObserver의 Dispose 메소드를 호출하는 메소드

  • dispose() : Dispose의 dispose 메소드를 호출
  • isDispose() : Dispose의 isDisposed 메소드 호출

또한 구독 시작 시점에 원하는 처리를 실행하려면 onSubscribe 메소드 내에서 호출되는 onStart 메소드(위의 사진에서 onStart)를 오버라이드해 구현하면 됩니다.

onStart 메소드는 아래와 같이 구현돼 있으며, onSubscribe 메소드에서 구현했던 것을 onStart 메소드에서 구현하면 됩니다.

protected void onStart(){
    s.get().request(Long.MAX_VALUE)
}

특히 Flowable에서 배압을 적용하고자 DisposableSubscriber에서 설정하는 통지받을 데이터 개수를 바꾸고 싶다면 onStart를 오버라이드하여 request 메소드를 호출, 값을 변경해주면 됩니다.

ResourceSubscriber/ResourceObserver

ResourceSubscriber/ResourceObserver는 위의 DisposableSubscriber, DisposableObserver와 동일하지만 add 메소드가 추가되어 있습니다. 이는 Disposable을 보관할 수 있게 해주고, add 메소드로 보관된 Disposable의 dispose 메소드는 ResourceSubscriber/ResourceObserver의 dispose 메소드가 호출되면 함께 호출됩니다. 하지만 완료, 에러시에 자동으로 dispose 메소드가 호출되지 않으니 주의해야합니다.

subscribe/subscribeWith

subscribe 메소드는 소비자가 생산자를 구독하는 메소드로, 이 메소드를 호출하면 생산자가 데이터를 통지할 소비자를 등록합니다. 생산자가 Cold일 때 subscribe 메소드를 호출하면 생산자는 바로 통지 처리를 시작합니다. 구독 과정에서 통지 처리는 보통 세 단계로 이루어지며(onSubscribe,onNext,onComplete/onError) 이러한 순서는 Subscriber/Observer 내에서 동기화되어야 하고 순서대로 실행되어야 합니다. 이는 생산자의 처리와 소비자의 처리가 비동기로 이루어졌다 해도 여러 데이터의 통지를 동시에 실행하는 일은 없습니다.

또한 subscribe 메소드의 인자에 Subscriber와 Observer를 전달하면 subscribe 메소드는 아무값도 반환하지 않습니다.
그런데 RxJava는 Reactive Streams 사양을 따르는 subscribe 외에도 구독 해지를 위해 Disposable을 반환하는 subscribe 메소드를 제공합니다.

이 부분이 예제를 따라서 해보며 궁금했던 부분이였는데, 저는 항상 subscribe에 object로 모든 함수를 구현해주었었습니다.

심지어 구현할 부분이 없다면 이런식으로 만들어주면서까지...
하지만 Disposable을 제공하는 subscribe 덕분에 그냥 subscribe{}를 이용, 그 안에 onNext()를 구현해주면 된다는 것을 알았습니다!

다음의 그림은 앞서말한 Disposable을 리턴해주는 subscribe 메소드들 입니다.

인자로 observer을 전달한다면 리턴값은 없으며, 외에 람다식 또는 인자로 준다면 onNext, onError, onComplete, onSubscribe 순서로 인자를 받게 됩니다!

이러한 subscribe는 기본 데이터 요청 개수가 Long.MAX_VALUE 입니다.

이렇게 반환받은 Disposable을 사용해 구독을 해지하려면, dispose 메소드를 호출합니다.

val disposable = observable.subscribe {
        println("data=$it")
    }

disposable.dispose()

subscribeWith 함수는 이전에 subscribe 함수가 Unit을 리턴한다고 했는데, subscribeWith에 인자로 Subscriber/Observer로 주면, 인자로 전달받은 Subscriber/Observer를 돌려줍니다. 이는 인자로 ResourceSubscriber/ResourceObserver와 DisposableSubscriber/DisposableObserver를 넘겨주면, Disposable로 리턴을 받을 수 있습니다!

CompositeDisposable

CompositeDisposable은 여러 Disposable을 모아 한번에 dispose 해줄 수 있도록 합니다!
다음 예제에서 2건의 구독을 중간에 해지하며, 구독 해지를 명시적으로 알 수 있게 doOnCancel 메소드를 통해 구독 해지를 출력합니다. (이를 보아 doOnNext, doOnCancel 같은 함수를 유추해볼 수 있을 것 같다..!) 또한 add를 통해 CompositeDisposable에 Disposable을 추가해줍니다!

fun main() {
    val compositeDisposable = CompositeDisposable()
    compositeDisposable.add(
        Flowable.range(1, 3)
            .doOnCancel { println("no.1 canceled") }
            .observeOn(Schedulers.computation())
            .subscribe { data ->
                Thread.sleep(100L)
                println("No.1 data is $data")
            }
    )

    compositeDisposable.add(
        Flowable.range(1, 3)
            .doOnCancel { println("no.2 canceled") }
            .observeOn(Schedulers.computation())
            .subscribe { data ->
                Thread.sleep(100L)
                println("No.2 data is $data")
            }
    )
    Thread.sleep(150L)

    compositeDisposable.dispose()
}
=>No.1 data is 1
No.2 data is 1
no.1 canceled
no.2 canceled

이를 통해 CompositeDisposable을 이용해 한번에 구독을 취소했음을 알 수 있습니다!

Single/Maybe/Completable

1장의 마지막으로 Single/Maybe/Completable에 대해 이야기해보겠습니다.

클래스설명
Single데이터를 1건만 통지하거나 에러를 통지하는 클래스
Maybe데이터를 0건 또는 1건을 통지하여 완료 또는 에러를 통지하는 클래스
Completable데이터를 1건도 통지하지 않고 완료 또는 에러를 통지하는 클래스

이 클래스들은 특별하게 Flowable/Observable 처럼 4가지로 나뉘어져있지 않습니다. 그 이유는, 데이터의 통지 자체가 완료를 의미하거나 데이터 통지 없이 완료만 통지하기 때문입니다..!

그래서 보통의 소비자로는 사용하지 못해 다음과 같은 지정된 소비자를 이용합니다.

생산자소비자
SingleSingleObserver
MaybeMaybeObserver
CompletableCompletableObserver

이러한 소비자들도 DisposableObserver 또는 ResourceObserver에 해당하는 구현 클래스를 제공합니다.

또한 이들은 통지하는 데이터가 최대 1개이므로 배압을 적용할 필요가 없어 Flowable/Observable처럼 나누지 않고, 같은 연산자를 제공하며 Single/Maybe/Completable을 Flowable/Observable로 변환하거나 반대로 변환하는 연산자를 제공하기도 합니다.

Single

Single.create<DayOfWeek> {
        it.onSuccess(LocalDate.now().dayOfWeek)
    }.subscribe(object :SingleObserver<DayOfWeek>{
        override fun onSubscribe(d: Disposable) {
            TODO("Not yet implemented")
        }

        override fun onSuccess(t: DayOfWeek) {
            TODO("Not yet implemented")
        }

        override fun onError(e: Throwable) {
            TODO("Not yet implemented")
        }
    })

Single은 1건만 통지하거나 에러를 통지하는 클래스로, 데이터의 통지 자체가 처리 완료를 의미하기 때문에 완료 통지는 하지 않습니다. 여기에는 onNext, onComplete는 없으며 데이터 통지 하고 완료했다는 프로토콜인 onSuccess만 존재합니다.

Maybe

Maybe.create<DayOfWeek> {
        it.onSuccess(LocalDate.now().dayOfWeek)
    }.subscribe(object : MaybeObserver<DayOfWeek>{
        override fun onSubscribe(d: Disposable) {
            // do nothing
        }

        override fun onSuccess(t: DayOfWeek) {
            println(t)
        }

        override fun onError(e: Throwable) {
            println("에러 : $e")
        }

        override fun onComplete() {
            println("완료")
        }
    })

Maybe는 1건도 통지하지 않거나 1건만 통지할 때 사용됩니다. 그래서 데이터를 1건 통지한다면 onSuccess 메소드를 호출하고, 1건도 통지하지 않는다면 onComplete 메소드를 호출합니다.

Completable

Completable은 데이터를 통지하지 않고 완료나 에러를 통지하는 클래스입니다.

fun main() {
    Completable.create{emitter ->
        // 중략
        emitter.onComplete()
    }.subscribeOn(Schedulers.computation()) // 비동기로 처리리
       .subscribe(object :CompletableObserver{
            override fun onSubscribe(d: Disposable) {
                // do nothing
            }

            override fun onComplete() {
                println("완료")
            }

            override fun onError(e: Throwable) {
                println("에러 = $e")
            }
        })
}

보통 특정 부가 작용이 발생하는 처리를 수행합니다. 그리고 그 해당처리가 끝나면 완료, 에러가 발생하면 에러를 통지합니다. 따라서 부가 작용 처리의 구독을 호출하는 쓰레드와 동일한 쓰레드에서 실행한다면 RxJava를 쓰지 않을 때와 동일해 사용하는 의미가 없으니 인지하고 사용해야 합니다.

마무리

지금까지 책 1장의 내용이였다....
챕터 1이지만 80쪽 분량의 내용이라 정리하기 힘들기도 했지만
다시 정리하면서 느꼈는데, 확실히 블로그에 정리하면서 책을 보는 것이 더 이해도 잘되고, 기억에 남을 듯하다. 하지만 아직 이론 부분만 알고 있는 것 같아 부족하다는 느낌이다. 빨리 다음 챕터도 끝내야 할 듯 하다! + 복습까지

profile
뜻을 알고 코딩하기

0개의 댓글