4. Flowable과 Observable의 연산자(2)

안석주·2021년 12월 8일
0

RxJava

목록 보기
7/10

서론

4장의 두 번째, 4.3장과 4.4장을 정리해보겠습니다.
4.3장은 통지 데이터를 제한하는 연산자로 자주쓰는 filter, distinct, take 등 생산자에서 나온 데이터를 개수나 조건에 따라 걸러 필요한 데이터만 걸러내는 함수를 알아봅니다.
4.4장은 생산자를 결합하는 연산자를 알아봅니다.

4.3 통지 데이터를 제한하는 연산자

filter

지정한 조건에 맞는 데이터만 통지

filter() 메소드는 받은 데이터가 조건에 맞는지를 판정해 결과가 true인 것만 통지하는 연산자 입니다. 조건 판정은 인자로 받는 함수형 인터페이스에서 이루어집니다! filter 메소드는 매우 자주 사용되니 잘 알아두어야 합니다.

Observable.interval(300L, TimeUnit.MILLISECONDS)
        .filter {
            it.toInt() % 2 == 0
        }.subscribe(DebugObserver())

    Thread.sleep(3000L)
    
=> RxComputationThreadPool-1 : 0
RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 4
RxComputationThreadPool-1 : 6
RxComputationThreadPool-1 : 8

실행 결과에 짝수 데이터만 통지되는 것을 볼 수 있습니다.

distinct

이미 통지한 데이터와 같은 데이터를 제외하고 통지

distinct() 메소드는 통지하려는 데이터가 이미 통지된 데이터와 같다면 해당 데이터를 통지하지 않는 연산자입니다. 이 메소드 내부에 HashSet을 가지고 있어, 이 HashSet으로 데이터가 같은지를 확인합니다.

Observable.just("A", "a", "B", "b", "A", "a", "B", "b")
        .distinct()
        .subscribe(DebugObserver())

=> main : A
main : a
main : B
main : b
main : 완료

distinct(keySelector)

keySelector는 받은 데이터를 기반으로 비교용 데이터를 생성하는 함수형 인터페이스로, 이 함수형 인터페이스의 반환값으로 데이터가 같은지를 판단합니다. 단, 결과로 통지할 때는 비교용 변환 데이터가 아닌, 받은 데이터 그대로 통지합니다. 이를 코드로 보면 이해하기 쉽습니다!

Observable.just("A", "a", "B", "b", "A", "a", "B", "b")
        .distinct{
            it.toLowerCase()
        }
        .subscribe(DebugObserver())
=> main : A
main : B
main : 완료

위의 코드가 distinct(keySelector)을 이용한 예제입니다. 전달받은 "A"를 "a"로(LowerCase로) 변환하여 통지합니다. 또한 "a"가 이미 통지됐기 때문에 다음의 소문자 "a"는 통지되지 않습니다. 또한 결과에서는 변환된 "a"가 아닌, 원본의 데이터 "A"가 출력됩니다.

distinctUntilChanged

연속된 같은 값의 데이터는 제외하고 통지

이 메소드는 데이터에 중복 값이 있어도 통지합니다. 그러나 연속으로 나오는 중복 값은 통지하지 않습니다.
쉽게 보면 "a","b","a"는 a,b,a를 통지하고, "a","a","b","a","a" 또한 a,b,a를 통지합니다!(쉽죠 ㅎㅎ)
distinctUntilChanged는 세가지 메소드가 있습니다.

인자타입인자 이름설명
FunctionkeySelector받은 데이터와 비교할 데이터를 생성하는 함수형 인터페이스
BiPredicatecomparer바로 앞 데이터와 현재 데이터가 같은지를 판단하는 함수형 인터페이스

하나씩 보겠습니다.

distinctUntilChanged()

Observable.just("A", "a", "a", "A", "a")
        .distinctUntilChanged()
        .subscribe(DebugObserver())
        
=>main : A
main : a
main : A
main : a
main : 완료

별거 없이 위의 쉬운 설명과 동일하네요!
keySelector는 위의 distinct(keySelector)와 동일하기 때문에 건너 뛰겠습니다!

distinctUntilChanged(comparer)

이 메소드는 바로 앞서 받은 데이터와 현재 데이터가 같은지를 지정한 방법으로 비교합니다. 비교한 결과로 true를 반환하면 해당 데이터는 통지하지 않습니다.

