요약하자면 ReactiveX에서는 아이템을 발행하는 스레드와 아이템을 소비하는 스레드를 따로 지정하여 멀티 스레드를 구현할 수 있는 것입니다. 이때, 사용하는 연산자가 subscribeOn과 observeOn입니다.
RxKotlin(RxJava)와 RxAndroid에서 사용할 수 있는 스케줄러는 다음과 같습니다.
IO 스케줄러(Schedulers.io())
newThread 스케줄러(Schedulers.newThread())
Computation 스케줄러(Schedulers.computation())
Trampoline 스케줄러(Schedulers.trampoline())
Single 스케줄러(Schedulers.single())
mainThead 스케줄러(AndroidSchedulers.mainThread())
지금까지 작성한 글들의 예제에서는 대부분 메인 스레드(UI 스레드)에서 동작했습니다. 그 이유는 RxKotlin은 기본적으로 Observer가 선언되고 구독되는 스레드에서 동작하기 때문입니다. 아래의 예제는 0~2까지 메인 스레드로 아이템을 발행하고 메인 스레드로 구독하는 예제입니다.
val observable = Observable.create<Int> { emitter ->
for(i in 0 until 3) {
val threadName = Thread.currentThread().name
println("#Observable on $threadName : $i")
emitter.onNext(i)
Thread.sleep(100)
}
emitter.onComplete()
}
observable.subscribe {
val threadName = Thread.currentThread().name
println("#Obsever on $threadName : $it")
}
/*
#Observable on main : 0
#Obsever on main : 0
#Observable on main : 1
#Obsever on main : 1
#Observable on main : 2
#Obsever on main : 2
*/
아이템을 발행하고 구독하는 곳에서 현재 사용되고 있는 스레드의 이름을 출력하는 코드입니다. 모두 메인 스레드에서 동작하는 것을 확인할 수 있습니다.
subscribeOn 연산자는 Observable 소스에 어떤 스케줄러를 사용하여 아이템을 발행할지 알려줍니다.
만약, Observable 체인에 subscribeOn 연산자만 있고 observeOn 연산자가 없다면 subscribeOn 연산자에서 지정한 스케줄러에서 아이템 발행부터 구독까지 이루어집니다.
마블 다이어그램을 보면 아이템을 발행하는 Observable과 아이템을 소비하는 Observer 모두 subscribeOn 연산자에 지정된 스케줄러인 주항색 삼각형을 사용하고 있는 것을 확인할 수 있습니다.
val observable = Observable.create<Int> { emitter ->
for(i in 0 until 3) {
val threadName = Thread.currentThread().name
println("#Observable on $threadName : $i")
emitter.onNext(i)
Thread.sleep(100)
}
emitter.onComplete()
}
observable.subscribeOn(Schedulers.io())
.subscribe {
val threadName = Thread.currentThread().name
println("#Obsever on $threadName : $it")
}
Thread.sleep(500)
/*
#Observable on RxCachedThreadScheduler-1 : 0
#Obsever on RxCachedThreadScheduler-1 : 0
#Observable on RxCachedThreadScheduler-1 : 1
#Obsever on RxCachedThreadScheduler-1 : 1
#Observable on RxCachedThreadScheduler-1 : 2
#Obsever on RxCachedThreadScheduler-1 : 2
*/
observeO 연산자를 사용하여 스케줄러를 지정하면 Observable에서 발행된 아이템을 가로채어 해당 스케줄러로 아이템을 구독합니다.
마블 다이어그램을 보면 아이템을 발행하는 Observable은 검은 삼각형의 스케줄러를 통해 아이템을 발행하고, 아이템을 소비하는 Observer는 observeOn 연산자로 지정된 주황색 삼각형 스케줄러를 통해서 아이템을 소비하는 것을 확인할 수 있습니다.
val observable = Observable.create<Int> { emitter ->
for(i in 0 until 3) {
val threadName = Thread.currentThread().name
println("#Observable on $threadName : $i")
emitter.onNext(i)
Thread.sleep(100)
}
emitter.onComplete()
}
observable.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe {
val threadName = Thread.currentThread().name
println("#Obsever on $threadName : $it")
}
Thread.sleep(500)
/*
#Observable on RxCachedThreadScheduler-1 : 0
#Obsever on RxComputationThreadPool-1 : 0
#Observable on RxCachedThreadScheduler-1 : 1
#Obsever on RxComputationThreadPool-1 : 1
#Observable on RxCachedThreadScheduler-1 : 2
#Obsever on RxComputationThreadPool-1 : 2
*/
스케줄러를 지정하지 못하는 몇 개의 연산자가 존재합니다. interval, timer, replay, buffer 등의 연산자는 ReactiveX에서 computation 스케줄러로 이미 고정했기에 다른 스케줄러를 지정하더라도 무시됩니다.
Observable.interval(200, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io()) // subscribeOn을 통해 Observable을 IO 스케줄러로 지정
.subscribe {
println("${Thread.currentThread().name} : $it")
}
Thread.sleep(1000)
/*
IO 스케줄러로 지정하였지만 무시되고 Computation 스케줄러에서
아이템을 발행하는 모습을 확인할 수 있다.
RxComputationThreadPool-1 : 0
RxComputationThreadPool-1 : 1
RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 3
*/
참조 및 참고
틀린 부분은 댓글로 남겨주시면 바로 수정하겠습니다..!!
2022-09-23
에 작성되었습니다.
아키텍처를 알아야 앱 개발이 보인다.
RxJava Docs