4. Flowable과 Observable의 연산자(1)

안석주·2021년 12월 2일
0

RxJava

목록 보기
6/10

서론

이번 포스트에서는 RxJava의 모든 것, 연산자에 대해서 알아보겠습니다! 책의 분략으로 대략 150쪽이 되어 2~4개의 포스트로 나누어서 포스팅해보겠습니다. 오래 걸리겠지만... 빠르게 4장을 끝내보도록 하겠습니다.

RxJava는 데이터 소스 역할을 하는 Flowable/observable을 생성하는 메소드와 데이터를 변환하거나 선별하는 작업을 거쳐 새로운 Flowable/observable을 생성하는 메소드를 제공합니다. 이처럼 메소드가 Flowable/observable을 반환하는 특성을 이용해 순서대로 메소드를 연결해가면(메소드 체인) 최종으로 소비자가 처리하기 쉽게 최적화된 형태로 데이터를 통지하는 Flowable/observable을 생성할 수 있습니다! 또한, 통지하는 데이터 개수에 따라 Single/Maybe/Completable을 생성할 수 도 있습니다! (안드로이드에서 통신을 Single/Completable로 자주합니다!)

RxJava에서는 이처럼 데이터를 통지하는 Flowable이나 Observable을 생성하는 메소드를 연산자라고 부릅니다.

이번장에서는 Flowable/observable의 연산자중에서도 특히 자주 사용되거나, 중요한 연산자를 이야기해봅니다.

이제 아래 설명한 순서대로 연산자를 살펴보겠습니다.
1. Flowable/observable을 생성하는 연산자
2. 통지 데이터를 변환하는 연산자
3. 통지 데이터를 제한하는 연산자
4. Flowable/observable을 결합하는 연산자
5. Flowable/observable 상태를 통지하는 연산자
6. Flowable/observable 데이터를 집계하는 연산자
7. 유틸리티 연산자

저희 이번 포스트에서는 1,2번까지 알아보겠습니다... (빠른 시일내로 끝내보겠습니다 ㅠㅠ)

또한 책에서는 모든 구현을 Flowable로 구현했지만, 저는 Observable로 구현하겠습니다. (추가로 코틀린으로 구현합니다ㅎㅎ!)