다음 예제에서는 숫자의 문자열을 통지하는 Observable을 just() 메소드로 생성합니다. 그리고 이 Observable이 통지하는 데이터에 소수점 자릿수와 관계없이 같은 값이 연속되면 distinctUntilChanged() 메소드로 이 데이터를 제외하고 통지합니다. 이 메소드에서 함수형 인터페이스에서는 받은 데이터와 바로 앞서 받은 데이터를 비교하고자 두 숫자를 BigDemical로 변환하고 compareTo() 메소드로 이 둘이 같은지를 판단합니다. 다만, 통지할 때는 비교하려고 변환한 데이터가 아니라 원본 데이터 그대로 통지합니다!

Observable.just("1", "1.0", "0.1", "0.10", "1")
        .distinctUntilChanged { data1, data2 ->
            val convert1 = BigDecimal(data1)
            val convert2 = BigDecimal(data2)
            return@distinctUntilChanged (convert1.compareTo(convert2) == 0)
        }.subscribe(DebugObserver())

=> main : 1
main : 0.1
main : 1
main : 완료

소수점 자리수와 관계없이 받은 데이터가 바로 앞 데이터와 같은 데이터면 해당 데이터를 통지하지 않는 것을 확인할 수 있습니다!

take

지정한 개수나 기간까지만 데이터를 통지

take함수도 자주 사용됩니다. take함수는 쉬우니 보시기만 해도 이해가 갈껍니다!

인자 타입인자 이름설명
longcount통지할 수 있는 데이터 개수
longtime데이터를 통지할 수 있는 기간
TimeUnitunit데이터를 통지할 수 있는 기간의 단위

TimeUnit은 잘 쓰이지 않는 것 같긴 합니다...

3개만 통지하는 예시를 보죠!

Observable.interval(1000L, TimeUnit.MILLISECONDS)
        .take(3)
        .subscribe(DebugObserver())
Thread.sleep(4000L)

=> RxComputationThreadPool-1 : 0
RxComputationThreadPool-1 : 1
RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 완료

takeUntil

지정된 조건에 도달할 때까지 통지

takeUntil() 메소드는 인자로 지정한 조건이 될 때까지 데이터를 통지하는 연산자로, takeUntil() 메소드에는 두종류가 있습니다.
1. 받은 데이터를 지정한 조건으로 판단해 그 결과가 true가 될 때까지 데이터를 통지하는 메소드
2. 인자로 받은 Observable이 처음으로 데이터를 통지할 때까지 계속해서 데이터를 통지하는 메소드
위의 두 메소드는 지정한 조건이 되면 완료를 통지합니다. 지정한 개수 또는 기간에 도달하기 전, 원본 Observable이 완료되면 모든 데이터를 통지한 후에 처리를 종료합니다.

인자 타입인자 이름설명
PredicatestopPredicate받은 데이터를 판단해 통지를 끝낼 조건이 되면 true를 반환하는 함수형 인터페이스
Publisher/ObservableSourceother첫 번째 데이터의 통지 시점 또는 완료 통지 시점에 결과로 데이터 통지를 멈추고 완료하게 하는 Observable

아래 예제를 통해 위의 메소드들을 알아봅시다!

takeUntil(stopPredicate)

첫 번째 예제는 interval() 메소드로 생성한 Observable에서 받은 데이터가 '3'이 될때까지 takeUntil() 메소드로 통지합니다.

Observable.interval(300L, TimeUnit.MILLISECONDS)
        .takeUntil {
            it.toInt() == 3
        }.subscribe(DebugObserver())
Thread.sleep(2000L)
    
=> RxComputationThreadPool-1 : 0
RxComputationThreadPool-1 : 1
RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 3
RxComputationThreadPool-1 : 완료

takeUntil(other)

아래 예제는 interval로 생성하고 takeUnitl() 메소드의 인자 Observable이 데이터를 통지할 때까지 계속해서 생성한 Observable에서 받은 데이터를 통지합니다. 이때 takeUntil() 메소드의 인자 Observable은 1000밀리초 뒤에 숫자 '0' 하나만 통지하고 종료합니다.

Observable.interval(300L, TimeUnit.MILLISECONDS)
        .takeUntil(Observable.timer(1000L, TimeUnit.MILLISECONDS))
        .subscribe(DebugObserver())
Thread.sleep(2000L)

=> RxComputationThreadPool-2 : 0
RxComputationThreadPool-2 : 1
RxComputationThreadPool-2 : 2
RxComputationThreadPool-1 : 완료

이를 표로 보면 다음과 같습니다!

경과시간03006009001000
원본 Observable012
인자 Observable0
결과 통지012완료

takeWhile

지정한 조건에 해당할 때만 데이터 통지

