RxJava 는 다양한 문제를 해결할 수 있는 범용적인 솔루션이다. 특히 멀티 쓰레딩과 같은 비동기 작업을 효율적으로 구현할 수 있는 환경을 제공해준다. 이는 스케줄러라는 녀석을 활용하게 된다. 스케줄러는 데이터 스트림이 어떤 쓰레드에서 데이터를 발행하는지, 구독자는 어떤 쓰레드에서 이벤트 발생을 통보받는지에 대해 지정해줄 수 있다.
RxJava 에서는 Schedulers 클래스에서 제공하는 정적 패토리 메소드를 통해 스케줄러를 설정해줄 수 있다.
val singleSchedulers = Schedulers.single()
val ioSchedulers = Schedulers.io()
val newThreadSchedulers = Schedulers.newThread()
val computationSchedulers = Schedulers.computation()
val trampolineSchedulers = Schedulers.trampoline()
// 안드로이드에서만 제공하는 특별한 스케줄러
// val mainThread = AndroidSchedulers.mainThread()
Single 스케줄러
val singleSchedulers = Schedulers.single()
Single 스케줄러는 단일 쓰레드를 생성하여 이를 계속 재사용하는 방식을 활용한다. RxJava 내부에서 쓰레드를 별도로 생성하여, 한 번 생성된 쓰레드를 활용하며 여러 작업을 처리하게 된다.
IO 스케줄러
val ioSchedulers = Schedulers.io()
이 녀석은 네트워킹 작업이나 DB 트랜잭션, 파일 시스템 환경 등 블로킹이 발생할 수 있는 곳에서 비동기적으로 작업을 처리하기 위해 사용되는 스케줄러이다. 쓰레드 풀을 사용하여 새로운 쓰레드가 필요할 때마다 쓰레드를 계속 생성하되, 이전에 생성했던 쓰레드가 존재한다면 이를 재사용한다. 내부적으로
CachedThreadPool
을 채택했다.
newThread 스케줄러
val newThreadSchedulers = Schedulers.newThread()
newThread 스케줄러는 매번 새로운 쓰레드를 생성하여 작업을 처리하도록 지정해주는 녀석이다.
Computation 스케줄러
val computationSchedulers = Schedulers.computation()
해당 스케줄러는 단순한 반복 작업, 콜백 처리 등등 컴퓨팅 및 계산적인 작업에 사용한다. CPU 에 대응하는 계산용 스케줄러이고, 내부적으로 쓰레드 풀을 활용한다. 기본적으로 쓰레드 개수는 프로세서 개수와 같다.
Trampoline 스케줄러
val trampolineSchedulers = Schedulers.trampoline()
새로운 쓰레드를 생성하지 않고, 현재 쓰레드에 무한한 크기의 큐를 생성한다. 큐의 특성인 FIFO 에 따라, 모든 작업을 들어온 순서대로 (순차적으로) 실행하는 것을 보장하게 된다.
mainThread 스케줄러 (RxAndroid 에만 포함)
val mainThread = AndroidSchedulers.mainThread()
RxAndroid 에서는 안드로이드 메인 쓰레드를 지정하는 스케줄러를 제공한다.
RxJava 에서 스케줄러를 활용하기 위해선, subscribeOn
메소드와 observeOn
메소드를 활용해볼 수 있다. 이것들만 있다면 정말 간단하게 멀티 쓰레딩을 구현해볼 수 있다.
우선, 0 부터 3까지 데이터를 발행하는 Observable 을 생성해보자. 아래와 같이 구현할 수 있을 것이다.
fun main() {
Observable.create<Int> {
for (i in 0..3){
val threadName = Thread.currentThread().name
println("#발행 [$threadName] : $i")
it.onNext(i)
Thread.sleep(100)
}
}.subscribe {
val threadName = Thread.currentThread().name
println("#구독 [$threadName] : $it")
}
}
위 코드는 스케줄러 연산자를 사용하지 않았다. 그랬더니 결과는 다음과 같이 나온다.
#발행 [main] : 0
#구독 [main] : 0
#발행 [main] : 1
#구독 [main] : 1
#발행 [main] : 2
#구독 [main] : 2
#발행 [main] : 3
#구독 [main] : 3
스케줄러를 지정해주지 않는다면, 데이터 발행과 구독이 모두 메인 쓰레드에서 진행된다.
subscribeOn()
그럼 이제 한번, subscribeOn()
메소드를 활용하여 스케줄러를 지정해보자.
fun main() {
Observable.create<Int> {
for (i in 0..3) {
val threadName = Thread.currentThread().name
println("#발행 [$threadName] : $i")
it.onNext(i)
Thread.sleep(100)
}
}.subscribeOn(Schedulers.io())
.subscribe {
val threadName = Thread.currentThread().name
println("#구독 [$threadName] : $it")
}
Thread.sleep(500)
}
실행해보면 아래와 같이 결과를 출력한다.
#발행 [RxCachedThreadScheduler-1] : 0
#구독 [RxCachedThreadScheduler-1] : 0
#발행 [RxCachedThreadScheduler-1] : 1
#구독 [RxCachedThreadScheduler-1] : 1
#발행 [RxCachedThreadScheduler-1] : 2
#구독 [RxCachedThreadScheduler-1] : 2
#발행 [RxCachedThreadScheduler-1] : 3
#구독 [RxCachedThreadScheduler-1] : 3
위 결과에서 알 수 있듯, 해당 연산자는 Observable 데이터 스트림에 어떤 스케줄러를 사용하여 데이터를 발행할지 지정해주는 메소드이다. 만약 subscribeOn
이 메소드 체이닝되어 있는데 observeOn
이 체이닝되지 않은 경우 발행되는 데이터를 구독하는 쓰레드도 동일한 쓰레드에서 동작하도록 한다.
observeOn()
그렇다면 observeOn
은 발행되는 데이터를 구독하는 쓰레드를 지정해주는 연산자로 유추할 수 있다. 일단 코드에 적용해보자. 위 예제 코드에 observeOn()
을 덧붙여 아래와 같이 스케줄러를 지정해준 뒤 결과를 확인해보자.
fun main() {
Observable.create<Int> {
for (i in 0..3) {
val threadName = Thread.currentThread().name
println("#발행 [$threadName] : $i")
it.onNext(i)
Thread.sleep(100)
}
}.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe {
val threadName = Thread.currentThread().name
println("#구독 [$threadName] : $it")
}
Thread.sleep(500)
}
#발행 [RxCachedThreadScheduler-1] : 0
#구독 [RxComputationThreadPool-1] : 0
#발행 [RxCachedThreadScheduler-1] : 1
#구독 [RxComputationThreadPool-1] : 1
#발행 [RxCachedThreadScheduler-1] : 2
#구독 [RxComputationThreadPool-1] : 2
#발행 [RxCachedThreadScheduler-1] : 3
#구독 [RxComputationThreadPool-1] : 3
결과를 보니 발행과 구독이 각기 다른 쓰레드에서 실행되고 있음을 확인할 수 있다. 비로소 멀티 쓰레딩을 구현하게 된 것이다.
observeOn
연산자를 활용하여 스케줄러를 지정해준다면, Observable 데이터 스트림에서 발행한 데이터를 가로채서 지정한 스케줄러에서 이를 구독한다. 따라서 위와 같은 결과가 나오게 된다.
일반적으로 안드로이드에서 RxJava 를 사용 목적에 맞게 가장 많이 사용하는 부분은 백엔드 서버와 네트워킹 동작을 하거나, DB 쿼리 동작을 수행하는 부분이다. 따라서 이에 가장 적합한 IO 스케줄러를 subscribeOn()
을 통해 지정해줌으로써 IO 스케줄러 상으로 결과 데이터를 발행할 수 있도록 한다.
그리고 안드로이드에선 보통 위와 같은 비동기 동작의 결과물을 메인 쓰레드 (UI 쓰레드) 에서 UI 를 갱신하는 등 활용하기 때문에 이를 AndroidSchedulers.mainThread()
로 지정한다.
repository.getData()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
거의 국룰이다시피 활용되는 스케줄러들이기 때문에, 아래와 같이
Observable
혹은Single
데이터 스트림에 확장함수 형태로 스케줄러 지정 코드를 정의해두기도 한다. (필자도 애용한다)fun <T> Single<T>.applySchedulers() = subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()) fun <T> Observable<T>.applySchedulers() = subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())