3. RxJava의 매커니즘(2)

안석주·2021년 11월 28일
0

서론

빠르게 3장을 끝내야 뭐든 다음 일로 넘어갈 수 있을 것 같아서 빠르게 정리한다....
이번 포스트는 에러 처리, 리소스 관리에 대해서 정리해보겠습니다. 안드로이드 블로그인 만큼, Observable을 자주 이용하기 때문에 Flowable에서 자주 이용하는 배압에 대해서는 따로 정리하지는 않도록 하겠습니다.

3.3 에러 처리

이번 포스트에서 RxJava가 제공하는 여러 에러 처리에 대해서 알아보겠습니다!

에러 중에서는 심각한 에러기 때문에 작업 처리를 멈춰야 하는 에러도 있는가 하면, 순간적인 네트워크 중단처럼 재실행하면 회복이 가능한 에러도 있습니다. 또한 처리 작업에 따라 에러 통지를 받으면 프로그램을 종료하는것이 아닌, 대신할 데이터를 보내주고 싶은 경우도 있습니다. RxJava는 다음 세가지를 제공해줍니다.

  • 소비자에게 에러 통지하기
  • 처리 작업 재시도
  • 대체 데이터 통지

이처럼 RxJava는 에러를 감지했을 때 특정한 에러 처리 수단을 제공합니다. 그러나 에러 통지를 받아도 회복할 수 없는 VirtualMachineError 같은 종류의 에러는 별도 처리 없이 그대로 던지기도 합니다.

소비자에게 에러 통지하기

이전에 알아봤던 것처럼 통지 처리중 에러가 발생하면 소비자에게 에러를 통지해 에러 통지를 받은 소비자가 에러에 대응을 하는 메커니즘을 제공합니다. 또한, 명시적으로 에러 통지 기능을 구현하지 않아도 처리 도중에 에러가 발생하면 에러를 던지고 처리를 중단하는 것이 아니라, 기본적으로 소비자에게 발생한 에러를 통지하게 이루어져 있습니다. 이는 특히 비동기 처리 중에 발생한 에러가 주 처리 작업을 수행하는 쓰레드에 반드시 전달되도록 에러 객체가 담긴 에러 메시지를 소비자에게 통지해 적절한 에러 처리를 하게 합니다.

참고로, 에러 통지 시 어떤 처리를 할지 설정하지 않은 subscribe 메소드로 구독할 경우 (Consumer를 이용한 람다로 onNext만 구현해준 경우) 에러가 발생해도 에러의 스택 트레이스만 출력할 뿐, 별다른 에러 처리를 하지 않습니다. 그래서 에러가 발생한 것을 인식하지 못한 채 에러 처리 없이 그대로 구독 후의 처리 작업을 진행하니 주의해야 합니다.

처리 재시도

이전에 말했던 순간적인 네트워크 중단과 같이 재실행하면 정상적인 처리를 할 수 있을 때도 있습니다. 이와 같은 에러를 처리하고자 RxJava는 에러가 발생하면 생산자의 처리 작업을 처음부터 다시 시도함으로써 에러 상황에서 회복해 정상적인 결과를 얻는 방법을 제공합니다. 이때는 소비자에게 에러를 통지하지 않습니다!
이러한 대응 작업을 위해 RxJava에서는 retry()라는 에러 발생 시 재실행하는 메소드를 제공합니다. 이 재시도를 통한 에러 처리는 네트워크가 순간적으로 중단돼 처리 작업이 실패해도 다시 실행하면 올바른 결과를 얻을 수 있을 때 유용합니다.

아래 예제를 예시로, 재시도(retry())를 통한 에러 처리는 재실행될 때 Flowable의 통지 처리를 처음부터 다시 시작합니다. 아래 예제는 1부터 3까지의 수를 통지하는 Flowable을 생성해 2를 통지할 때 에러가 발생하게 됩니다. 또한 이 Flowable은 retry 메소드로 두 번까지 재실행합니다. 그리고 언제 무슨 일이 발생하는지 확인하려고 Flowable에서 통지 처리를 시작할 때와 종료할때, 구독 시작할 때 메시지를 출력합니다. 추가로 Subscriber에서도 통지를 받았을 때 메시지를 출력합니다.