takeWhile() 메소드는 인자로 지정한 조건이 true일동안 받은 데이터를 그대로 통지하는 연산자 입니다. 이 메소드는 받은 데이터를 지정한 조건으로 판단해 true면 데이터를 통지하고 false면 완료를 통지하고 처리를 종료합니다. 이때 판단 결과 false인 데이터는 통지되지 않습니다.
또한, 판단 결과가 처음부터 false라면 데이터를 통지하지 않고 완료를 통지합니다. 반대로 판단 결과가 false가 되기 전에 원본 Observable이 완료를 통지하면, 그대로 완료를 통지하고 처리를 종료합니다.
아래 예제는 받은 데이터가 3이 아닌 경우 통지하는 예제입니다.

Observable.interval(300L, TimeUnit.MILLISECONDS)
        .takeWhile {
            it != 3L
        }.subscribe(DebugObserver())
Thread.sleep(2000L)

=> RxComputationThreadPool-1 : 0
RxComputationThreadPool-1 : 1
RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 완료

조건을 false로 만드는 3이 통지되지 않고 완료를 출력하는 것을 볼 수 있습니다.

takeLast

끝에서부터 지정한 조건까지의 데이터 통지

takeLast() 메소드는 원본 Observable이 완료될 때 마지막 데이터부터 지정한 개수나 지정한 기간의 데이터만을 세어 통지하는 연산자입니다. 완료 시점에 데이터를 통지하므로, 완료를 통지하지 않는다면 사용할 수 없습니다. 결과 데이터를 통지하는 시점은 원본 Observable의 통지 시점이 아닌 원본 Observable의 완료 통지 시점이며, 이때 조건에 맞는 데이터를 통지합니다.
또한 원본 Observable이 지정한 개수보다 적게 데이터를 통지하거나 지정한 기간보다 일찍 완료 통지하면, 모든 데이터를 통지하고 완료합니다.

이전의 take() 메소드와 인자가 동일하니 쓰지 않겠습니다!(int, Long, TimeUnit)

takeLast(count)

아래 예제는 interval() 메소드로 생성한 Observable이 데이터를 5건까지만 통지하게 take메소드로 만들고, 그리고 통지한 데이터중 마지막 2개만 takeLast() 메소드로 통지합니다.

Observable.interval(800L, TimeUnit.MILLISECONDS)
        .take(5)
        .takeLast(2)
        .subscribe(DebugObserver())
Thread.sleep(5000L)
    
=>RxComputationThreadPool-1 : 3
RxComputationThreadPool-1 : 4
RxComputationThreadPool-1 : 완료

이는 take() 메소드를 이용해 0, 1, 2, 3, 4를 통지하고, takeLast() 메소드를 이용해 3, 4만 얻어옵니다!

takeLast(count, time, unit)

이 메소드는 takeLast(count)와 takeLast(time, unit)을 합쳐서 처리합니다.
이 메소드는 먼저 마지막 통지 시점부터 지정한 시간까지의 데이터를 얻고, 이 중에서 끝에서부터 지정한 개수만큼의 데이터를 세어 통지합니다.

아래 예제는 interval() 메소드로 생성한 Observable이 데이터를 10건까지 통지하게 take() 메소드로 설정합니다. 그리고 완료 통지 전 1000밀리초 동안 통지된 데이터 중에서 끝에서부터 2건만 통지하게 takeLast() 메소드로 설정합니다.

Observable.interval(300L, TimeUnit.MILLISECONDS)
        .take(10)
        .takeLast(2, 1000L, TimeUnit.MILLISECONDS)
        .subscribe(DebugObserver())

Thread.sleep(4000L)
    
=> RxComputationThreadPool-1 : 8
RxComputationThreadPool-1 : 9
RxComputationThreadPool-1 : 완료

이는 10개를 통보하는 데이터를, 완료 통지 전 1000밀리초 동안 통지한 데이터 중에서 끝에서부터 2건의 데이터만 결과로 통지합니다.
아래 표를 통해 결과를 확인해봅시다!

경과시간03006009001200150018002100240027003000
원본 Observable0123456789
지정 기간의 데이터6789
통지 대상 데이터89

take() 메소드를 통해 0~9까지 10개의 데이터를 생성, 완료 통지 전 1000밀리 초동안 만들어진 데이터들 중에서 takeLast(2)로 2개의 데이터만 통지합니다.

skip

앞에서부터 지정된 범위까지의 데이터는 통지 대상에서 제외

skip() 메소드는 앞에서부터 지정한 만큼 데이터를 건너뛴 후 나머지 데이터를 통지하는 연산자입니다. 건너뛰는 범위는 데이터 개수나 경과 시간으로 지정할 수 있습니다. 지정한 범위가 원본 Observable이 통지하는 데이터보다 많으면, 데이터를 통지하지 않고 완료 처리 후 종료합니다.

