4. Flowable과 Observable의 연산자(3)

안석주·2021년 12월 21일
0

RxJava

목록 보기
8/10

서론

드디어 마지막 장입니다!!!! 세 개의 포스트에 걸쳐 연산자들에 대해서 알아봤는데, 이중에서 분명히 안드로이드에서 자주 쓰이는 것도 있고, 아닌 것도 있습니다. 하지만, 함수 이름이라도 알아두는 것이 다시 찾아볼 때도, 사용할때도 좋겠죠???(물론 내부구현까지 알면 더 좋습니다)

그럼 4장 마지막 포스트를 시작해보겠습니다!

4.5 Observable 상태를 통지하는 연산자

isEmpty

Observable이 통지할 데이터가 있는지 판단

isEmpty() 메소드는 통지할 데이터가 있는지 판단하는 메소드로, 결과로 Single을 반환합니다!
isEmpty() 메소드가 반환하는 Single은 원본 Observable에서 데이터 없이 완료만 통지되면 true를, 데이터가 통지되면 false를 통지합니다. 판단 결과를 통보하는 시점이 완료 시점이기 때문에, 완료되지 않는 Observable에서는 사용할 수 없습니다.

아래 예제에서는 interval() 메소드로 생성한 Observable에 take() 메소드와 filter() 메소드를 이용해 조건을 걸어 통지하는 데이터를 제한해줍니다. 이때 isEmpty() 메소드를 이용해 통지할 데이터가 있는지 확인하고, 결과를 통지하는 예제입니다.

val observable = Observable.interval(1000L, TimeUnit.MILLISECONDS)
        .take(3)
        .filter {
            it >= 2L
        }
        .isEmpty()

observable.subscribe(DebugSingleObserver())
Thread.sleep(4000L)
    
=> RxComputationThreadPool-1 : false

통지되는 데이터 0,1,2 중 2가 통지됩니다. 결과 데이터가 empty하지 않아 false를 리턴해줍니다! filter에 3L을 걸어준다면 true가 나오겠죠!

contains

Observable이 지정한 데이터를 포함하는지 판단

contains() 메소드는 인자의 데이터가 Observable에 포함됐는지를 판단해주고, Observable이 아닌 Single을 결과로 반환합니다. 반환하는 Single은 원본 Observable이 지정 데이터를 포함하면 true, 포함 안하거나, 통지할 데이터가 없으면 false를 통지합니다. 위와 동일하게 완료 시점에 통지합니다.

Observable.interval(1000L, TimeUnit.MILLISECONDS)
        .contains(3L)
        .subscribe(DebugSingleObserver())

Thread.sleep(4000L)

=> RxComputationThreadPool-1 : true

0,1,2,3의 순서로 데이터를 통지하며, 3L을 포함하고 있어 true를 리턴합니다! 또한 실행해보면, 원본 Observable이 해당 데이터를 통지할 때까지 결과를 판단할 수 없으므로 처리를 시작하고 결과를 통지받는데 시간이 걸립니다. (진짜 걸림)

all

Observable의 모든 데이터가 조건에 맞는지 판단

all() 메소드는 Observable이 통지하는 모든 데이터가 설정한 조건에 맞는지를 판단하는 연산자로, 결과로 Single을 반환합니다. 위와 동일하게, 모든 데이터가 조건에 맞으면 true, 맞지 않으면 false를 통지합니다.

all() 메소드는 원본 Observable이 데이터를 통지하고 완료를 통지한 시점에서 결과로 true를 반환하며, Observable이 비었을 때도 true를 통지합니다. 통지한 데이터가 조건과 하나라도 다르다면 맞지 않는 데이터를 받은 시점에 false를 통지합니다.

Observable.interval(1000L, TimeUnit.MILLISECONDS)
        .take(3)
        .all {
            it < 5
        }
        .subscribe(DebugSingleObserver())
Thread.sleep(4000L)

=> RxComputationThreadPool-1 : true

0,1,2를 통지하는 데이터에서 모든 데이터가 5 이하이므로 true를 통지합니다.

sequenceEqual

두 Observable이 같은 순서로 같은 수의 데이터를 통지하는지 판단

sequenceEqual() 메소드는 인자로 전달된 두개의 Observable이 통지하는 데이터가 동일한지 그리고 같은 순서로 같은 수를 통지하는지 판단하는 연산자입니다. 이떄 결과로 Single을 반환합니다. sequenceEqual() 메소드는 데이터만 비교하고, 통지시점은 비교하지 않으므로 통지 간격이 다르더라도 순서와 데이터만 같다면 true를 통지합니다.

판단 결과는 모든 데이터를 통지하여 완료를 통지한 시점이며, 모두 같다면 true, 다르면 false를 통지합니다(false는 틀린 즉시 통지).

