6. RxJava의 디버깅과 테스트

안석주·2022년 2월 3일
0

RxJava

목록 보기
10/10

서론

드디어!!! 마지막 장입니다. 6장까지 오면서 앞부분을 세번이나 더 봤는데도... 안드로이드에 적용하는건 어렵고 어렵네요... 좀더 많이 봐야겠음을 느낍니다... 그럼 마지막 6장 정리하고 계속해서 복습해나가겠습니다..!

6.1 디버깅과 테스트

일반적으로 RxJava로 구현한 코드를 디버깅하거나 테스트하는건 어렵습니다. 이전에도 설명했듯 생산자와 소비자는 분리되어 서로에게 영향을 주지 않아야함을 원칙으로, 소비자에게 통지하는 데이터는 기본적으로 외부에서 접근할 수 없습니다. 따라서 생산자가 처리를 시작하고, 소비자에가 에러나 완료 통지를 받을 때까지 처리가 끝나야만 데이터가 어떻게 바뀌었는지 확인이 가능합니다.

또한 비동기 처리시에 다양한 조건에 의해 실행 결과가 달라질 수도 있습니다. 실행 환경에 따라 평소에는 성공하는 처리도 시간초과가 나는 상황도 있을 수 있습니다.

이러한 디버깅, 테스트를 돕기위한 메소드와 클래스들을 알아봅니다!

6.2 'do' 메소드

RxJava를 하면 자주보는 doOnNext, doOnError 등등의 메소드들을 알아봅시다!

RxJava는 통지할 때나 통지한 이후에 특정 부가 작용이 발생하는 메소드를 제공합니다. 보통 'doOn'이나 'doAfter'로 시작하는데, 메소드의 상당수는 통지 시에 처리할 내용을 정의한 함수형 인터페이스를 인자로 전달받습니다. 이때 해당 함수형 인터페이스의 메소드에 반환값이 없어서 부가 작용이 발생한다고 볼 수 있습니다.

기본적으로 통지받을 때 발생하는 부가 작용은 소비자 측에서 처리하는 것이 이상적입니다. 하지만, 통지된 데이터가 여러 연산자를 통해 변환되거나 필터링되는 경우에는 연산자 간에 어떻게 데이터가 변환되고 어느 시점에 특정 통지 메소드가 호출되는지 알고 싶을 때가 있습니다. 이때 로그를 남길 때 'do'로 시작하는 메소드를 넣어 어느 시점에 어떤 데이터를 통지했는지 확인이 가능합니다. 밑에 하나하나 확인해봅니다!

메소드설명
doOnNext데이터 통지시 호출하는 메소드
doOnComplete완료 통지시 호출하는 메소드
doOnError에러 통지시 호출하는 메소드
doOnSubscribe구독 시작시 호출하는 메소드
doOnDispose구독 해지시 호출하는 메소드

doOnNext

doOnNext 메소드는 데이터 통지 시에 지정한 처리 작업을 실행하는 메소드입니다. doOnNext 메소드에서 Observable이 데이터를 통지하는 시점에 인자로 전달받은 함수형 인터페이스를 실행합니다. 원하는 처리를 인자로 전달할 함수형 인터페이스를 구현하면 통지 데이터가 이 함수형 인터페이스 메소드의 인자로 전달되므로 어떤 데이터를 받았는지 확인할 수 있습니다.

Observable.range(1,5)
        .doOnNext { println("---기존 데이터 $it") }
        .filter{
            it % 2 == 0
        }
        .doOnNext { println("----filter 후 데이터 $it") }
        .subscribe(DebugSubscriber())
=>
---기존 데이터 1
---기존 데이터 2
----filter 후 데이터 2
Thread[main,5,main] null : 2
---기존 데이터 3
---기존 데이터 4
----filter 후 데이터 4
Thread[main,5,main] null : 4
---기존 데이터 5
완료