인자 타입인자 이름설명
longcount통지 제외 데이터 개수
longtime통지 제외 기간
TimeUnitunit통지 제외 기간의 단위

아래 예제는 interval() 메소드로 Observable을 생성하고, skip() 메소드로 이 Observable이 통지하는 데이터 중에서 처음 두 건은 통지되지 않게 합니다.

Observable.interval(1000L, TimeUnit.MILLISECONDS)
        .skip(2)
        .subscribe(DebugObserver())
Thread.sleep(5000L)

=> RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 3
RxComputationThreadPool-1 : 4

0,1은 통지에서 제외됐습니다.

skipUntil

인자 Observable이 데이터를 통지할 때까지 데이터 통지를 건너뜀

이 메소드는 지정한 Observable이 데이터를 통지하면 그때부터 결과 데이터를 통지합니다.
이전의 takeUntil은 인자 Observable이 통지할 때까지 통지했다면, skipUntil은 반대로 인자 Observable이 통지할 때까지 skip했다가 통지합니다.

Observable.interval(300L,TimeUnit.MILLISECONDS)
        .skipUntil(Observable.timer(1000L,TimeUnit.MILLISECONDS))
        .subscribe(DebugObserver())

Thread.sleep(2000L)

=> RxComputationThreadPool-2 : 3
RxComputationThreadPool-2 : 4
RxComputationThreadPool-2 : 5

결과를 보면 0,1,2,3,4,5을 생성하는 원본 Observable에서 1000밀리초에 0을 생성한 뒤, 1200밀리초부터 3,4,5를 순대로 통지합니다.

skipWhile

지정한 조건에 해당할 때는 데이터 통지 제외

skipWhile() 메소드는 인자로 지정한 조건이 true일 경우 데이터를 통지하지 않는 연산자입니다. 이 메소드는 받은 데이터를 지정한 조건으로 판단해 그 결과가 true이면 데이터를 통지하지 않고, false면 해당 시점부터 데이터를 통지합니다. 한번 통지를 시작하면, 또다시 전송 여부를 판단하지 않습니다. 판단 결과가 한 번도 false가 되지 않는다면 완료만 통지하고 종료합니다.

Observable.interval(300L, TimeUnit.MILLISECONDS)
        .skipWhile { it != 3L }
        .subscribe(DebugObserver())

    Thread.sleep(2000L)
    
=> RxComputationThreadPool-1 : 3
RxComputationThreadPool-1 : 4
RxComputationThreadPool-1 : 5

받은 데이터가 3L일 때까지 skip 합니다! 그 이후에는 데이터를 계속해서 통지합니다.(= 전송 여부를 판단하지 않습니다!)

skipLast

끝에서부터 지정한 범위만큼 데이터 통지 제외

skipLast() 메소드는 원본 Observable이 통지하는 데이터 중에서 끝에서부터 지정한 범위만큼의 데이터를 통지하지 "않는" 연산자입니다. 인자는 위에서 썼던(Int, Long, TimeUnit)입니다.

Observable.interval(1000L, TimeUnit.MILLISECONDS)
        .take(5)
        .skipLast(2)
        .subscribe(DebugObserver())
Thread.sleep(6000L)

=> RxComputationThreadPool-1 : 0
RxComputationThreadPool-1 : 1
RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 완료

interval() 메소드로 1000L마다 데이터를 생성, 5건만 통지합니다. 0,1,2,3,4가 생성된 상황에서 skipLast(2), 마지막 2건을 통지하지 않습니다.

throttleFirst

데이터 통지 후 지정 시간 동안 데이터를 통지하지 않음

throttleFirst() 메소드는 데이터를 통지하고 나면, 지정 시간 동안 통지하는 데이터는 파기하는 연산자로, 처리가 완료될 때까지 이 작업을 반복합니다. 단기간에 대량으로 들어온 데이터가 모두 필요한 데이터가 아니라면, 이 연산자로 데이터를 쳐낼 수 있습니다. 특히나 안드로이드에서 중복 클릭을 막기 위해 시간 단위로 throttleFirst()를 이용하면, 한번 클릭이 된 후 지정한 시간 초(ex: 2초)간 클릭이 되어도, 데이터를 통지하지 않고 파기할 수 있습니다!(= 중복 클릭 방지)

    Observable.interval(300L, TimeUnit.MILLISECONDS)
        .take(10)
        .throttleFirst(1000L,TimeUnit.MILLISECONDS)
        .subscribe(DebugObserver())

Thread.sleep(4000L)

=>RxComputationThreadPool-1 : 0
RxComputationThreadPool-1 : 4
RxComputationThreadPool-1 : 8
RxComputationThreadPool-1 : 완료

보기 쉽게 표로 정리해보겠습니다!