sequenceEqual() 메소드는 두개가 존재하고, 인자로 source1, source2를 받는 함수와 source1, source2, BiPredicate를 인자로 받는 함수 2개가 있습니다. (안에 보니 버퍼 사이즈를 받는 함수가 하나 더있긴 하네요..! 책에는 2개라고 나와있는데, 추가된 것 같습니다)

BiPredicate는 각 Observable이 통지한 같은 인덱스의 데이터를 받아 비교하고 두 데이터가 같으면 true를 반환하는 함수형 인터페이스입니다.

아래 예제는 interval() 메소드를 통해 생성한 Observable과 just() 메소드로 생성한 Observable이 같은 순서로 같은 수만큼 있는지 확인합니다.

val observable = Observable.interval(1000L, TimeUnit.MILLISECONDS)
        .take(3)

val observable2 = Observable.just(0L, 1L, 2L)

val result = Observable.sequenceEqual(observable, observable2)
        .subscribe(DebugSingleObserver())

Thread.sleep(4000L)
    
=> RxComputationThreadPool-1 : true

count

Observable의 데이터 개수 통지

count() 메소드는 Observable의 데이터 개수를 통지하는 연산자로, Single을 결과로 반환합니다. 데이터 개수의 타입은 Long입니다.

아래 예제를 봅시다!

Observable.interval(1000L, TimeUnit.MILLISECONDS)
        .take(3)
        .count()
        .subscribe(DebugSingleObserver())

Thread.sleep(4000L)

=> RxComputationThreadPool-1 : 3

take 때문에 개수는 결과는 3개입니다!

4.6 Observable 데이터를 집계하는 연산자

reduce/reduceWith

Observable의 데이터를 계산하고 최종 집계 결과만 통지

reduce() 메소드는 설정한 집계 방식으로 Observable의 데이터를 계산하고, 최종 결과만을 통지하는 연산자입니다. 어떻게 계산할지는 인자로 지정한 함수형 인터페이스에서 정의합니다.
인자로 초기값을 받으면 Single을 리턴하고, 받지 않으면 Maybe를 반환값으로 생성합니다.

reduceWith() 메소드는 인자로 초기값을 생성하는 함수형 인터페이스를 받습니다.
예제를 통해 쉽게 보겠습니다!

Observable.just(1, 10, 100, 1000, 10000)
        .reduce(0) { sum, data ->
            sum + data
        }.subscribe(DebugSingleObserver())
        
=> main : 11111

초기값 0과, 1을 더해주고, 후에 1에 10을, 11에, 100을 더해주는 형식으로 최종값 11111만 리턴해줍니다.

scan

Observable의 데이터를 계산하고 각 계산 결과를 통지

scan() 메소드는 원본 Observable이 통지한 데이터를 인자의 함수형 인터페이스를 사용해 집계하는 연산자입니다. 이 메소드는 인자의 함수형 인터페이스를 반복 호출하여 집계하는데, 메소드의 통지 값은 계산할 때마다 생성되는 결과값입니다. reduce의 경우 마지막 결과값만 리턴하는 방면, scan은 계속해서 결과값을 뱉어냅니다. 초기값이 있다면 초기값을 그대로 통지하고, 없다면 Observable이 통지하는 데이터를 그대로 첫 데이터로 통지합니다. 중간 결과가 필요 없다면 reduce() 메소드를 이용합니다!

아래 예제의 결과를 보면 이해가 쉬울 겁니다!

Observable.just(1, 10, 100, 1000, 10000)
        .scan(0) { sum, value ->
            sum + value
        }.subscribe(DebugObserver())
        
=> main : 0
main : 1
main : 11
main : 111
main : 1111
main : 11111
main : 완료

4.7 유틸리티 연산자

repeat

데이터 통지를 처음부터 반복

repeat() 메소드는 원본 Observable이 처리를 완료하면 데이터 통지를 처음부터 반복하는 연산자입니다. '1,2,3'을 통지하고 완료할 때 repeat() 메소드를 사용하면 '1,2,3,1,2,3..'처럼 데이터를 반복통지할 수 있습니다. repeat() 메소드는 인자가 없다면 완료 통지 없이 계속해서 반복합니다. 또한 0이하의 값을 인자로 주면 IllegalArgumentException이 발생하며, 다른 인자를 주면 그 횟수만큼 통지합니다.

Observable.just("A", "B", "C")
        .repeat(2)
        .subscribe(DebugObserver())

=> main : A
main : B
main : C
main : A
main : B
main : C
main : 완료

repeatUntil

지정한 조건이 될 때까지 통지 반복