결과를 보면, doOnNext 메소드로 Observable이 통지한 데이터를 받습니다. 첫 번째 doOnNext 메소드는 filter를 사용하기 전에 Observable이 통지하는 데이터를 출력하고, filter 메소드 호출 후에 사용한 doOnNext 메소드는 filter 메소드의 결과로 통지되는 데이터를 출력합니다!

위와 같이 doOnNext를 어디에 사용하느냐에 따라 받는 데이터가 다릅니다.

또한 doOnNext 이후에 Subscriber 출력이 이루어집니다. 이 점또한 알아두는 것이 좋겠네요!

doOnComplete

doOnComplete 메소드는 완료 통지시 지정한 처리 작업을 실행합니다.
위와 동일하게 함수형 인터페이스를 완료 통지시에 실행합니다.

Observable.range(1,5)
        .doOnComplete {
            println("doOnComplete")
        }.subscribe(DebugSubscriber())
        
=> Thread[main,5,main] null : 1
Thread[main,5,main] null : 2
Thread[main,5,main] null : 3
Thread[main,5,main] null : 4
Thread[main,5,main] null : 5
doOnComplete
완료

이 또한 doOnNext와 동일하게 완료 통지 이전에 doOnComplete를 호출하네요!

doOnError

doOnError 메소드는 에러를 통지하면 지정한 처리를 실행합니다. 위와 동일하게 함수형 인터페이스를 실행하며 통지되는 에러 객체는 함수형 인터페이스 메소드의 인자로 전달되어 어떤 에러인지 확인이 가능합니다.

Observable.range(1, 5)
        .doOnError {
            println("기존 데이터 ${it.message}")
        }
        .map {
            if (it == 3) {
                throw Exception("예외 발쌩!!")
            }
            return@map it
        }.doOnError {
            println("map 적용 후 ${it.message}")
        }.subscribe(DebugSubscriber())
        
=> Thread[main,5,main] null : 1
Thread[main,5,main] null : 2
map 적용 후 예외 발쌩!!
Thread[main,5,main] : 에러 = kotlin.Unit
java.lang.Exception: 예외 발쌩!!
	at RxJava.chap6.DoOnErrorExampleKt.main$lambda-1(DoOnErrorExample.kt:13)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:58)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:101)
....

위의 예제는 1부터 5까지의 숫자 중 데이터가 3일 때 map 메소드로 예외가 발생하게 합니다. 그리고 에러를 통지한 것을 doOnError 메소드로 출력합니다.

실행 결과를 보면 map 메소드 호출 전에는 Observable이 에러를 통지하지 않으므로 doOnError 메소드가 실행되지 않습니다.

그리고 map 메소드에서 에러가 발생했을 때 Observable이 에러를 통지하므로 doOnError 메소드가 실행됩니다. 이처럼 doOnError 메소드를 사용하는 위치에 따라 에러 통지를 받을 수 있는지 결정됩니다.

doOnSubscribe

doOnSubscribe 메소드는 구독 시작(onSubscribe)시 지정한 처리를 실행합니다. 동일하게 함수형 인터페이스를 실행하고, 메소드의 인자로 Disposable이 전달됩니다.

Observable.range(1,5)
        .doOnSubscribe {
            println("doOnSubscribe")
        }.subscribe(object: Observer<Int>{
            override fun onSubscribe(d: Disposable) {
                println("---subscriber = onSubscribe!")
            }

            override fun onNext(t: Int) {
                println("---subscriber = onNext: $t")
            }

            override fun onError(e: Throwable) {
                //
            }

            override fun onComplete() {
                //
            }

        })

=> doOnSubscribe
---subscriber = onSubscribe!
---subscriber = onNext: 1
---subscriber = onNext: 2
---subscriber = onNext: 3
---subscriber = onNext: 4
---subscriber = onNext: 5

예상과 동일하게 doOnSubscribe가 먼저 실행되고, subscriber가 실행됐습니다.

doOnDispose

doOnDispose는 구독이 해제될 때 지정한 처리를 실행하는 메소드입니다. 위의 실행과 동일하며 완료 또는 에러로 종료시에 doOnDispose는 실행되지 않습니다! dispose()를 실행해야만 호출이 됩니다.

