RxJava Observables Combining

HEETAE HEO·2022년 8월 4일
0
post-thumbnail

Observable Combinding이란?

여러 개의 Observable 데이터 스트림을 하나의 Observable로 만들 수 있습니다. 여러 데이터를 이용하여 가공해서 사용하는 경우, 혹은 HTTP 통신의 응답들을 한 번에 묶어서 받고 싶은 경우 등에 사용하게 됩니다.

combineLast()

combineLast()는 두 Observable 중 하나에서 데이터를 발행하려고 할 때, 나머지 한 Observable이 가장 최근에 발행한 데이터를 가져와 지정해준 함수를 거쳐 새로운 하나의 데이터로 발행하게 됩니다.

fun main(args: Array) = runBlocking {
    val ob1 = Observable.interval(100, TimeUnit.MILLISECONDS)
    val ob2 = Observable.interval(250, TimeUnit.MILLISECONDS)

    val time = System.currentTimeMillis()
    ob1.subscribe { println("ob1 emit: $it") }
    ob2.subscribe { println("ob2 emit: $it") }

    Observable.combineLatest(ob1, ob2, BiFunction { a: Long, b: Long 
                                                   -> "Time:${System.currentTimeMillis() - time} ob1:$a ob2:$b" })
            .subscribe { println(it) }

    delay(500)
}

ob1의 방출이 먼저 시작되나, 0과 1을 방출할 동안 ob2의 방출값이 없기 때문에 ob1의 해당 방출은 버려집니다.

그리고 나서 ob2의 첫번째 방출이 시작되면서 combineLatest로 인한 병합이 시작됩니다.

ob1 emit: 0
ob1 emit: 1
ob2 emit: 0
Time:294 ob1:1 ob2:0
ob1 emit: 2
Time:347 ob1:2 ob2:0
ob1 emit: 3
Time:443 ob1:3 ob2:0
ob1 emit: 4
ob2 emit: 1
Time:543 ob1:4 ob2:0
Time:543 ob1:4 ob2:1

merge()

merge()의 경우 여러 Observable에서 발행되는 모든 데이터들을 한 스트림으로 합쳐 발행해주는 메서드입니다. 말 그대로 merge 한다는 것 입니다. 합쳐진 데이터 스트림의 순서는 각각이 발행된 순서입니다.

fun main() {
    val streamA = Observable.create<Int> {
        Thread {
            Thread.sleep(100)
            it.onNext(1)
            Thread.sleep(100)
            it.onNext(2)
            Thread.sleep(100)
            it.onNext(3)
        }.start()
    }

    val streamB = Observable.create<Int> {
        Thread {
            Thread.sleep(250)
            it.onNext(100)
            Thread.sleep(250)
            it.onNext(200)
            Thread.sleep(250)
            it.onNext(300)
        }.start()
    }

    Observable.merge(streamA, streamB)
        .subscribe(System.out::println)
}
1
2
100
3
200
300

결과를 보면 모든 데이터들이 한 스트림으로 합쳐져서 발행되는 것을 볼 수 있습니다.

zip()

zip()의 경우 여러 Observable을 하나로 결합하되 지정해준 함수를 거쳐 하나의 데이터로 발행합니다. combineLast()가 항상 최신 발행 데이터끼리 합쳐서 발행하는 역할을 했다면 zip()은 여러 개의 Observable 들에서 발행되는 데이터들을 항상 1:1로 매핑하는 것을 보장하며 데이터를 발행하게 됩니다.

fun main(){
    val shapes = arrayOf("BALL", "PENTAGON", "START")
    val colorTriangles = arrayOf("2-T", "4-T", "6-T")

    val source = Observable.zip(
        shapes.toObservable().map{sh-> getSuffix(sh) },
        colorTriangles.toObservable().map{num-> getNum(num) }, BiFunction<String,String,String>{ s , n -> "$n$s"})

    source.subscribe{it->println(it)}
}
internal fun getSuffix(shape: String): String {
    if (shape.equals("PENTAGON", ignoreCase = true)) {
        return "-P"
    }
    return if (shape.equals("START", ignoreCase = true)) {
        "-S"
    } else "-B"
}

internal fun getNum(num: String): String {
    return if (num.indexOf("-") > 0) {
        num.substring(0, num.indexOf("-"))
    } else num
}
2-B
4-P
6-S

references

https://reactivex.io/documentation/operators.html#combining
https://velog.io/@haero_kim/RxJava-Observable-%EA%B2%B0%ED%95%A9%ED%95%98%EA%B8%B0
https://tourspace.tistory.com/289

profile
Android 개발 잘하고 싶어요!!!

0개의 댓글