경과 시간0300600900120013001500180021002400250027003000
원본 Observable0123456789
결과 데이터048

데이터를 통지하면(300밀리 초에서), 1000밀리초 동안 다음 데이터를 통지하지 않다가, 그 다음 데이터 통지한 뒤 또 1000밀리초동안 다음 데이터를 통지하지 않음을 반복합니다.

throttleLast/sample

지정한 시간마다 가장 마지막에 통지된 데이터만 통지

throttleLast()

throttleFirst의 반대입니다! 첫 데이터 이후 x 밀리초를 기다린다면, 데이터 통지 후 반대로 x밀리초를 기다린 뒤 그중 가장 마지막 데이터를 통지합니다.
표를 먼저 보고 코드를 보겠습니다!

경과 시간030060090010001200150018002000210024002700
원본 Observable012345678
결과 데이터25
Observable.interval(300L, TimeUnit.MILLISECONDS)
        .take(9)
        .throttleLast(1000L,TimeUnit.MILLISECONDS)
        .subscribe(DebugObserver())

Thread.sleep(3000L)

=> RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 5
RxComputationThreadPool-2 : 완료

sample(sampler)

sample() 메소드의 인자 Observable이 데이터를 통지하는 시점에 interval() 메소드로 생성한 Observable에서 가장 마지막에 받은 데이터를 통지합니다.

Observable.interval(300L, TimeUnit.MILLISECONDS)
        .take(9)
        .sample(Observable.interval(1000L, TimeUnit.MILLISECONDS))
        .subscribe(DebugObserver())

Thread.sleep(3000L)

=> RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 5
RxComputationThreadPool-2 : 완료

sample() 안의 Observable이 생성될 때 가장 마지막에 받은 데이터를 통지합니다.
비동기 처리기 때문에 다소 오차가 있을 수 있지만, 앞의 예제와 동일한 처리한 처리를 하므로, 같은 결과가 나옵니다.

throttleWithTimeout/debounce

데이터를 받은 후 일정 기간 안에 다음 데이터를 받지 못하면 현재 데이터 통지

throttleWithTimeout/debounce() 메소드는 원본 Observable에서 데이터를 받고 나서 일정 기간 안에 다른 데이터를 받지 않으면 현재 받은 데이터를 통지하는 연산자입니다. 지정 기간에 다음 데이터를 받으면 새로 데이터를 받은 시점부터 다시 지정 기간 안에 다음 데이터가 오는지를 확인합니다. 지정 기간 내라 해도 완료나 에러 통지는 가능하며, 완료가 통지되면 마지막으로 통지된 데이터와 함께 완료를 통지하며, 에러 통지시 에러만 통지합니다.
throttleWithTimeout/debounce() 메소드는 인자가 같으면 이름만 다를 뿐 같은 처리 작업을 합니다.

안드로이드에서 debounce는 editText에서 입력 도중에는 아무런 처리를 하지 않다가, x초 동안 입력이 멈추면 행동을 하는 방식으로 사용합니다!

Observable.create(ObservableOnSubscribe<String> {
        it.onNext("A")
        Thread.sleep(1000L)

        it.onNext("B")
        Thread.sleep(300L)

        it.onNext("C")
        Thread.sleep(300L)

        it.onNext("D")
        Thread.sleep(1000L)

        it.onNext("E")
        Thread.sleep(100L)

        it.onComplete()
    })
        .throttleWithTimeout(500L, TimeUnit.MILLISECONDS)
        .subscribe(DebugObserver())
        
=> RxComputationThreadPool-1 : A
RxComputationThreadPool-1 : D
main : E
main : 완료

A를 받은 뒤 500 밀리초 동안 데이터가 오지 않아서 A가 통지됩니다. B와 C는 300 밀리초 안에 데이터가 오므로 통지하지 않고 D를 받은 뒤 1000 밀리초 동안 데이터가 오지 않아 통지됩니다. 후에 onComplete() 메소드가 불려 마지막 데이터 E와 함께 완료가 통지됩니다.

elementAt/elementAtOrError

지정한 위치의 데이터만 통지

elementAt/elementAtOrError() 메소드는 둘 다 지정한 위치의 데이터만을 통지하는 연산자입니다. 다만 통지할 데이터가 없을 때는 서로 다르며, 결과로 생성되는 반환값도 Observable이 아닌 Single/Maybe입니다. 특히 elementAt() 메소드는 인자에 따라 반환 값이 Single 또는 Maybe로 바뀌니 조심해야 합니다!

아래 예제는 interval() 메소드로 Observable을 생성하고 elementAt() 메소드로 Observable이 통지하는 데이터 중에서 elementAt() 메소드의 인자 위치(위치는 0부터 시작)에 있는 데이터만 결과로 통지하게 합니다!