또한 쉽게 결과를 보기 위해 DebugObserver를 미리 만들어보겠습니다. DebugObserver는 받은 통지 데이터를 처리하는 쓰레드 이름과 함께 데이터나 통지 내용을 출력합니다!
(4.1.7 예제때문에, (data: T? =null)을 대입해주었습니다!

class DebugObserver<T>(data: T? = null) : DisposableObserver<T>() {
    override fun onNext(t: T) {
        println("${Thread.currentThread().name} : $t")
    }

    override fun onError(e: Throwable) {
        println("${Thread.currentThread().name} : ${e.printStackTrace()}")
    }

    override fun onComplete() {
        println("${Thread.currentThread().name} : 완료")
    }
}

4.1 Flowable/Observable을 생성하는 연산자

이하 Flowable/Observable은 Observable로 통일하겠습니다.(어차피 거의 다 같이 생성 가능해요!) 예를 들면 just에서 인자의 데이터를 통지하는 Flowable/Observable 생성 이지만, 아래와 같이 씁니다!

4.1.1 just

인자의 데이터를 통지하는 Observable 생성

just 메소드는 인자로 받은 데이터를 통지하는 Observable을 생성하는 연산자로, 인자는 최대 10개까지 지정할 수 있으며, 왼쪽에 설정한 인자부터 순서대로 통지합니다! 모든 데이터를 통지하면 완료(onComplete)를 통지합니다.

아래 예제는 'A,B,C,D,E'를 통지하는 just 메소드를 이용한, Observable을 생성하고, 모든 데이터 통지한 후 완료(onComplete)를 통지합니다.

Observable.just("A", "B", "C", "D", "E")
        .subscribe(DebugObserver<String>())
=>main : A
main : B
main : C
main : D
main : E
main : 완료

결과를 보면 인자로 받은 배열에 담긴 객체를 순서대로 통지하고 모든 데이터 통지를 마친 뒤 완료를 통지합니다.

4.1.2 fromArray/fromIterable

배열이나 Iterable에서 Observable 생성

fromArray 메소드는 인자로 지정한 배열을, fromIterable 메소드는 인자로 지정한 Iterable(리스트 등)에 담긴 객체를 순서대로 통지하는 Observable을 생성하는 연산자입니다. 생성한 Observable은 모든 데이터를 통지하면 완료를 통지합니다.

Observable.fromArray("A", "B", "C", "D", "E")
        .subscribe(DebugObserver<String>())

    val list = listOf("a","b","c","d","e")
    Observable.fromIterable(list)
        .subscribe(DebugObserver<String>())
=> main : A
main : B
main : C
main : D
main : E
main : 완료
main : a
main : b
main : c
main : d
main : e
main : 완료

4.1.3 fromCallable

Callable의 처리를 실행하고, 그 결과를 통지하는 Observable 생성

fromCallable() 메소드는 인자로 지정한 java.util.concurrent.Callable 함수형 인터페이스에서 생성한 데이터를 통지하는 Observable을 생성하는 연산자입니다. 생성한 Observable은 인자로 지정한 Callable의 반환값을 데이터로 통지하며, 이를 끝마치면 완료를 통지합니다. 그러므로 동일한 Observable의 인스턴스라도 통지하는 데이터는 호출될 때마다 새로 생성됩니다.

Observable.fromCallable { System.currentTimeMillis() }
        .subscribe(DebugObserver<Long>())
=> main : 1638463306811
main : 완료

4.1.4 range/rangeLong

지정한 숫자부터 지정한 개수만큼 통지하는 Flowable 생성

range() 메소드는 지정한 숫자부터 지정한 개수만큼 하나씩 증가하는 Integer 값 데이터를 통지하는 Observable을 생성하는 연산자이며, rangeLong 메소드는 같은 방식으로 Long 데이터를 통지하는 Observable을 생성하는 연산자입니다! 예를 들어 시작 값이 1이고, 통지하려는 데이터 개수가 3이라면 '1,2,3'이 통지됩니다. 후에 onComplete를 통지합니다.
아래 예제는 10부터 3개를 출력하는 예제입니다.

Observable.range(10, 3)
        .subscribe(DebugObserver<Int>())
->main : 10
main : 11
main : 12
main : 완료

4.1.5 interval

지정한 간격마다 숫자를 통지하는 Observable 생성

interval() 메소드는 지정한 통지 간격(interval)마다 0부터 시작하는 Long 타입의 숫자 데이터를 통지하는 Observable을 생성하는 연산자입니다. 즉, 데이터는 '0,1,2...'의 순서로 통지됩니다. interval 메소드로 생성한 Observable은 별도 설정이 없으면 원래 호출한 쓰레드가 아닌 Schedulers.computation()의 스케줄러에서 실행됩니다. 스케줄러를 변경하고프면 인자로 스케줄러를 전달해줍니다.
또한 최초의 통지 데이터인 0을 통지하는 시점은 처리 시작 시점이 아닌, 지정 시점이 지난 후입니다. 최초 데이터를 통지할 대 대기 시간을 인자로 지정하는 메소드가 있는데, 이 메소드를 이용해 최초 0을 통지하는 시점만 변경할 수 있습니다.
interval() 메소드로 샐성한 Observable은 완료될 수 없습니다. 그러므로 완료 통지가 필요하다면 take() 메소드 등으로 통지할 데이터 개수를 제한하는 작업을 해야합니다.
추가로, 약간의 데이터 오타는 CPU 부하등의 영향으로 어느정도 오차가 발생할 수 있으나, 자바에선 흔하니 알아두시면 좋습니다!
아래 예제는 1000밀리초(1초)마다 0부터 시작하는 숫자를 통지하는 Observable을 생성하고, 시작 시간과 통지되는 시간을 알아봅니다!

val formatter = DateTimeFormatter.ofPattern("mm:ss.SSS")

val observable = Observable.interval(1000L, TimeUnit.MILLISECONDS)
println("시작시간 : ${LocalTime.now().format(formatter)}")

observable
        .subscribe {
        val time = LocalTime.now().format(formatter)
        println("${Thread.currentThread()} : $time: data : $it")
        } 

Thread.sleep(5000L)
    
시작시간 : 39:55.722
Thread[RxComputationThreadPool-1,5,main] : 39:56.739: data : 0
Thread[RxComputationThreadPool-1,5,main] : 39:57.737: data : 1
Thread[RxComputationThreadPool-1,5,main] : 39:58.737: data : 2
Thread[RxComputationThreadPool-1,5,main] : 39:59.750: data : 3
Thread[RxComputationThreadPool-1,5,main] : 40:00.751: data : 4

결과를 보면 1초마다 데이터를 통지하며, 처음 설정 그대로라면 0을 통지할 때까지 1초를 기다립니다. 이예제는 완료 통지하지 않으므로 Thread.sleep()의 시간을 늘린다면 더 많이 출력할 수 있습니다.

4.1.6 timer

지정 시간 경과 후 '0'을 통지하는 Observable

timer 메소드는 호출 시점부터 지정 시간 이후 Long타입의 숫자 0 하나만 통지하고 종료하는 Observable을 생성하는 연산자입니다. timer 메소드로 생성한 Observable은 기본적으로 computation() 스케줄러에서 실행됩니다. 호출한 쓰레드와는 별도의 쓰레드에서 실행되므로 스케줄러를 변경하려면 인자로 스케줄러를 받아 다른 쓰레드에서 실행할 수 있습니다.
아래 예제에서는 1초뒤 0을 통지하는 Observable을 생성합니다.

val timeFormatter = DateTimeFormatter.ofPattern("mm:ss.SSS")

println("시작시간 = ${LocalTime.now().format(timeFormatter)}")
Observable.timer(1000L, TimeUnit.MILLISECONDS)
        .subscribe({ data ->
            println("${Thread.currentThread()} :
            ${LocalTime.now().format(timeFormatter)} = $data")
        }, {
            println("에러 = ${it.printStackTrace()}")
        }) {
            println("완료")
        }
Thread.sleep(1500L)
    
-> 시작시간 = 03:02.169
Thread[RxComputationThreadPool-1,5,main] : 03:03.268 = 0
완료

결과를 보니 1초 뒤 데이터를 통지하고 완료합니다.

4.1.7 defer

구독될 때마다 새로운 Observable 생성

defer() 메소드는 구독이 발생할때 마다 함수형 인터페이스로 정의한 새로운 Observable을 생성하는 연산자입니다. 생성한 Observable이 통지하는 데이터는 함수형 인터페이스로 생성한 Observable의 데이터입니다. 또한 just() 메소드와는 다르게 defer() 메소드는 선언한 시점의 데이터를 통지하는 것이 아니라, 호출 시점에 데이터 생성이 필요할 때 사용합니다.

val observable = Observable.defer {
        Observable.just(LocalTime.now())
    }

    observable.subscribe(DebugObserver("No.1"))

    Thread.sleep(2000L)

    observable.subscribe(DebugObserver("No.2"))
=> main : 03:12:09.194
main : 완료
main : 03:12:11.202
main : 완료

실행 결과를 보면 defer() 메소드의 함수형 인터페이스에서 생성한 Observable의 내용이 Observer에 통지됩니다. 또한, 시간 간격을 두고 동일한 Observable을 두번 구독했을 때, 데이터로 받은 현재 시각이 다르므로 Observable이 매번 새로 생성된다는 것을 알 수 있습니다.
이때 defer() 메소드가 아닌 just() 메소드로 생성했다면, 시간은 같을 것입니다!!

4.1.8 empty

빈 Observable 생성

empty() 메소드는 빈 Observable을 생성하는 연산자로, 처리를 시작하면 바로 완료를 통지합니다. 이 메소드는 단독으로는 잘 안쓰고, flatMap 메소드의 통지 데이터가 null일 때 이를 대신해 empty 메소드에 생성한 Observable로 이후 이 데이터를 통지 대상에서 제외하는 등의 작업을 할 수 있습니다.

Observable
        .empty<Int>()
        .subscribe(DebugObserver())
=> main : 완료

실행 결과 완료만 통지합니다.

4.1.9 error

에러만 통지하는 Observable 생성

말그대로 error만 통지하는 Observable을 생성합니다! 처리를 시작하면 에러 객체와 함께 에러를 통지합니다. 이 메소드 단독으로 사용 잘안하고, flatMap 처리중에 에러를 통지하고 싶을 때 error() 메소드로 생성한 Observable을 반환해 명시적으로 에러를 통지합니다.

Observable
        .error<Int> {
            Exception("예외")
        }.subscribe(DebugObserver())
=> java.lang.Exception: 예외
	at RxJava4Example.ErrorExampleKt.main$lambda-0(ErrorExample.kt:12)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableError.subscribeActual(ObservableError.java:32)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13099)
	at RxJava4Example.ErrorExampleKt.main(ErrorExample.kt:13)
	at RxJava4Example.ErrorExampleKt.main(ErrorExample.kt)
main : kotlin.Unit

실행 결과 에러와 함께 에러 객체를 통지합니다.

4.1.10 never

아무것도 통지하지 않는 Observable 생성

never 메소드는 아무것도 통지하지 않는 Observable을 생성하는 연산자로, 완료도 통지하지 않습니다. 완료를 통지하는 empty() 메소드와 헷갈리면 안됩니다!!

Observable
        .never<Int>()
        .subscribe(DebugObserver<Int>())
=> 
Process finished with exit code 0

지금까지는 생성자에 대해서 알아봤습니다!!! 이번부터는 통지된 데이터를 변환해주는 연산자를 알아보겠습니다!

4.2 통지 데이터를 변환하는 연산자

4.2.1 map

데이터를 변환 통지

map() 메소드는 원본 Observable에서 통지하는 데이터를 변환한 뒤 변환된 데이터를 통지하는 연산자입니다. 다만 뒤에 나오는 flatMap() 메소드와는 달리 한 개의 데이터로 여러 데이터를 생성해 통지하거나 데이터 통지를 건너뛸 수는 없습니다. 인자로 받는 함수형 인터페이스는 데이터를 받으면 반드시 무엇이든 null이 아닌 데이터를 하나 반환해야 합니다.
mapper는 원본 데이터를 어떻게 변환할지를 정의하는 함수형 인터페이스입니다. 변환한 데이터의 자료형은 받은 데이터의 자료형과 달라도 상관없습니다. 단 null을 반환시 NPE가 발생하니 null은 안됩니다! 반환 결과가 null이 될 수 있다면, map대신 flatMap() 메소드를 사용하는 편이 낫습니다.

Observable.just("A", "B", "C", "D", "E")
        .map { originData ->
            originData.lowercase(Locale.getDefault())
        }.subscribe(DebugObserver())
=> main : a
main : b
main : c
main : d
main : e
main : 완료

이 예제는 원본 데이터를 소문자로 변환해 통지합니다. 위의 예제에서는 String을 다시 반환하지만, 꼭 String일 필요는 없습니다!

4.2.2 flatMap

받은 데이터를 Observable로 변환하고 이 Observable의 데이터를 통지

flatMap() 메소드는 map() 메소드와 마찬가지로 원본 데이터를 변환해 통지하는 연산자입니다. 그러나 map() 메소드와는 다르게 여러 데이터가 담긴 Observable을 반환하므로 데이터 한 개로 여러 데이터를 통지할 수 있습니다. 또한, 빈 Observable을 반환해 특정 데이터를 통지하지 않거나, 에러 Observable을 반환해 에러를 통지할 수도 있습니다.
flatMap() 메소드의 함수형 인터페이스가 Observable을 반환한다는 것은 메소드 체인에서 중요한 부분입니다! 예를 들어, 변환 결과가 null이거나 조건에 맞지 않는 데이터라면 변환 결과로 빈 Observable을 반환해 후속 처리 시에 해당 데이터를 건너뛰게 할 수 있습니다. 또한 받은 데이터에 따라 에러를 통지해야 할 때 에러 Observable을 반환해 에러를 통지할 수 있습니다.

여기서 flatMap() 메소드는 여러 인자가 들어갈 수 있으니, 하나하나 알아보겠습니다.

flatMap(mapper)

인자타입인자이름설명
Functionmapper받은 데이터로 새로운 Observable을 생성하는 방법을 정의하는 함수형 인터페이스

아래 예제에서는 flatMap() 메소드로 빈 문자가 포함된, 원본 대문자 데이터를 소문자 데이터로 변환해 통지합니다. 이 때 빈 문자는 제외하고 통지합니다.

Observable.just("A", "", "B", "", "C", "")
        .flatMap { data ->
            if ("" == data) {
                Observable.empty()
            } else {
                Observable.just(data.toLowerCase())
            }
        }.subscribe(DebugObserver<String>())
=> main : a
main : b
main : c
main : 완료 

실행 결과를 보면 대문자 데이터는 소문자 데이터로 변환해 통지하고, 빈 문자 데이터는 통지 대상에서 제외합니다. 이 예제에서는 받은 데이터와 같은 데이터형을 통지하지만, 다른 데이터형을 반환할 수도 있습니다. 예를 들어, 상속 관계가 있는 객체가 있을 때, 부모 객체를 데이터로 받아 이를 자식 객체로 변환해 데이터를 통지할 수도 있습니다!

flatMap(mapper, combiner)

combiner는 원본 Observable이 통지한 데이터와 첫 번째 인자인 mapper에서 생성한 Observable의 데이터를 받아 새로운 통지 데이터를 생성하는 함수형 인터페이스입니다. 이 combiner의 반환값이 최종으로 통지하는 데이터입니다.

인자타입인자이름설명
Functionmapper받은 데이터로 새로운 Observable을 생성하는 방법을 정의하는 함수형 인터페이스
BiFunctioncombinermapper가 새로 생성한 Observable 데이터와 원본 데이터를 조합해 새로운 통지 데이터를 생성하는 함수형 인터페이스

아래 예제는 range() 메소드로 생성한 Observable이 통지하는 원본 데이터와 flatMap() 메소드 내부에서 interval() 메소드로 생성한 Observable이 통지하는 데이터를 조합해 문자열을 만들고, 이를 통지하는 Observable을 생성합니다. 예제에서 mapper에 지정한 함수형 인터페이스는 단순히 interval() 메소드로 Observable을 생성합니다. 그리고 combiner에 지정한 함수형 인터페이스는 원본 Observable이 통지하는 데이터(sourceData)와 mapper가 반환하는 Observable이 통지하는 데이터(newData)를 조합해 생성한 새로운 데이터를 통지합니다.

Observable.range(1, 3)
        .flatMap({ // 첫 번째 인자 : 데이터를 받으면 새로운 Observable을 생성한다 
            Observable.interval(100L, TimeUnit.MILLISECONDS)
                // 3건까지 통지한다
                .take(3)
        }, { sourceData, newData -> 
            // 두 번째 인자 : 원본 데이터와 변환 데이터로 새 통지 데이터 생성
            "[$sourceData] $newData"
        }).subscribe(DebugObserver())

Thread.sleep(1000L)

=> RxComputationThreadPool-2 : [2] 0
RxComputationThreadPool-3 : [1] 0
RxComputationThreadPool-3 : [3] 0
RxComputationThreadPool-1 : [1] 1
RxComputationThreadPool-1 : [2] 1
RxComputationThreadPool-1 : [3] 1
RxComputationThreadPool-2 : [2] 2
RxComputationThreadPool-2 : [1] 2
RxComputationThreadPool-2 : [3] 2
RxComputationThreadPool-3 : 완료
  • 비동기 처리이기 때문에 순서가 다를 수 있습니다!

실행 결과를 보면 원본 Observable과 mapper가 반환하는 Observable의 데이터를 각각 받아 이 데이터로 생성한 새로운 데이터가 통지되는 것을 알 수 있습니다. 그리고 mapper가 생성하는 Observable은 서로 다른 쓰레드에서 실행돼 비동기로 데이터가 통지되어 순서대로 통지되진 않습니다.

flatMap(onNextMapper, onErrorMapper, onCompleteSupplier)

인자타입인자이름설명
FunctiononNextMapper받은 데이터로 새로운 Observable을 생성하는 방법을 정의하는 함수형 인터페이스
FunctiononErrorMapper에러가 통지됐을 때 무엇을 통지할지 정의하는 함수형 인터페이스
CallableonCompleteSupplier완료가 통지됐을 때 무엇을 통지할지 정의하는 함수형 인터페이스
  • onNextMapper는 데이터를 받았을 때 이를 변환하는 함수형 인터페이스로, flatMap의 mapper와 같은 방법으로 데이터를 처리합니다.
  • onErrorMapper는 에러가 통지되면 에러 에러 대신 정상적인 데이터로 변환해 통지하게 하는 함수형 인터페이스입니다. 에러 통지 대신 데이터로 변환하므로 수신 측은 에러로 종료되지 않고 정상적으로 종료할 수 있습니다!
  • onCompleteSupplier는 완료가 통지되면 완료 시점에 통지할 데이터를 생성하는 함수형 인터페이스입니다. 완료 통지를 받았을 때 함수형 인터페이스의 반환값을 통지한 후 완료를 통지합니다.

아래 예제는 숫자를 통지하는 Observable을 생성한 Observable이 통지하는 내용에 따라 생성하고, flatMap(onNextMapper, onErrorMapper, onCompleteSupplier)로 다른 결과를 통지합니다. Observable이 일반적 데이터를 통지시 그대로 통지, 에러시 -1, 문제 없이 완료시 100을 통지합니다. 아래 예제에서는 0을 나누어 에러를 발생하게 했습니다.

Observable.just(1, 2, 0, 4, 5)
        .map { 10 / it }
        .flatMap({
                Observable.just(it) // 일반 데이터 통지
            }, {
                Observable.just(-1) // 에러 발생시 데이터 통지
            }, {
                Observable.just(100) // 완료시 데이터 100 통지
            })
        .subscribe(DebugObserver())
=> main : 10
main : 5
main : -1
main : 완료

실행 결과를 보면 정상적인 데이터는 그대로 통지, 에러 통지시 -1, 완료시 100을 통지합니다. 그런데 100이 통지 되지 않은 이유는 무엇일까요? 원본 Observable이 0에서 에러를 통지해 완료를 통지하지 못했기 때문에 onCompleteSupplier의 함수형 인터페이스는 실행되지 않습니다!

완료 됐다면 100이 출력됐겠죠???????

4.2.3 concatMap/ concatMapDelayError

받은 데이터를 Observable로 변환하고, 변환한 Observable을 하나씩 순서대로 실행해 이 데이터를 통지

concatMap/ concatMapDelayError() 메소드는 원본 데이터를 Observable로 변환해, 이 변환한 Observable의 데이터를 통지하는 연산자입니다. flatMap() 메소드와 마찬가지로 인자로 지정한 함수형 인터페이스는 데이터가 여럿 있는 Observable을 반환하면 데이터 한개로 여러 데이터를 생성해 통지할 수 있습니다. 그러나 flatMap과는 다르게 데이터를 받은 순서대로 Observable을 생성하고, 이를 하나씩 실행합니다. 즉, 여러 데이터를 계속해서 받더라도 첫 번째 데이터로 생성한 Observable의 처리가 끝나야 다음 데이터로 생성한 Observable을 실행합니다. 따라서 함수형 인터페이스가 반환한 Observable의 실행이 완료되지 않으면 다음 데이터로 만든 Observable의 데이터는 통지되지 않습니다. 또한 함수형 인터페이스의 반환값인 Observable이 이를 호출하는 쓰레드와 별도의 쓰레드에서 진행되더라도, 데이터를 받을 때 바로 실행되지 않으므로 데이터를 받은 순서대로 처리하는 것은 보증하지만, 처리 성능에 영향을 줄 수 있으므로, 주의해야 합니다.

또한, concatMap() 메소드는 함수형 인터페이스가 반환하는 Observable에서 에러가 발생한 시점에 에러를 통지하지만, concatMapDelayError() 메소드는 생성한 Observable에서 에러가 발생해도 다른 데이터로 생성한 Observable의 처리가 완료될 때까지 에러 통지를 미룹니다.

Observable.range(10, 3)
        .concatMap { sourceData ->
            Observable.interval(500L, TimeUnit.MILLISECONDS)
                .take(2)
                .map { data ->
                    "${System.currentTimeMillis()}ms: [$sourceData] $data"
                }
        }.subscribe(DebugObserver())
Thread.sleep(4000L)

=> RxComputationThreadPool-1 : 1638479820011ms: [10] 0
RxComputationThreadPool-1 : 1638479820512ms: [10] 1
RxComputationThreadPool-2 : 1638479821014ms: [11] 0
RxComputationThreadPool-2 : 1638479821515ms: [11] 1
RxComputationThreadPool-3 : 1638479822031ms: [12] 0
RxComputationThreadPool-3 : 1638479822520ms: [12] 1
RxComputationThreadPool-3 : 완료

4.2.4 concatMapEager/ concatMapEagerDelayError

받은 데이터를 Observable로 변환하자마자 실행, 생성된 Observable 순서대로 통지

concatMapEager은 앞의 두 flatMap과 concatMap을 합친 것과 비슷합니다. 다른 쓰레드에서 즉시 실행되지만, 결과는 버퍼에서 대기하다가 순서대로 통지하게 됩니다. 그러므로 함수형 인터페이스가 반환하는 Observable이 완료되지 않으면 다음 데이터로 생성한 Observable의 데이터는 통지되지 않습니다. concatMapEagerDelayError는 에러가 발생해도 데이터를 모두 통지할 때까지 에러 통지를 미룹니다.

Observable.range(10, 3)
        .concatMapEager { sourceData ->
            Observable.interval(500L, TimeUnit.MILLISECONDS)
                .take(2)
                .map { data ->
                    "${System.currentTimeMillis()}ms : [$sourceData] $data"
                }
        }.subscribe(DebugObserver())
    Thread.sleep(4000L)
=>RxComputationThreadPool-1 : 1638480542455ms : [10] 0
RxComputationThreadPool-2 : 1638480542960ms : [10] 1
RxComputationThreadPool-2 : 1638480542455ms : [11] 0
RxComputationThreadPool-2 : 1638480542960ms : [11] 1
RxComputationThreadPool-2 : 1638480542455ms : [12] 0
RxComputationThreadPool-2 : 1638480542960ms : [12] 1
RxComputationThreadPool-2 : 완료

결과를 보면 생성된 시간은 같지만, 출력은 순서대로 됐습니다.
이번에는 에러가 발생해도, 통지가 끝난 뒤 에러가 발생하도록 합니다. concatMapEagerDelayError를 통해서요!

Observable.range(10, 3)
        .concatMapEagerDelayError({ sourceData ->
            Observable.interval(500L, TimeUnit.MILLISECONDS)
                .take(3)
                .doOnNext {
                    if (sourceData == 11 && it.toInt() == 1) {
                        throw Exception("예외 발생")
                    }
                }
                .map { data ->
                    "${System.currentTimeMillis()}ms : [$sourceData] $data"
                }
        }, true).subscribe(DebugObserver())
Thread.sleep(4000L)
=>RxComputationThreadPool-2 : 1638480884855ms : [10] 0
RxComputationThreadPool-1 : 1638480885343ms : [10] 1
RxComputationThreadPool-3 : 1638480885845ms : [10] 2
RxComputationThreadPool-3 : 1638480884855ms : [11] 0
RxComputationThreadPool-3 : 1638480884855ms : [12] 0
RxComputationThreadPool-3 : 1638480885343ms : [12] 1
RxComputationThreadPool-3 : 1638480885845ms : [12] 2
RxComputationThreadPool-3 : kotlin.Unit
java.lang.Exception: 예외 발생

결과를 보면 에러가 발생해도, 처리를 계속하고 모든 처리가 끝난 뒤 에러를 통지합니다.
두 번째 인자가 false라면, 에러 발생시 즉시 에러를 통지합니다.

4.2.5 buffer

통지할 데이터를 지정한 범위까지 모아 리스트나 컬렉션으로 통지

buffer() 메소드는 통지하려는 데이터를 매번 통지하는 것이 아니라 어느 정도 모아서 리스트나 컬렉션에 담아 통지하는 연산자 입니다. 데이터를 모으는 단위로는 데이터 개수나 시간 간격을 지정할 수도 있습니다.

buffer(count), buffer(time, unit)

아래 예제는 take 메소드로 10건까지만 통지합니다. 그리고 리스트에는 3건씩 데이터를 담습니다. 마지막에 남은 데이터는 리스트에 담아 통지합니다.

Observable.interval(100L,TimeUnit.MILLISECONDS)
        .take(10)
        .buffer(3)
        .subscribe(DebugObserver())
Thread.sleep(3000L)
    
=> RxComputationThreadPool-1 : [0, 1, 2]
RxComputationThreadPool-1 : [3, 4, 5]
RxComputationThreadPool-1 : [6, 7, 8]
RxComputationThreadPool-1 : [9]
RxComputationThreadPool-1 : 완료

buffer(count, skip)

count개씩 버퍼에 모으고, skip번째 데이터는 건너 뛰어서 list에 저장하여 리턴해줍니다.

Observable.interval(300L, TimeUnit.MILLISECONDS)
        .take(7)
        .buffer(2,3)
        .subscribe(DebugObserver())
Thread.sleep(4000L)
    
=> RxComputationThreadPool-1 : [0, 1]
RxComputationThreadPool-1 : [3, 4]
RxComputationThreadPool-1 : [6]
RxComputationThreadPool-1 : 완료

이렇게 쓰면 데이터를 skip할 순 있지만, 그냥 skip() 메소드를 사용하는 것이 코드를 파악함에 있어 더 수월하지 않을까 생각됩니다...(개인적인 생각)

4.2.6 toList

통지할 데이터를 모두 리스트에 담아 통지

toList() 메소드는 통지되는 데이터를 모두 리스트에 담아 통지하는 연산자로, 원본 Observable에서 완료 통지를 받은 시점에 결과 리스트를 통지합니다. 그러므로 완료를 통지하지 않는 Observable에서는 사용할 수 없습니다. 또한 데이터를 통지하는 Observable에서는 버퍼에 쌓이는 데이터가 너무 많아 메모리가 부족하게 될 위험성이 있으므로 사용 시 주의해야 합니다. 한편 통지하는 데이터가 하나뿐이므로 반환 값은 Single입니다.
아래 예제는 just() 메소드로 생성하고, 이 Observable이 통지하는 모든 데이터를 toList() 메소드로 리스트에 담아 통지합니다.

Observable.just("A", "B", "C", "D", "E")
        .toList()
        .subscribe(object : SingleObserver<List<String>> {
            override fun onSubscribe(d: Disposable) {
                //
            }

            override fun onSuccess(t: List<String>) {
                println("${Thread.currentThread()}, ${System.currentTimeMillis()}, $t")
            }

            override fun onError(e: Throwable) {
                TODO("Not yet implemented")
            }
        })
=> Thread[main,5,main], 1638484055406, [A, B, C, D, E]

4.2.7 toMap

통지할 데이터를 키와 값 한 쌍으로 Map에 담아 통지

toMap() 메소드는 통지할 데이터를 키와 값의 쌍으로 Map에 담아 통지하는 연산자입니다. 받은 데이터로 키를 생성하고 이 키로 Map에 값을 담습니다. 이후 원본 Observable에서 완료 통지를 받은 시점에 결과 Map을 통지합니다. 그러므로 완료를 통지하지 않는 Observable에서 사용할 수 없습니다. 그리고 대량으로 통지할 경우 메모리가 부족할 수 있습니다. 또한 반환값이 Single입니다.

또한, 다른 데이터가 이미 사용중인 키를 생성하면 이 키와 새로운 데이터의 쌍이 이전에 들어있던 데이터 쌍을 덮어쓰게 되니 주의해야 합니다. 이에 따라 결과 Map에 담긴 데이터 쌍의 수가 원본 Observable이 통지하는 데이터의 전체 개수보다 적을 수도 있습니다.

toMap(keySelector)

아래 예제는 키와 값으로 나누어 저장하는 예제로, 데이터로 생성한 키와 데이터 쌍을 담은 Map을 통지합니다.

Observable.just("1A", "2B", "3C", "1D", "2E")
        .toMap {
            it.substring(0, 1)
        }.subscribe(object : SingleObserver<Map<String, String>> {
            override fun onSubscribe(d: Disposable) {
                //
            }

            override fun onSuccess(t: Map<String, String>) {
                println("$t")
            }

            override fun onError(e: Throwable) {
                //
            }

        })
=> {1=1D, 2=2E, 3=3C}

이전에 말했던 것처럼, 키 값이 같아 새로 생성된 키에 데이터를 덮어썼단 것을 알 수 있습니다.
또한, Single을 통지하므로 데이터만 통지하고 종료되며, 완료 통지는 없습니다.

toMap(keySelector, valueSelector)

아래 예제는 just() 메소드로 데이터를 생성하고, toMap() 메소드를 사용해 통지된 데이터로 키, 값을 생성한 뒤 Map에 넣어 통지합니다. 위와 다른게 있다면, 여기서는 값까지 생성해줍니다.

Observable.just("1A", "2B", "3C", "1D", "2E")
        .toMap({
            it.substring(0, 1)
        }, {
            it.substring(1)
        }).subscribe(object : SingleObserver<Map<String, String>> {
            override fun onSubscribe(d: Disposable) {
                //
            }

            override fun onSuccess(t: Map<String, String>) {
                println("$t")
            }

            override fun onError(e: Throwable) {
                //
            }

        })
=> {1=D, 2=E, 3=C}

4.2.8 toMultiMap

통지할 데이터를 키와 컬렉션 쌍으로 Map에 담아 통지

toMap() 과 다른점은 toMap은 키값이 같다면 새로운 데이터로 덮어쓰지만, toMultiMap()은 컬렉션에 담아 한번에 통지한다! 코드를 보면 더 쉽게 이해가 가능하니 봐보자!

Observable.interval(500L, TimeUnit.MILLISECONDS)
        .take(5)
        .toMultimap {
            if (it.toInt() % 2 == 0) {
                "짝수"
            } else {
                "홀수"
            }
        }.subscribe(object : SingleObserver<MutableMap<String, Collection<Long>>> {
            override fun onSubscribe(d: Disposable) {
                //
            }

            override fun onSuccess(t: MutableMap<String, Collection<Long>>) {
                println("$t")
            }

            override fun onError(e: Throwable) {
                //
            }
        })
Thread.sleep(3000L)
=> {짝수=[0, 2, 4], 홀수=[1, 3]}

다음 포스트에서 이어서 통지 데이터를 제한하는 연산자부터 알아보겠습니다!

profile
뜻을 알고 코딩하기

0개의 댓글