repeatUntil() 메소드는 지정한 조건이 될 때까지 Observable이 데이터 통지를 반복합니다.
이때 조건은 인자 없이 Boolean 값을 반환하는 메소드를 가지는 함수형 인터페이스를 지정합니다. 조건을 판단하는 시점은 원본 Observable이 완료를 통지하는 시점으로, 판단 결과가 false면 통지를 반복하고, true면 완료 통지 후 처리를 종료합니다.

val startTime = System.currentTimeMillis()

Observable.interval(100L, TimeUnit.MILLISECONDS)
        .take(7)
        .repeatUntil {
            println("called")
            return@repeatUntil System.currentTimeMillis() - startTime > 900L
        }.subscribe(DebugObserver())

Thread.sleep(1000L)

=> RxComputationThreadPool-1 : 0
RxComputationThreadPool-1 : 1
RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 3
RxComputationThreadPool-1 : 4
RxComputationThreadPool-1 : 5
RxComputationThreadPool-1 : 6
called
RxComputationThreadPool-2 : 0
RxComputationThreadPool-2 : 1

return 값이 false면 다시 반복합니다.

repeatWhen

반복 처리 정의 및 통지 반복

repeatWhen() 메소드는 지정한 시점까지 원본 Observable의 통지를 반복하는 연산자 입니다.
반복 여부는 repeatWhen() 메소드의 인자인 함수형 인터페이스에서 판단합니다. 함수형 인터페이스는 완료를 통지하는 원본 Observable을 인자로 받고, 이를 변환해 반환합니다.

이때 변환된 Observable이 데이터를 통지하면 원본 Observable이 데이터 통지를 반복하고, 변환된 Observable이 완료를 통지하면 결과로 완료를 통지하고 처리를 끝냅니다. 또한, repeatWhen() 메소드의 함수형 인터페이스가 빈 Observable을 반환하면 데이터 통지 없이 완료를 통지하고 종료합니다.

반복 시작 시점은 변환된 Observable이 데이터를 통지한 시점이므로, delay() 메소드로 통지를 지연하면 반복 통지 시작을 늦출 수 있습니다. 그리고 반환된 Observable이 원본 Observable과 다른 쓰레드에서 작동하면 반복 도중에 완료를 통지할 수 있으므로 주의해야 합니다.

Observable.just(1, 2, 3)
        .repeatWhen { handler ->
            handler
                .delay(1000L, TimeUnit.MILLISECONDS)
                .take(2)
                .doOnNext { println("emit : $it") }
                .doOnComplete { println("complete") }
        }
        .map { data ->
            val time = System.currentTimeMillis()
            return@map "$time : $data"
        }.subscribe(DebugObserver())

Thread.sleep(5000L)

=> main : 1640126409396 : 1
main : 1640126409396 : 2
main : 1640126409396 : 3
emit : 0
RxComputationThreadPool-1 : 1640126410407 : 1
RxComputationThreadPool-1 : 1640126410407 : 2
RxComputationThreadPool-1 : 1640126410407 : 3
emit : 0
RxComputationThreadPool-1 : 1640126411417 : 1
RxComputationThreadPool-1 : 1640126411417 : 2
RxComputationThreadPool-1 : 1640126411417 : 3
complete
RxComputationThreadPool-1 : 완료

결과를 보면 함수형 인터페이스로 생성한 Observable이 데이터를 통지한 시점에 반복을 실행하고 완료를 통지한 시점에 결과 통지를 완료합니다. 또한 함수형 인터페이스로 생성한 Observable이 1000밀리초 늦게 데이터를 통지하므로, 반복도 1000밀리초 늦게 실행됩니다.

하지만 주의해야할 점이 있습니다!
원본 Observable이 interval() 메소드처럼 다른 쓰레드에서 처리되면 함수형 인터페이스의 반환값인 Observable은 비동기 처리를 하려고 두 번재 데이터를 통보하자마자 바로 완료를 통지합니다.

Observable.interval(100L, TimeUnit.MILLISECONDS)
        .take(3)
        .repeatWhen { handler ->
            handler
                .delay(1000L, TimeUnit.MILLISECONDS)
                .take(2)
                .doOnNext { println("emit : $it") }
                .doOnComplete { println("complete") }
        }
        .map { data ->
            val time = System.currentTimeMillis()
            return@map "$time : $data"
        }.subscribe(DebugObserver())

Thread.sleep(5000L)

=> RxComputationThreadPool-1 : 1640126690598 : 0
RxComputationThreadPool-1 : 1640126690694 : 1
RxComputationThreadPool-1 : 1640126690786 : 2
emit : 0
RxComputationThreadPool-3 : 1640126691909 : 0
RxComputationThreadPool-3 : 1640126692017 : 1
RxComputationThreadPool-3 : 1640126692111 : 2
emit : 0
complete
RxComputationThreadPool-2 : 완료