반환값메소드설명
Maybe<T>elementAt(long index)원본 Observable이 통지하는 데이터중에서 인자로 받은 index에 있는 데이터만 통지하는 Maybe를 생성합니다. 데이터가 없으면 완료 통지하는 Maybe 생성!
Single<T>elementAt(long index,T defaultItem)원본 Observable이 통지하는 데이터 중에서 인자로 받은 index에 있는 데이터만 통지하는 Single을 생성합니다. 데이터가 없으면 인자로 받은 defaultItem을 통지하는 Single을 생성합니다.
Single<T>elementAtOrError(long index)원본 Observable이 통지하는 데이터 중에서 인자로 받은 index에 있는 데이터만 통지하는 Single을 생성합니다. 데이터가 없으면 NoSuchElementException 에러를 통지하는 Single을 생성합니다.

아래 예제를 봅시다!

Observable.interval(100L, TimeUnit.MILLISECONDS)
        .elementAt(3)
        .subscribe(DebugMaybeObserver())

Thread.sleep(1000L)
    
=> RxComputationThreadPool-1 : 3

인덱스 또한 0부터 시작이니, 0,1,2,3 중 3을 출력!
Maybe는 데이터 통지(onSuccess), 완료 통지(onComplete), 에러 통지(onError) 중 하나를 통지하는데, 이 예제에서는 데이터가 있으니 데이터만 통지하고 종료합니다.

요기 까지가 4.3장입니다! 어렵지 않죠..? 4.4로 넘어갑니다!

4.4 Observable을 결합하는 연산자

merge/mergeDelayError/mergeArray/mergeArrayDelayError/mergeWith

여러 개의 Observable을 하나로 병합하고, "동시 실행"

merge/mergeDelayError/mergeArray/mergeArrayDelayError/mergeWith 메소드는 여러 Observable에서 받은 데이터를 하나의 Observable로 통지하는 연산자입니다. 이러한 merge 계열의 메소드를 사용하면 여러 Observable의 통지를 하나의 Observer로 구독할 수 있습니다! 이 메소드들은 여러 개의 Observable을 인자로 전달 받아 처리를 시작할 때 Observable들을 동시에 실행하고, 각각 데이터를 통지하는 시점에 결과 Observable도 데이터를 통지합니다. 하지만 여러 Observable이 동시에 데이터를 통지하더라도, 결과를 통지할 때는 동기화돼 순차적으로 통지됩니다. 그리고 모든 Observable이 완료될 때 완료를 통지합니다.

또한 merge 메소드는 error 통지시 즉시 에러를 통지하지만, mergeDelayError 메소드는 에러 통지를 받아도 다른 Observable의 처리가 완료될 때까지 에러 통지를 기다립니다. 후에 다른 Observable을 처리한 뒤 Error를 통지합니다.
merge() 메소드는 인자를 최대 네개까지 전달할 수 있으나, mergeArray() 메소드는 인자를 배열로 받기에 네 개 이상의 Observable을 전달할 수도 있습니다. 마찬가지로 mergeDelayError/mergeArrayDelayError() 메소드도 인자로 전달하는 Observable의 수만 다릅니다.

아래 예제에서는 interval() 메소드로 생성한 2개의 Observable을 merge() 메소드를 사용해 하나로 묶습니다. 그리고 인자로 받은 Observable의 처리를 동시에 실행합니다. 이때 원본 Observable들은 통지 간격도 완료 통지 시점도 각기 다릅니다.

val observable1 = Observable.interval(300L, TimeUnit.MILLISECONDS)
        .take(5)

val observable2 = Observable.interval(500L, TimeUnit.MILLISECONDS)
        .take(2)
        .map {
            it + 100L
        }

val result = Observable.merge(observable1, observable2)
        .subscribe(DebugObserver())

Thread.sleep(2000L)

=> RxComputationThreadPool-1 : 0
RxComputationThreadPool-2 : 100
RxComputationThreadPool-1 : 1
RxComputationThreadPool-1 : 2
RxComputationThreadPool-2 : 101
RxComputationThreadPool-1 : 3
RxComputationThreadPool-1 : 4
RxComputationThreadPool-1 : 완료

실행 결과를 보면, 각 Observable이 동시에 시작됩니다. 또한 2번째 Observable이 처리를 완료해도 첫번 째 Observable은 데이터를 통지하므로, 모든 Observable이 완료를 통지할 때까지 데이터를 계속 통지합니다.

concat/concatDelayError/concatArray/concatArrayDelayError/concatWith

여러개의 Observable을 "하나씩" 실행