Observable.interval(100L, TimeUnit.MILLISECONDS)
        .doOnDispose { println("doOnDispose") }
        .subscribe(object : Observer<Long> {
            private lateinit var disposable: Disposable
            private var startTime: Long = 0
            override fun onSubscribe(d: Disposable) {
                startTime = System.currentTimeMillis()
                disposable = d
            }

            override fun onNext(t: Long) {
                if (System.currentTimeMillis() - startTime > 300L){
                    println("구독 해지")
                    disposable.dispose()
                    return
                }
                println("$t")
            }

            override fun onError(e: Throwable) {
                println("에러")
            }

            override fun onComplete() {
                println("완료")
            }
        })
Thread.sleep(1000L)

=> 
0
1
구독 해지
doOnDispose

결과를 보면 dispose()를 호출한 후에 Observable의 doOnDispose()가 호출됩니다.

안드로이드에서도 이러한 'do'메소드를 이용해서 디버깅을 자주 시도하기도 하고, 내용이 어렵지 않으니 숙지하여 사용하면 좋을 것 같습니다!!

6.3 'blocking'으로 시작하는 메소드

비동기 프로그래밍을 테스트하기 위해선 테스트가 실행되는 쓰레드에서 실행 결과를 받을 때까지 대기할 수 있어야 합니다. RxJava에서는 blocking 메소드로 이를 도와줍니다. 이러한 메소드들은 다른 쓰레드에서 수행하는 Observable의 통지를 호출한 쓰레드에 반환합니다. 즉 생성 후에 다른 쓰레드에서 나온 데이터를 다시 테스트 쓰레드로 돌려줍니다. 이를 통해 결괏값과 기댓값을 비교해볼 수 있습니다!

위와 동일하게 원래는 디버깅과 테스트 목적으로 사용하는 메소드이기 때문에, 잘못 사용할 경우 쓰레드가 계속 멈춰있을 수도 있습니다. 이제부터 알아보겠습니다.

blockingFirst

blockingFirst는 이 메소드를 호출한 쓰레드에서 Observable의 첫 번째 통지 데이터를 받게하는 메소드입니다. 호출한 쓰레드는 첫 번째 통지 데이터를 받을 때까지 대기하며 다음 처리를 진행하지 않습니다.

subscribe 메소드를 호출해 처리를 시작하는 Observable은 blockingFirst 메소드를 호출하면 처리를 시작하지만, ConnectableObservable처럼 subscrbie 메소드를 호출해도 처리를 시작하지 않을 경우에는 blockingFirst 메소드를 호출한 시점에 처리가 멈추므로 주의해야합니다!

blockingFirst 메소드가 결과를 반환하는 시점은 첫 번째 데이터가 통지될 때입니다.

또한 이 메소드는 데이터나 완료를 통지하는 Observable에서 사용해야만합니다. never 같은 아무것도 통지하지 않거나 결과를 반환하기 이전에 구독 해지를 하는 경우 통지 데이터를 계속 기다려 쓰레드가 멈춘상태로 있게됩니다!

@Test
fun getFirst() {
    val actual = Observable.interval(300L, TimeUnit.MILLISECONDS)
            .blockingFirst()

    Assert.assertThat(actual, `is`(0L))
}

해당 코드는 오류 없이 잘 실행이 됩니다!

@Test
fun getFirst() {
    val actual = Observable.interval(300L, TimeUnit.MILLISECONDS)
            .blockingFirst()

    Assert.assertThat(actual, `is`(1L))
}

=> Expected: is <1L>
     but: was <0L>
java.lang.AssertionError: 
Expected: is <1L>
     but: was <0L>
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
	at org.junit.Assert.assertThat(Assert.java:956)
	at org.junit.Assert.assertThat(Assert.java:923)
	at MyTest.getFirst(MyTest.kt:16)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 ...

하지만 1L을 넣어보면 오류가 발생합니다! 1L을 예상했지만, 값이 0L이라고 나오네요!

