지난 시간에 이어 RxJava의 연산자에 대해 알아보자.
kotlin의 collection 함수들을 써봤다면 익숙한 함수들을 많이 만날 수 있다.
Observable.create<String> { emitter ->
emitter.onNext("Hello")
emitter.onNext("RxJava")
emitter.onComplete()
}.subscribe { println(it) }
Observable.defer {
Observable.create<String> { emitter ->
emitter.onComplete()
}
}.subscribe(::println)
val items = arrayOf("Hello", "World")
Observable.fromArray(*items).subscribe(::println)
Observable.interval(100, TimeUnit.MILLISECONDS)
.subscribe(::println)
Thread.sleep(555)
Observable.just(1, 2, 3).subscribe(::println)
Observable.range(3, 2).subscribe(::println)
val observable = Observable.just("Hello", "World")
.repeat(2)
observable.subscribe(::println)
println("start : ${System.currentTimeMillis()}")
Observable.timer(1000, TimeUnit.MILLISECONDS)
.subscribe {
println("start : ${System.currentTimeMillis()} + $it")
}
bservable.fromIterable(0..8)
.buffer(2,4)
.subscribe(::println)
Observable.fromIterable(0..3)
.map { "RxJava : $it" }
.subscribe(::println)
Observable.fromIterable(listOf(1, 2, 3, 4, 5, 6))
.xxxMap { original: Int ->
Observable.just("$original plusplus")
.delay(Random.nextLong(5), TimeUnit.SECONDS)
}
.subscribe(::println)
flatMap : 랜덤한 순서 출력
concatMap : 순서대로 출력
switchMap : 6 출력
Observable.fromIterable(0..3)
.scan { t1, t2 -> t1 + t2 }
.subscribe(::println)
Observable.interval(250,TimeUnit.MILLISECONDS)
.debounce(333, TimeUnit.MILLISECONDS)
.subscribe(::println) // 출력 없음
Thread.sleep(1000)
data class T(val a: Int, val b: Int)
Observable.just(T(2, 3), T(2, 4), T(1, 5), T(7, -1), T(2, 2))
.distinct { it.a + it.b } // key selector
.subscribe(::println)
Observable.just(11,true,"Hello","Rx",false)
.ofType(String::class.java)
.filter { it.length == 2 }
.subscribe(::println)
Observable.just(6,4,2)
.ignoreElements()
.subscribe(::println)
Observable.interval(0, 100, TimeUnit.MILLISECONDS)
.throttleLast(250, TimeUnit.MILLISECONDS)
.subscribe {
println(System.currentTimeMillis())
println(it)
}
Thread.sleep(1000)
Observable.just(1,2,3,4,5,6,7,8,9,10)
.take(9)
.skip(2)
.skipLast(2)
.takeLast(3)
.subscribe(::println)
val observable1 = Observable.interval(1000L, TimeUnit.MILLISECONDS)
val observable2 = Observable.interval(750L, TimeUnit.MILLISECONDS).map { it + 1000 }
Observable.combineLatest<Long, Long, String>(
observable1, observable2,
BiFunction { t1, t2 -> // RxKotlin은 BiFunction없이 람다를 쓸 수 있다.
"$t1 $t2"
}
).subscribe {
println(System.currentTimeMillis())
println(it)
}
Thread.sleep(3000)
val observable1 = Observable.interval(0, 1000, TimeUnit.MILLISECONDS).map { "1:$it" }
val observable2 = Observable.interval(0, 500, TimeUnit.MILLISECONDS).map { "2:$it" }
val observable = Observable.merge(observable1, observable2)
observable.subscribe {
println(it)
}
val observable1 = Observable.just(1, 2, 3, 4, 5, 6, 7)
val observable2 = Observable.just("a", "b", "c", "d", "e", "f")
val observable = Observable.zip(
observable1, observable2,
BiFunction<Int, String, String> { t1, t2 -> // RxKotlin시 람다로 가능
"$t1 and $t2"
})
observable.subscribe(::println)
Observable.just(1,2,3,4,5)
.startWith(500)
.subscribe(::println)
println(System.currentTimeMillis())
Observable.just("Hello", "World")
.delay(1000, TimeUnit.MILLISECONDS)
.subscribe {
println(System.currentTimeMillis())
}
Thread.sleep(1111)
Observable.just("Hello", "RxJava")
.doOnNext { println("doOnNext") }
.doOnSubscribe { println("doOnSubscribe") }
.doAfterNext { println("doAfterNext") }
.doAfterTerminate { println("doAfterTerminate") }
.doOnEach { println("doOnEach") } // 모든 이벤트에 걸림 complete까지
.doFinally { println("Finally") }
.doOnComplete { println("doOnComplete") }
.doOnDispose { println("doOnDispose") }
.doOnError { println("doOnError") }
.doOnTerminate { println("doOnTerminate") }
.subscribe { println("- $it") }
Observable.just(1,2)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread()) // Android용
.subscribe(::println)
Observable.just("Hello", "World")
.delay(3000L, TimeUnit.MILLISECONDS)
.timeout(2000L, TimeUnit.MILLISECONDS)
.subscribe((::println), (::println))
Thread.sleep(4000)
Observable.just(1, 2, 3, 4, 5)
.map { if (it > 4) throw IOException() else it}
.retry { cnt, t2 ->
cnt <= 1
}
.subscribe((::println), (::println))
Completable.complete()
.andThen(Completable.complete())
.subscribe { println("Everything Success") }
글 두개로 RxJava의 기본 of 기본을 알아봤다. 공식문서, 홈페이지에서 마블 다이어그램과 함께 더 좋은 설명과 다양한 연산자들을 배울 수 있다.
필자도 이제 입문하는 입장이므로, 새로운 사용 예시를 알게될 때 마다 이곳에 정리해볼 생각이다.
잘 정리해주셔서 감사합니다 많은 도움이 됐습니다