결과를 보면 take로 3번 반복을 통지할거라고 예상하지만, 2번만 반복합니다.

delay

데이터 통지 시점 지연

delay() 메소드는 Observable에서 받은 데이터를 설정한 기간만큼 지연해 통지합니다.
delay() 메소드와 delaySubscription() 메소드와 다른점은, delay() 메소드는 데이터를 생성하고 설정한 시간만큼 지연한 뒤 통지하지만, 설정한 시간만큼 처리 시작 시각을 지연한 뒤 데이터를 생성하고 바로 통지하는 점에서 다릅니다.

println("처리 시작 : ${System.currentTimeMillis()}")

    Observable.create<String> {
        println("구독 시작 : ${System.currentTimeMillis()}")
        it.onNext("A")
        it.onNext("B")
        it.onNext("C")
        it.onComplete()
    }
        .delay(2000L, TimeUnit.MILLISECONDS)
        .doOnNext {
            println("통지 시각 : ${System.currentTimeMillis()}")
        }
        .subscribe(DebugObserver())
Thread.sleep(3000L)

=> 처리 시작 : 1640127157533
구독 시작 : 1640127157650
통지 시각 : 1640127159656
RxComputationThreadPool-1 : A
통지 시각 : 1640127159656
RxComputationThreadPool-1 : B
통지 시각 : 1640127159657
RxComputationThreadPool-1 : C
RxComputationThreadPool-1 : 완료

결과를 보면 '구독 시작' 시각과 '통지 시작'의 시간 차이가 2000밀리초임을 볼 수 있습니다.
또한 처리 시작과 구독 시작의 시간차이는 거의 없으므로 구독 자체가 지연되는 것은 아님을 알 수 있습니다.

delaySubscription

처리 시작 지연

delaySubscription() 메소드는 위에 설명했던 것처럼, 구독을 늦춰줍니다.

println("처리 시작 : ${System.currentTimeMillis()}")

    Observable.create<String> {
        println("구독 시작 : ${System.currentTimeMillis()}")
        it.onNext("A")
        it.onNext("B")
        it.onNext("C")
        it.onComplete()
    }
        .delaySubscription(2000L, TimeUnit.MILLISECONDS)
        .subscribe(DebugObserver())
Thread.sleep(3000L)
    
=> 처리 시작 : 1640127295929
구독 시작 : 1640127298054
RxComputationThreadPool-1 : A
RxComputationThreadPool-1 : B
RxComputationThreadPool-1 : C
RxComputationThreadPool-1 : 완료

5929 -> 8054 에서 대략 2000밀리초(+a) 만큼 구독 시간 차이가 발생함을 볼 수 있습니다.

timeout

데이터 통지의 타임아웃 설정

timeout() 메소드는 데이터를 통지할 때 설정된 시간을 넘기면 에러를 통지하거나 대체 Observable의 데이터를 통지하는 연산자입니다. 또한, 인자로 대체 Observable이 없으면 에러를 통지합니다. 이떄 에러 객체는 TimeoutException입니다.

Observable.create<Int> {
        it.onNext(1)
        it.onNext(2)
        try {
            Thread.sleep(1200L)
        } catch (e: InterruptedException) {
            it.onError(e)
            return@create
        }
        it.onNext(3)
        it.onComplete()
    }.timeout(1000L, TimeUnit.MILLISECONDS)
        .subscribe(DebugObserver())

Thread.sleep(2000L)
=>  main : 1
main : 2
java.util.concurrent.TimeoutException: The source did not signal an event for 1000 milliseconds and has been terminated.
	at io.reactivex.rxjava3.internal.operators.observable.ObservableTimeoutTimed$TimeoutObserver.onTimeout(ObservableTimeoutTimed.java:134)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableTimeoutTimed$TimeoutTask.run(ObservableTimeoutTimed.java:165)
...    

1과 2를 통지하고 나서 1200밀리초를 대기하다가, timeout에 의해 에러를 통지합니다.

결론

지금까지 4장, RxJava의 연산자에 대해서 정리해봤습니다. 정리하면서 몰랐던 것, 헷갈렸던 것 모두 정리하면서 하느라 오래걸리기도 했지만, 다양한 연산자와 사용법에 대해서 알게됐던 것 같습니다. 안드로이드에서는 비동기 통신을 통해 값을 가져와 처리해줄 텐데, 자주 사용하면서 익숙해질 시간이 필요할 것 같습니다! 다음엔 5장으로 돌아오겠습니다!

profile
뜻을 알고 코딩하기

0개의 댓글