[RxJava] Observable 결합하기

H43RO·2021년 9월 8일
5

Reactive Programming

목록 보기
10/16
post-thumbnail

🔔 앞으로의 Reactive X 시리즈는 RxJava, RxKotlin 기준으로 작성됩니다
공식 문서를 참고하여 작성된 포스팅입니다.

이전 포스팅과 이어집니다.

Observable 결합하기

여러 개의 Observable 데이터 스트림을 하나의 Observable 로 만들 수 있다. 여러 데이터를 이용하여 가공해서 사용하는 경우, 혹은 HTTP 통신의 응답들을 한 번에 묶어서 받고 싶은 경우 등에 사용하게 된다. 이번 포스팅에선 이러한 동작을 하는 Observable 결합 연산자들 중 몇 가지를 알아보고자 한다.

설명만 들어서는 헷갈릴 수 있는 파트다. 마블 다이어그램과 예제 소스코드를 함께 보며 천천이 이해해보자.

💡 여기서 핵꿀팁!

해당 파트의 Reactive X 공식문서는 정말 레전드가 아닐 수 없다. 마블 다이어그램의 데이터들을 직접 좌우로 움직여볼 수 있고, 이 말은 즉슨 데이터 발행의 시점을 바꿔보면서 결과가 어떻게 나오는지를 실시간으로 확인할 수 있다는 뜻이다. 따라서, 글만 읽고 이해하기보다 마블 다이어그램을 적극 활용하는 것을 강력히 추천한다.


combineLast() 메소드

해당 메소드는 두 Observable 중 하나에서 데이터를 발행하려고 할 때, 나머지 한 Observable 이 가장 최근에 발행한 데이터를 가져와 지정해준 함수를 거쳐 새로운 하나의 데이터로 발행하게 한다.

fun main() {
    val intStream: Observable<Int> = Observable.create { 
        Thread {
            for (i in 1..5) {
                it.onNext(i)
                Thread.sleep(1000)
            }
        }.start()
    }

    val strStream: Observable<String> = Observable.create {
        Thread {
            Thread.sleep(500)
            it.onNext("A")
            Thread.sleep(700)
            it.onNext("B")
            Thread.sleep(100)
            it.onNext("C")
            Thread.sleep(700)
            it.onNext("D")
        }.start()
    }

    Observable.combineLatest(
	intStream, strStream
    ) { num, str -> num.toString() + str }
        .subscribe(System.out::println)
}
1A
2A
2B
2C
3C
3D
4D
5D

결과를 보면, 두 쓰레드 내에서 데이터가 발행되고 있는데, strStream 의 쓰레드가 시작되고 0.5초 후에 A 를 발행했고, 그 전에는 이미 intStream 의 쓰레드가 1 을 발행했다. 이 때 combineLatest() 로 만들어진 Observable 은 각각의 최신 발행 데이터인 1A 를 합쳐 1A 를 발행하는 것이다. (해당 동작은 num.toString() + str 에 정의되어 있다)

Thread.sleep() 을 통해 구현한 '데이터 발행 시점'을 하나씩 살펴보며 어떠한 방식으로 동작되는지 살펴보는 것을 권장한다. 항상 가장 최근에 발행된 데이터끼리 합쳐지는 것을 이해했다면 성공이다.


merge() 메소드

무작정 합치고 보는 녀석이다. 여러 Observable 에서 발행되는 모든 데이터들을 한 스트림으로 합쳐 발행해주는 녀석이다. 말 그대로 merge 한다. 합쳐진 데이터 스트림의 순서는 각각이 발행된 순서이다.

fun main() {
    val streamA = Observable.create<Int> {
        Thread {
            Thread.sleep(100)
            it.onNext(1)
            Thread.sleep(100)
            it.onNext(2)
            Thread.sleep(100)
            it.onNext(3)
        }.start()
    }

    val streamB = Observable.create<Int> {
        Thread {
            Thread.sleep(250)
            it.onNext(100)
            Thread.sleep(250)
            it.onNext(200)
            Thread.sleep(250)
            it.onNext(300)
        }.start()
    }

    Observable.merge(streamA, streamB)
        .subscribe(System.out::println)
}
1
2
100
3
200
300

결과를 보면 발행 시간 순으로 모든 데이터들이 한 스트림으로 합쳐져서 발행되는 것을 확인할 수 있다.

🤚🏻 그런데, merge() 로 데이터를 하나씩 발행하다가 에러가 발생하면 어떻게 될까?
정답 : 즉시 데이터 스트림의 데이터 발행이 중단된다.

그럼 만약 원인이 사소한 오류라고 한다면, 나머지 데이터에 대하여 손실이 발생한다.
이럴 때를 대비하여 mergeDelayError 라는 것을 제공해준다.

아래 마블 다이어그램을 보면 알 수 있듯, 에러가 발생한 데이터 제외 스트림 상 모든 데이터를 발행하고 난 뒤에, 발생했던 에러를 onError() 로 발행하게 된다.


zip() 메소드

여러 Observable 을 하나로 결합하되, 지정해준 함수를 거쳐 하나의 데이터로 발행한다. combineLatest() 가 항상 최신 발행 데이터끼리 합쳐서 발행하는 역할을 했다면, zip() 은 여러 개의 Observable 들에서 발행하는 데이터들을 항상 1:1 로 매핑하는 것을 보장하며 데이터를 발행하게 된다.

fun main() {
    val intStream = Observable.create<Int> {
        Thread {
            for (i in 1..5) {
                it.onNext(i)
                Thread.sleep(1000)
            }
        }.start()
    }

    val strStream = Observable.create<String> {
        Thread {
            Thread.sleep(500)
            it.onNext("A")
            Thread.sleep(700)
            it.onNext("B")
            Thread.sleep(100)
            it.onNext("C")
            Thread.sleep(700)
            it.onNext("D")
        }.start()
    }

    Observable.zip(
        intStream, strStream
    ) { num: Int, str: String -> num.toString() + str }
        .subscribe(System.out::println)
}
1A
2B
3C
4D

출력 결과를 보면 알 수 있듯, strStream 이 발행하는 데이터는 4개고 intStream 이 발행하는 데이터는 5개이다. 항상 1:1 로 매핑되는 것을 보장하기 때문에, 총 4번 데이터가 발행되는 것을 확인할 수 있다.


오늘은 여러 Observable 을 한 개의 Observable 으로 결합하는 동작을 수행하는 연산자 몇 가지를 알아보았다. 다음 포스팅에선 에러 핸들링과 관한 이야기를 해보려고 한다.

profile
어려울수록 기본에 미치고 열광하라

0개의 댓글