[안드로이드] RxKotlin(RxJava) #4 - 결합 연산자

hee09·2022년 9월 23일
0

RxKotlin

목록 보기
4/7

Observable을 결합하는 연산자

여러 Observable 소스를 결합하여 하나의 Observable을 생성하고, 동작하는 연산자들입니다. 하나씩 알아보도록 하겠습니다.


주요 연산자

combineLatest

  • 두 개의 Observable 중 한 소스에서 아이템이 발행될 때, 두 Observable에서 가장 최근에 발행한 아이템을 취합하여 하나로 발행하는 연산자입니다.

  • 실무에서 많이 사용되는 연산자 중 하나로, 여러 개의 http 요청에 의한 응답을 하나로 묶어서 처리할 때 사용됩니다.

  • 마블 다이어그램을 보면 분홍색 원이 첫 번째 Observable에서 발행되고 두 번째 Observable에서 마름모가 발행되자 분홍색 마름모가 방출된 것을 볼 수 있습니다. 그리고 첫 번째 Observable에서 주항색 원이 발행되었을 때, 두 번째 Observable의 가장 최근 데이터는 마름모이기에 주황색 마름모가 방출된 것을 볼 수 있습니다.

// 1초마다 i값을 방출하는 Observable
val observable1 = Observable.create<Int> { emitter ->
    object : Thread() {
        override fun run() {
            for(i in 1..5) {
                emitter.onNext(i)
                try {
                    sleep(1000)
                } catch(ie: InterruptedException) {
                    ie.printStackTrace()
                }
            }
        }
    }.start()
}

// 0.5초마다 문자를 방출하는 Observable
val observable2 = Observable.create<Char> { emitter ->
    object : Thread() {
        override fun run() {
            for(i in 'a'..'d') {
                emitter.onNext(i)
                try {
                    sleep(500)
                } catch (ie: InterruptedException) {
                    ie.printStackTrace()
                }
            }
        }
    }.start()
}

Observable.combineLatest(
    observable1,
    observable2,
    BiFunction{ num, chr -> "$num$chr" }
).subscribe {
    println(it)
}

Thread.sleep(5000)

/*
결과
1a
1b
1c
2c
2d
3d
4d
5d
 */

zip 연산자

  • zip 연산자는 여러 Observable을 하나로 결합하여 지정된 함수를 통해 하나의 아이템으로 발행합니다.

  • combineLatest와 비슷해 보이지만 combineLatest 연산자는 가장 최근에 발행한 아이템을 기준으로 결합하는 데 반해 zip은 여러 Observable의 발행 순서를 엄격히 지켜 아이템을 결합합니다.

  • 마블 다이어그램을 보면 두 번째 Observable에서 C, D 아이템을 발행하고, 첫 번째 Observable에서 3 아이템을 발행했을 때, 두 번째 Observable의 3번째 아이템에 해당하는 C와 합쳐져서 3C가 발행된 것을 확인할 수 있습니다.

// 1초마다 i값을 방출하는 Observable
val observable1 = Observable.create<Int> { emitter ->
    object : Thread() {
        override fun run() {
            for(i in 1..5) {
                emitter.onNext(i)
                try {
                    sleep(1000)
                } catch(ie: InterruptedException) {
                    ie.printStackTrace()
                }
            }
        }
    }.start()
}

// 0.5초마다 문자를 방출하는 Observable
val observable2 = Observable.create<Char> { emitter ->
    object : Thread() {
        override fun run() {
            for(i in 'a'..'d') {
                emitter.onNext(i)
                try {
                    sleep(500)
                } catch (ie: InterruptedException) {
                    ie.printStackTrace()
                }
            }
        }
    }.start()
}

Observable.zip(
    observable1,
    observable2
) { num, chr ->
    "$num$chr"
}.subscribe {
    println(it)
}

Thread.sleep(5000)

/*
결과
1a
2b
3c
4d
 */

merge

  • merge 연산자를 이용하면 여러 Observable을 하나의 Observable처럼 결합하여 사용할 수 있습니다.

  • 여러 Observable이 발행하는 아이템을 발행 시점에 하나의 스트림에 교차해 끼워 넣어 하나의 Observable을 만듭니다. 이때, 데이터가 발행되는 순서로 데이터를 방출합니다.

  • 마블 다이어그램을 보면 두 개의 Observable을 데이터가 발행되는 순서대로 방출하고 있는데, 오류가 발생하면 방출을 중단하는 모습을 확인할 수 있습니다.

val observable1 = Observable.intervalRange(
    1, // 시작값
    5, // 발행 횟수
    0, // 초기 지연
    100, // 발행 간격
    TimeUnit.MILLISECONDS // 간격 단위
).map { value ->
    value * 20
}

val observable2 = Observable.create { emitter ->
    object : Thread() {
        override fun run() {
            for (i in 0..2) {
                emitter.onNext(1)
                try {
                    sleep(300)
                } catch (ie: InterruptedException) {
                    ie.printStackTrace()
                }
            }
        }
    }.start()
}

Observable.merge(
    observable1,
    observable2
).subscribe {
    println(it)
}

Thread.sleep(700)

/*
결과
20
1
40
60
1
80
100
1
 */

참조 및 참고
틀린 부분은 댓글로 남겨주시면 바로 수정하겠습니다..!!
2022-09-23에 작성되었습니다.

아키텍처를 알아야 앱 개발이 보인다.
RxJava Docs

profile
되새기기 위해 기록

0개의 댓글