blockingLast

blockingLast도 blockingFirst와 비슷합니다! blockingLast 메소드는 마지막 데이터를 받게 해주며 마지막 통지 데이터를 얻을 때까지 다음 처리를 진행하지 않습니다.

subscribe 메소드를 호출하면 처리를 시작하는 Observable은 blockingLast 메소드를 호출하면 처리를 시작하나 위와 동일하게 ConnectableObservable처럼 subscrbie 메소드를 호출해도 처리를 시작하지 않는 Observable이 blockingLast 메소드를 호출하면 호출 시점에 처리가 멈춰버리므로 주의해야 합니다!!

blockingLast 메소드의 결과 반환 시점은 완료 통지 시점입니다! 마지막 데이터 반환 시가 아닙니다! 이는 interval같이 끊임없이 데이터를 통지하는 Observable에는 사용할 수 없습니다. 대상 Observable이 완료가 아닌 에러를 통지할 경우 해당 에러를 던져줍니다.

또한 blockingFirst와 동일하게 never메소드로 생성한 Observable을 사용하면 쓰레드가 멈춥니다!

@Test
fun getLast() {
    val actual = Observable.interval(300L, TimeUnit.MILLISECONDS)
            .take(3)
            .blockingLast()

    Assert.assertThat(actual, `is`(2L))
}
=> 성공

@Test
fun getLast() {
    val actual = Observable.interval(300L, TimeUnit.MILLISECONDS)
            .take(3)
            .blockingLast()

    Assert.assertThat(actual, `is`(1L))
}

=> Expected: is <1L>
     but: was <2L>
java.lang.AssertionError: 
Expected: is <1L>
     but: was <2L>
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
	at org.junit.Assert.assertThat(Assert.java:956)
	at org.junit.Assert.assertThat(Assert.java:923)
	at MyTest.getLast(MyTest.kt:17)
	....

위에서 말했듯 계속 통지를 막기위해 take 메소드를 이용했고, 2L을 예상했지만 1L이 값으로 와 에러를 던집니다!

blockingIterable

blockingIterable는 호출한 원본 쓰레드에서 Observable이 통지하는 모든 데이터를 받는 Iterable을 얻게 하는 메소드입니다! subscribe 메소드가 호출되면 처리를 시작하는 Observable은 Iterable의 iterator 메소드를 호출하는 시점부터 처리를 시작합니다.

따라서 Observable의 처리가 종료되지 않더라도 Iterable을 얻을 수 있습니다. 하지만 ConnectableObservable처럼 subscribe 메소드를 호출해도 처리를 시작하지 않는 경우 Iterator의 next 메소드를 호출하기 전에 처리를 시작하지 않으면 next 메소드를 호출한 시점에 처리가 멈춰 버리므로 주의해야 합니다.

데이터를 받으려면 Iterable에서 Iterator를 얻어와 next 메소드를 호출합니다. next 메소드를 호출할 때 데이터가 아직 통지되지 않은 상태라면 통지될 때까지 기다립니다. 반대로 통지된 데이터가 있어도 Iterator의 next가 호출되지 않았다면 데이터를 버퍼에 보관하고 있다가 next 메소드를 호출했을 때 오래된 데이터 순서대로 꺼냅니다. 이를 통해 알수 있는 것은, 오랫동안 데이터를 가져오지 않는다면 버퍼 저장 크기를 초과하며, 초과할 경우 버퍼 크기를 늘려 계속해서 처리합니다.

또한 완료 후에 데이터를 받으려고 하면 NoSuchElementException이 발생합니다. 그러니 hasNext 메소드를 통해 다음 데이터가 있는지 확인합니다.

