여러 Observable 소스를 결합하여 하나의 Observable을 생성하고, 동작하는 연산자들입니다. 하나씩 알아보도록 하겠습니다.
두 개의 Observable 중 한 소스에서 아이템이 발행될 때, 두 Observable에서 가장 최근에 발행한 아이템을 취합하여 하나로 발행하는 연산자입니다.
실무에서 많이 사용되는 연산자 중 하나로, 여러 개의 http 요청에 의한 응답을 하나로 묶어서 처리할 때 사용됩니다.
마블 다이어그램을 보면 분홍색 원이 첫 번째 Observable에서 발행되고 두 번째 Observable에서 마름모가 발행되자 분홍색 마름모가 방출된 것을 볼 수 있습니다. 그리고 첫 번째 Observable에서 주항색 원이 발행되었을 때, 두 번째 Observable의 가장 최근 데이터는 마름모이기에 주황색 마름모가 방출된 것을 볼 수 있습니다.
// 1초마다 i값을 방출하는 Observable
val observable1 = Observable.create<Int> { emitter ->
object : Thread() {
override fun run() {
for(i in 1..5) {
emitter.onNext(i)
try {
sleep(1000)
} catch(ie: InterruptedException) {
ie.printStackTrace()
}
}
}
}.start()
}
// 0.5초마다 문자를 방출하는 Observable
val observable2 = Observable.create<Char> { emitter ->
object : Thread() {
override fun run() {
for(i in 'a'..'d') {
emitter.onNext(i)
try {
sleep(500)
} catch (ie: InterruptedException) {
ie.printStackTrace()
}
}
}
}.start()
}
Observable.combineLatest(
observable1,
observable2,
BiFunction{ num, chr -> "$num$chr" }
).subscribe {
println(it)
}
Thread.sleep(5000)
/*
결과
1a
1b
1c
2c
2d
3d
4d
5d
*/
zip 연산자는 여러 Observable을 하나로 결합하여 지정된 함수를 통해 하나의 아이템으로 발행합니다.
combineLatest와 비슷해 보이지만 combineLatest 연산자는 가장 최근에 발행한 아이템을 기준으로 결합하는 데 반해 zip은 여러 Observable의 발행 순서를 엄격히 지켜 아이템을 결합합니다.
마블 다이어그램을 보면 두 번째 Observable에서 C, D 아이템을 발행하고, 첫 번째 Observable에서 3 아이템을 발행했을 때, 두 번째 Observable의 3번째 아이템에 해당하는 C와 합쳐져서 3C가 발행된 것을 확인할 수 있습니다.
// 1초마다 i값을 방출하는 Observable
val observable1 = Observable.create<Int> { emitter ->
object : Thread() {
override fun run() {
for(i in 1..5) {
emitter.onNext(i)
try {
sleep(1000)
} catch(ie: InterruptedException) {
ie.printStackTrace()
}
}
}
}.start()
}
// 0.5초마다 문자를 방출하는 Observable
val observable2 = Observable.create<Char> { emitter ->
object : Thread() {
override fun run() {
for(i in 'a'..'d') {
emitter.onNext(i)
try {
sleep(500)
} catch (ie: InterruptedException) {
ie.printStackTrace()
}
}
}
}.start()
}
Observable.zip(
observable1,
observable2
) { num, chr ->
"$num$chr"
}.subscribe {
println(it)
}
Thread.sleep(5000)
/*
결과
1a
2b
3c
4d
*/
merge 연산자를 이용하면 여러 Observable을 하나의 Observable처럼 결합하여 사용할 수 있습니다.
여러 Observable이 발행하는 아이템을 발행 시점에 하나의 스트림에 교차해 끼워 넣어 하나의 Observable을 만듭니다. 이때, 데이터가 발행되는 순서로 데이터를 방출합니다.
마블 다이어그램을 보면 두 개의 Observable을 데이터가 발행되는 순서대로 방출하고 있는데, 오류가 발생하면 방출을 중단하는 모습을 확인할 수 있습니다.
val observable1 = Observable.intervalRange(
1, // 시작값
5, // 발행 횟수
0, // 초기 지연
100, // 발행 간격
TimeUnit.MILLISECONDS // 간격 단위
).map { value ->
value * 20
}
val observable2 = Observable.create { emitter ->
object : Thread() {
override fun run() {
for (i in 0..2) {
emitter.onNext(1)
try {
sleep(300)
} catch (ie: InterruptedException) {
ie.printStackTrace()
}
}
}
}.start()
}
Observable.merge(
observable1,
observable2
).subscribe {
println(it)
}
Thread.sleep(700)
/*
결과
20
1
40
60
1
80
100
1
*/
참조 및 참고
틀린 부분은 댓글로 남겨주시면 바로 수정하겠습니다..!!
2022-09-23
에 작성되었습니다.
아키텍처를 알아야 앱 개발이 보인다.
RxJava Docs