Flowable.create<Int>({
        println("Flowable 처리 시작")
        for (i in 1..3) {
            if (i == 2) {
                throw Exception("예외 발생")
            }
            it.onNext(i)
        }
        it.onComplete()
        println("Flowable 처리 완료")
    }, BackpressureStrategy.BUFFER)
        .doOnSubscribe {
            println("flowable : doOnSubscribe")
        }.retry(2)
        .subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
                println("subscriber: onSubscribe")
                s?.request(Long.MAX_VALUE)
            }

            override fun onNext(t: Int?) {
                println("data:$t")
            }

            override fun onError(t: Throwable?) {
                println("에러 = ${t.toString()}")
            }

            override fun onComplete() {
                println("종료")
            }

        })
        
=> subscriber: onSubscribe
flowable : doOnSubscribe
Flowable 처리 시작
data:1
flowable : doOnSubscribe
Flowable 처리 시작
data:1
flowable : doOnSubscribe
Flowable 처리 시작
data:1
에러 = java.lang.Exception: 예외 발생      

실행 결과에서도 알 수 있듯 에러 발생시 Flowable이 doOnSubscribe() 메소드를 실행하는 것으로 보아, 처리 작업을 처음부터 다시 하는 것을 알 수 있습니다. 또한 retry() 메소드로 재 시도 횟수를 제한했을 때, 이 횟수를 넘어서는 에러가 발생하면 에러를 통지합니다.

이번 예제에서는 어떤 에러가 발생해도 retry를 하는 retry() 메소드를 이용했지만, 에러 객체를 받아 재시도 할지를 판별하는 retry() 메소드도 존재합니다!

이러한 retry() 메소드는 전체적인 아키텍처를 고려하여 사용하지 않으면, 타임아웃 에러같은 경우 더 많은 부하를 주어 타임아웃 에러가 더 많이 날 수 있으니, 주의해야합니다.
또한 구독이 객체의 상태를 바꾸는 등 부가 작용을 일으키더라도 발생한 부가 작용이 원 상태로 복원되지 않고 남아있으니, 주의해야 합니다!!!

대체 데이터 통지

이번에는 에러 발생시에 대체 데이터를 통지해 처리 작업을 에러로 끝내지 않고, 완료하게 만드는 방법을 봅니다. RxJava에서 이는 onError나 onException으로 시작하는 메소드가 이에 해당합니다. 이러한 메소드를 사용하면, 에러가 발생했을 때 소비자에게 에러를 통지하지 않고, 대체 데이터나 대체 Flowable/Observable에 있는 데이터를 통지해 처리 작업을 실행하고 마지막으로 완료를 통지해 작업을 정상적으로 종료할 수 있습니다.

아래 예제에서는 인자를 통지하는 Flowable이 map() 메소드를 이용해 100을 받은 데이터로 나눈 결과의 Integer 값을 통지합니다. 하지만 Flowable이 통지하려는 데이터에는 '0'이 있어서 0으로 나누는 연산에서 ArithmeticException이 발생합니다. 하지만 이 예제에서는 0이 포함된 Flowable을 onErrorReturnItem() 메소드를 이용해 에러가 아닌 대체 데이터를 통지하여 정상적으로 종료해주게 만들어줍니다.

Flowable.just(1, 3, 5, 0, 2, 4)
        .map { 100 / it }
        .onErrorReturnItem(0)
        .subscribe(object : DisposableSubscriber<Int>() {
            override fun onNext(t: Int) {
                println("data=$t")
            }

            override fun onError(t: Throwable?) {
                println("error = ${t.toString()}")
            }

            override fun onComplete() {
                println("완료")
            }
        })
=> data=100
data=33
data=20
data=0
완료

이 예제를 실행하면, 원본 데이터가 '0'일 때 map 메소드에서 에러가 발생하지만, 실행 결과로는 '0'이 통지됩니다. 그리고 에러 통지 없이 완료와 함께 정상적으로 종료가 됩니다. (2,4는 실행되지 않네요)

이처럼 에러 발생시 에러 통지 대신 대체 데이터를 통지하는 것도 가능합니다. 하지만 상황에 따라 대체 데이터를 통지할지, 에러를 통지할지 결정할 수 있습니다. 이는 RxJava에서 제공하는 함수형 인터페이스를 인자로 받는 메소드를 이용하여 해결하면 됩니다.
여기의 종류에는 onErrorReturnItem(), onErrorReturn(), onErrorResumeNext()... 등등이 있으며, 간편하게 쓰기위해 함수 이름만 썼지만, 모두 인자가 존재하니 한번 찾아보시길 바랍니다!

3.4 리소스 관리

이 곳에서는 특정 리소스로부터 데이터를 통지하는 생산자를 생성했을 때 생산자와 리소스의 라이프사이클을 맞추는 방법을 알아보겠습니다.