concat/concatDelayError/concatArray/concatArrayDelayError/concatWith() 메소드는 여러개의 Observable을 전달받아 하나의 Observable로 결합한 후 순차적으로 실행하는 연산자입니다. 결과 Observable이 처리를 시작하면 첫 번째 Observable이 실행되고 해당 통지가 완료되면 다음 Observable이 시작됩니다. 이처럼 Observable이 하나씩 실행되므로 완료되지 않은 Observable이 포함되면 다음 Observable이 실행되지 않습니다.

한편 concatWith() 메소드는 concat() 메소드의 인스턴스 메소드처럼 자신의 통지를 한 뒤에 인자로 받은 Observable의 통지를 합니다.

merge와 같이 에러 발생시에, 그 즉시 error를 통지하며 concatDelayError또한 모든 데이터 통지 후 error를 통지합니다.

또, 최대 인자 개수는 4개이며 그 이상이 필요하다면 Array로 전달해줍니다.

val observable1 = Observable.interval(300L, TimeUnit.MILLISECONDS)
        .take(5)

val observable2 = Observable.interval(500L, TimeUnit.MILLISECONDS)
        .take(2)
        .map {
            it + 100L
        }

val result = Observable.concat(observable1, observable2)
        .subscribe(DebugObserver())

Thread.sleep(3000L)

=>RxComputationThreadPool-1 : 0
RxComputationThreadPool-1 : 1
RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 3
RxComputationThreadPool-1 : 4
RxComputationThreadPool-2 : 100
RxComputationThreadPool-2 : 101
RxComputationThreadPool-2 : 완료

concat() 메소드 인자 순서대로 실행합니다. 첫 인자 observable1이 실행 완료되면 observable2가 실행된 후 완료를 통지합니다.

concatEager/concatArrayEager

여러 개의 Observable을 결합해 동시 실행하고 한 건씩 통지

concatEager/concatArrayEager() 메소드는 여러 개의 Observable을 전달받아 하나로 결합한 Observable이 한꺼번에 실행되지만, 통지는 순서대로해주는 메소드입니다. 즉, 동시에 실행은 되지만 데이터는 통지하지 않습니다. 또한, 이전 Observable이 완료되는 시점에 바로 전까지 캐시에 쌓인 데이터를 통지합니다. 이처럼 통지가 순차적으로 이루어지므로 그 안에서 완료되지 않은 Observable이 포함되면 다음 Observable이 통지하는 데이터는 통지되지 않습니다.

그리고 concat() 메소드는 인자 Observable을 Iterable로만 전달해야 하지만, concatArray() 메소드는 인자 배열을 받으므로 여러 개의 Observable을 쉼표로 구분해 전달할 수 있습니다.

val observable1 = Observable.interval(300L, TimeUnit.MILLISECONDS)
        .take(5)

val observable2 = Observable.interval(500L, TimeUnit.MILLISECONDS)
        .take(5)
        .map {
            it + 100L
        }
val sources = arrayListOf(observable1, observable2)
val result = Observable.concatEager(sources)
result.subscribe(DebugObserver())
    
Thread.sleep(3000L)

=> RxComputationThreadPool-1 : 0
RxComputationThreadPool-1 : 1
RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 3
RxComputationThreadPool-2 : 4
RxComputationThreadPool-1 : 100
RxComputationThreadPool-1 : 101
RxComputationThreadPool-1 : 102
RxComputationThreadPool-2 : 103
RxComputationThreadPool-2 : 104
RxComputationThreadPool-2 : 완료

이를 표로 보면 이해하기 쉽습니다!

경과 시간0300500600900100012001500...20002500
observable101234완료
observable2100101102103104
통지 데이터01234100,101,102103104

startWith,startWithArray

인자의 데이터를 통지한 후 자신의 데이터 통지

startWith,startWithArray() 메소드는 인자의 데이터를 통지하고 나서, 자신의 데이터를 통지하는 연산자입니다. 메소드로 생성한 Observable은 인자의 모든 데이터를 통지한 후에 자신의 모든 데이터를 통지합니다.

그리고 startWith() 메소드는 데이터가 있는 Observable이나 Iterable 또는 한 건의 데이터를 인자로 받지만, startWithArray() 메소드는 인자로 통지할 데이터의 배열을 받아 여러 건의 통지 데이터를 직접 인자에 전달할 수 있습니다.

val observable1 = Observable.interval(300L, TimeUnit.MILLISECONDS)
        .take(5)

val observable2 = Observable.interval(500L, TimeUnit.MILLISECONDS)
        .take(2)
        .map {
            it + 100L
        }

val result = observable1.startWith(observable2)
        .subscribe(DebugObserver())
    