그리고 이전과 동일하게, 데이터와 완료를 통지하는 Observable에서 사용하지 않으면 쓰레드가 멈춥니다!

    @Test
    fun getIterable() {
        val result = Observable.interval(300L, TimeUnit.MILLISECONDS)
            .take(5)
            .blockingIterable()

        val iterator = result.iterator()

        assertTrue(iterator.hasNext())
        
        assertThat(iterator.next(), `is`(0L))
        assertThat(iterator.next(), `is`(1L))
        assertThat(iterator.next(), `is`(2L))

        Thread.sleep(1000L)

        assertThat(iterator.next(), `is`(3L))
        assertThat(iterator.next(), `is`(4L))

        assertFalse(iterator.hasNext())
    }

=> 통과

이 테스트를 통해 통지된 데이터를 Iterator에서 모두 가져올 수 있는지를 확인합니다!
1000L을 기다렸다가 next 메소드를 통해 데이터를 가져오는 것을 확인할 수 있습니다.
또한 데이터를 모두 통지하면 false를 반환하는 것을 마지막 줄에서 볼 수 있습니다!

blockingSubscribe

blockingSubscribe는 호출한 원본 쓰레드에서 소비자에게 통지 데이터 처리를 실행할 수 있게하는 메소드입니다. subscribe 메소드가 호출되면 처리를 시작하는 Observable은 이 blockingSubscribe 메소드를 호출해 처리를 시작합니다.

blockingSubscribe 메소드의 인자로는 일반적인 subscribe 메소드에 전달할 수 있는 Observer뿐만 아니라 각 통지의 함수형 인터페이스들이 올 수 있습니다. 하지만 인자가 없는 blockingSubscribe메소드는 일반적인 subscribe 메소드와 달리 에러가 발생하면 에러를 던집니다.

blockingSubscribe 메소드를 실행하면 모든 데이터의 통지 완료 처리가 끝나거나 에러가 발생해 에러 통지 처리가 끝날 때까지 호출한 쓰레드에서 이후 처리를 진행하지 않습니다. 따라서 비동기적으로 Observable이 통지하고 이 통지를 받은 Observer가 어떤 부가 작용을 처리할 때 이 부가 작용의 결과를 확인하는 데 blockingSubscribe 메소드를 사용할 수 있습니다!

	@Test
    fun blockingSubscribe() {
        val result = Observable.interval(100L, TimeUnit.MILLISECONDS)
            .take(5)

        val count = Counter()
        result.blockingSubscribe(object : DisposableObserver<Long>() {
            override fun onNext(t: Long) {
                count.increment()
            }

            override fun onError(e: Throwable) {
                Assert.fail(e.message)
            }

            override fun onComplete() {
                //
            }
        })
        assertThat(count.count, `is`(6))
    }
=> Expected: is <6>
     but: was <5>
java.lang.AssertionError: 
Expected: is <6>
     but: was <5>

예상대로 5를 원했지만 6을 넣어줘 에러가 발생했다!
Observable은 총 5건의 데이터를 100밀리초마다 통지하고 Observer는 Counter의 increment 메소드를 호출합니다. 그리고 모든 통지 처리가 끝나면 Counter의 count 변수값이 기댓값과 같은지 확인합니다. 이 테스트가 성공하면 원래는 비동기적으로 이루어지는 Observable의 통지 처리를 호출 쓰레드에서 통지 데이터를 받아 처리함으로써 통지 처리로 발생한 부가 작용의 결과를 확인할 수 있습니다. 하지만 이 테스트에서는 상태가 변하는 객체를 다른 로직에서 접근하지 않는다고 가정하고 있으므로, 위의 테스트는 Observable이 단독으로 영향을 미치는 업무 로직만 확인합니다.

후기

지금까지 RxJava를 Kotlin으로 정리를 해봤습니다. 안드로이드가 아닌 RxJava에 대해서만 배워서 어떻게 안드로이드에 적용해볼지도 계속 생각해봐야 할 것 같고, 봐도봐도 헷갈리는 부분 또한 계속해서 복습해야할 것 같습니다. 생각날 때마다 복습도 해보고, 시간이 된다면 RxBinding을 통한 Android에서의 이벤트 처리도 포스팅 해보겠습니다! 감사합니다.

profile
뜻을 알고 코딩하기

0개의 댓글