프로그램을 실행하면서 DB 커넥션이나 파일과 같은 리소스를 얻어 어떤 처리 작업을 했을 때, 처리가 끝나면 이러한 리소스를 해제해야 합니다. RxJava는 Flowable/Observable이 구독될 때 리소스를 얻고 완료나 에러가 발생했을 때 리소스를 해제하게 해서 리소스를 관리할 수 있습니다. 즉, 리소스의 라이프 사이클을 Flowable/Observable의 라이프 사이클과 맞출 수 있습니다.

using()

RxJava는 리소스를 관리하는 using() 메소드를 제공해 리소스의 라이프사이클에 맞춘 Flowable/Observable을 생성할 수 있습니다. 이 메소드는 다음 처리 작업을 하는 함수형 인터페이스를 제공합니다.

    1. 리소스 얻기
    1. 리소스에서 얻은 데이터를 사용하는 Flowable/Observable 생성
    1. 리소스 해제
using(
    Callable<Resource> resourceSupplier, // 1
    Function<? super Resource, ? extends Publisher/ObservableSource<? extends T>> // 2
    Consumer<? super Resource> resourceDisposer) // 3
  1. 데이터를 통지하는 데 필요한 리소스 객체를 얻는 함수형 인터페이스
  2. 1에서 생성한 리소스를 받고, 이 리소스에서 얻은 데이터로 통지 처리 작업을 하는 Flowable/Observable을 생성하는 함수형 인터페이스, 여기서 생성한 Flowable/Observable이 구독될 때 통지 처리를 수행한다.
  3. 1에서 생성한 리소스를 받은 후 이 리소스를 해제하는 함수형 인터페이스

이 using() 메소드로 생성한 Flowable/Observable을 사용하면 리소스 관리는 Flowable/Observable 내부에서 일어나게 됩니다.

또한, 마지막 리소스 해제 작업은 구독을 해지할 때도 실행됩니다. 그러므로 처리 작업 도중에 구독을 해지해야 하는 상황에서도 구독 해지 시점에 자동으로 리소스가 해제됩니다. 다만 이때는 sourceSupplier가 데이터를 통지하려고 리소스에 접근할 때 리소스에 접근할 수 있는지를 확인해야합니다.

FlowableEmitter/ObservableEmitter

Flowable/Observable의 create 메소드 내부에서 사용하는 Emitter도 리소스를 해제하는 수단으로 다음 메소드가 있습니다!

setCancellable(Cancellable cancellable)

FlowableEmitter/ObservableEmitter의 setCancellable() 메소드는 Cancellable 인터페이스를 설정하는 메소드입니다. 이 인터페이스는 다음과 같이 구독이 취소될 때 처리 작업을 하는 메소드 하나만 있는 함수형 인터페이스입니다!

interface Cancellable{
    void cancel() throws Exception;
}

setCancellable 메소드로 Cancellable 인터페이스를 설정해 두면 완료 통지나 에러 통지를 한 후 또는 구독을 중도에 해지했을 때 cancel 메소드가 실행됩니다. 따라서 create 메소드로 생성한 Flowable/Observable이 정상적으로 종료되거나 불필요해졌을 때 구독을 해지할 수 있게 제대로 관리한다면 Flowable/Observable 외부에서 별도로 리소스 관리하지 않아도 됩니다.

또한, Cancellable은 함수형 인터페이스므로 이전에 작성하지 않고 바로 람다식으로 구현 가능합니다.

emitter.setCancellable{ it.close() }

setDisposable(Disposable disposable)

FlowableEmitter/ObservableEmitter의 setDisposable() 메소드는 Disposable 인터페이스를 설정하는 메소드입니다!

public interface Disposable{
    // 파기될 때 처리
    void dispose();
    
    // 이미 파기됐으면 true
    boolean isDisposed();
}

setDisposable() 메소드로 Disposable을 설정해 두면 완료 또는 에러 통지한 후나 구독을 중도에 해지했으 때 dispose() 메소드가 실행됩니다!

주의할점

구독을 중도에 해지했을 때 데이터를 통지하는 처리 작업이 이미 파기한 리소스에 접근하면 에러가 발생합니다. 그러므로 리소스 접근시에 파기됐는지 체크하는 작업이 필요합니다.

또한, Cancellable과 Disposable을 동시에 설정할 수 없습니다. setCancellable 메소드의 내부에서는 setDisposable 메소드를 호출하기 때문에, 동시에 설정할 경우 설정 시점에 어느 한쪽의 리소스가 해제되어 버립니다!

profile
뜻을 알고 코딩하기

0개의 댓글