Thread.sleep(3000L)
    
=> RxComputationThreadPool-1 : 100
RxComputationThreadPool-1 : 101
RxComputationThreadPool-2 : 0
RxComputationThreadPool-2 : 1
RxComputationThreadPool-2 : 2
RxComputationThreadPool-2 : 3
RxComputationThreadPool-2 : 4
RxComputationThreadPool-2 : 완료

결과를 보면 인자의 Observable을 먼저 출력한 뒤, startWith() 메소드를 호출한 원본 Observable의 데이터는 그 이후에 통지합니다.

zip/zipWith

여러 Observable의 데이터를 모아 새로운 데이터를 생성 통지

zip() 메소드는 인자로 전달된 여러개의 Observable에서 데이터를 받아 데이터들이 모인 시점에 이 데이터들을 함수형 인터페이스에 전달하고, 이 함수형 인터페이스에서 새로 생성한 데이터를 결과로 통지하는 연산자입니다. 새로운 데이터는 모든 Observable에서 동일한 순서의 데이터를 받아 생성합니다. 따라서 인자로 전달한 Observable의 통지 시점이 다르면 가장 느리게 처리한 Observable이 데이터를 통지한 시점에 새로운 데이터가 생성됩니다. 완료 통지 시점은 통지하는 데이터 개수가 가장 적은 인자 Observable에 맞춥니다.

zipWith() 메소드는 zip() 메소드의 인스턴스 메소드와 마찬가지로 자신이 통지하는 데이터와 인자 Observable이 통지하는 데이터로부터 새로운 데이터를 생성해 이를 통지합니다.

아래 예제에서는 interval() 메소드로 2개의 Observable을 생성하고, 생성한 두 Observable을 인자로 하는 zip() 메소드로 각 Observable에서 통지 순서가 동일한 데이터를 조합해 시로운 데이터를 생성, 통지합니다!

val observable1 = Observable.interval(300L, TimeUnit.MILLISECONDS)
        .take(5)

val observable2 = Observable.interval(500L, TimeUnit.MILLISECONDS)
        .take(3)
        .map {
            it + 100L
        }
val result = Observable.zip(observable1, observable2, { observable1, observable2 ->
        arrayListOf(observable1, observable2)
    }).subscribe(DebugObserver())

Thread.sleep(2000L)

=> RxComputationThreadPool-2 : [0, 100]
RxComputationThreadPool-2 : [1, 101]
RxComputationThreadPool-2 : [2, 102]
RxComputationThreadPool-2 : 완료

결과를 보면 인자의 모든 Observable이 데이터를 통지한 시점에 새로운 데이터를 생성하고 이 데이터를 통지합니다. 또한, 완료는 데이터 개수가 적은 Observable이 완료된 시점에 통지하고, 데이터 개수가 많은 Observable의 초과 데이터는 통지하지 않음을 알 수 있습니다! (3,4는 통지되지 않음!!)

combineLatest/combineLatestDelayError

여러 Observable에서 데이터를 받을 때마다 새로운 데이터를 생성 통지

combineLatest/combineLatestDelayError() 메소드는 인자로 받은 여러 Observable이 데이터를 받는 시점에 각 Observable이 마지막으로 통지한 데이터를 함수형 인터페이스에 전달하고, 이 데이터를 받아 새로 데이터를 생성해 통지하는 연산자입니다. 처음에는 인자 Observable이 통지할 데이터가 갖추어질 때까지 기다리지만, 이후에는 각 원본 Observable이 통지할 때마다 새로운 데이터를 생성합니다.

말로는 어려우니, 실행 결과와 함께 예제를 보겠습니다!

val observable1 = Observable.interval(300L, TimeUnit.MILLISECONDS)
        .take(5)

val observable2 = Observable.interval(500L, TimeUnit.MILLISECONDS)
        .take(3)
        .map {
            it + 100L
        }

val result = Observable.combineLatest(observable1, observable2) { observable1, observable2 ->
        arrayListOf(observable1, observable2)
    }.subscribe(DebugObserver())

Thread.sleep(2000L)
    
=> RxComputationThreadPool-2 : [0, 100]
RxComputationThreadPool-1 : [1, 100]
RxComputationThreadPool-1 : [2, 100]
RxComputationThreadPool-2 : [2, 101]
RxComputationThreadPool-1 : [3, 101]
RxComputationThreadPool-1 : [4, 101]
RxComputationThreadPool-1 : [4, 102]
RxComputationThreadPool-1 : 완료

결과를 보면 알겠지만, zip과는 다르게 생성되는 데이터마다 새로운 데이터를 만들어 냅니다!

profile
뜻을 알고 코딩하기

0개의 댓글