[안드로이드] RxKotlin(RxJava) #6 - 스케줄러

hee09·2022년 9월 23일
0

RxKotlin

목록 보기
6/7

스케줄러

  • RxKotiln(RxJava)에서는 스케줄러(Scheduler)라는 도구를 사용하여 멀티 스레드와 같은 비동기 작업을 돕습니다.
  • 스케줄러를 이용해서 데이터 발행을 하는 쪽과 데이터 소비를 하는 쪽 스레드를 별도로 지정해서 분리할 수 있습니다.
  • RxKotlin(RxJava)에서 스케줄러를 지정하기 위해서 subscribeOn(), observeOn() 연산자를 사용합니다.
  • subscribeOn 연산자는 Observable 소스에 어떤 스케줄러를 사용하여 아이템을 발행할지 알려줍니다.
  • 만약, Observable 체인에 subscribeOn 연산자만 있고 observeOn 연산자가 없다면 subscribeOn 연산자에서 지정한 스케줄러에서 아이템 발행부터 구독까지 이루어집니다.
  • observeOn 연산자를 사용하여 스케줄러를 지정하면 Observable에서 발행된 아이템을 가로채어 해당 스케줄러로 아이템을 구독합니다.

요약하자면 ReactiveX에서는 아이템을 발행하는 스레드와 아이템을 소비하는 스레드를 따로 지정하여 멀티 스레드를 구현할 수 있는 것입니다. 이때, 사용하는 연산자가 subscribeOn과 observeOn입니다.


스케줄러의 종료

RxKotlin(RxJava)와 RxAndroid에서 사용할 수 있는 스케줄러는 다음과 같습니다.

  • IO 스케줄러(Schedulers.io())

    • 네트워크, 데이터베이스, 파일 시스템 환경 등에서 사용하는 스케줄러입니다.
    • 블로킹 이슈가 발생할 수 있는 곳에서 비동기적인 작업을 위해 사용될 수 있습니다.
    • 스레드 풀에서 스레드를 가져오거나 가져올 스레드가 없으면 새로운 스레드를 생성합니다.
  • newThread 스케줄러(Schedulers.newThread())

    • 매번 새로운 스케줄러(스레드)를 생성합니다.
    • 매번 새롭게 생성되기에 스레드 비용이 많이 들고, 재사용되지도 않습니다.
  • Computation 스케줄러(Schedulers.computation())

    • 단순 반복적인 작업, 콜밸 처리 그리고 계산적인 작업에 사용됩니다.
    • 블로킹 이슈가 발생하는 곳에서 사용하는 것을 추천하지 않습니다.
    • CPU 코어의 물리적 스레드 수를 넘지 않는 범위에서 스레드를 생성합니다.
  • Trampoline 스케줄러(Schedulers.trampoline())

    • 새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 큐를 생성하는 스케줄러입니다.
    • 모든 작업을 순차적으로 실행하는 것을 보장합니다(FIFO).
  • Single 스케줄러(Schedulers.single())

    • 단일 스레드를 생성하여 처리 작업을 진행합니다.
    • 여러번 구독하더라도 공통으로 사용됩니다.
  • mainThead 스케줄러(AndroidSchedulers.mainThread())

    • 안드로이드 메인 스레드에서 작동하는 스케줄러를 제공합니다.
    • 안드로이드의 메인 스레드(UI 스레드)에서 작동하기에, UI를 변경하는 경우 사용할 수 있습니다.

subscribeOn과 observeOn 연산자

지금까지 작성한 글들의 예제에서는 대부분 메인 스레드(UI 스레드)에서 동작했습니다. 그 이유는 RxKotlin은 기본적으로 Observer가 선언되고 구독되는 스레드에서 동작하기 때문입니다. 아래의 예제는 0~2까지 메인 스레드로 아이템을 발행하고 메인 스레드로 구독하는 예제입니다.

val observable = Observable.create<Int> { emitter ->
    for(i in 0 until 3) {
        val threadName = Thread.currentThread().name
        println("#Observable on $threadName : $i")
        emitter.onNext(i)
        Thread.sleep(100)
    }
    emitter.onComplete()
}

observable.subscribe {
    val threadName = Thread.currentThread().name
    println("#Obsever on $threadName : $it")
}

/*
#Observable on main : 0
#Obsever on main : 0
#Observable on main : 1
#Obsever on main : 1
#Observable on main : 2
#Obsever on main : 2
 */

아이템을 발행하고 구독하는 곳에서 현재 사용되고 있는 스레드의 이름을 출력하는 코드입니다. 모두 메인 스레드에서 동작하는 것을 확인할 수 있습니다.


subscribeOn 연산자

  • subscribeOn 연산자는 Observable 소스에 어떤 스케줄러를 사용하여 아이템을 발행할지 알려줍니다.

  • 만약, Observable 체인에 subscribeOn 연산자만 있고 observeOn 연산자가 없다면 subscribeOn 연산자에서 지정한 스케줄러에서 아이템 발행부터 구독까지 이루어집니다.

  • 마블 다이어그램을 보면 아이템을 발행하는 Observable과 아이템을 소비하는 Observer 모두 subscribeOn 연산자에 지정된 스케줄러인 주항색 삼각형을 사용하고 있는 것을 확인할 수 있습니다.

val observable = Observable.create<Int> { emitter ->
    for(i in 0 until 3) {
        val threadName = Thread.currentThread().name
        println("#Observable on $threadName : $i")
        emitter.onNext(i)
        Thread.sleep(100)
    }
    emitter.onComplete()
}

observable.subscribeOn(Schedulers.io())
    .subscribe {
    val threadName = Thread.currentThread().name
    println("#Obsever on $threadName : $it")
}

Thread.sleep(500)

/*
#Observable on RxCachedThreadScheduler-1 : 0
#Obsever on RxCachedThreadScheduler-1 : 0
#Observable on RxCachedThreadScheduler-1 : 1
#Obsever on RxCachedThreadScheduler-1 : 1
#Observable on RxCachedThreadScheduler-1 : 2
#Obsever on RxCachedThreadScheduler-1 : 2
 */
  • 예제 코드를 보면 subscribeOn의 인자로 IO 스케줄러를 지정하였습니다. 이렇게 지정하고 아이템을 발행하는 스레드와 아이템을 소비하는 스레드를 출력하자 모두 IO 스레드로 지정된 것을 확인할 수 있습니다.

observeOn 연산자

  • observeO 연산자를 사용하여 스케줄러를 지정하면 Observable에서 발행된 아이템을 가로채어 해당 스케줄러로 아이템을 구독합니다.

  • 마블 다이어그램을 보면 아이템을 발행하는 Observable은 검은 삼각형의 스케줄러를 통해 아이템을 발행하고, 아이템을 소비하는 Observer는 observeOn 연산자로 지정된 주황색 삼각형 스케줄러를 통해서 아이템을 소비하는 것을 확인할 수 있습니다.

val observable = Observable.create<Int> { emitter ->
    for(i in 0 until 3) {
        val threadName = Thread.currentThread().name
        println("#Observable on $threadName : $i")
        emitter.onNext(i)
        Thread.sleep(100)
    }
    emitter.onComplete()
}

observable.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe {
    val threadName = Thread.currentThread().name
    println("#Obsever on $threadName : $it")
}

Thread.sleep(500)

/*
#Observable on RxCachedThreadScheduler-1 : 0
#Obsever on RxComputationThreadPool-1 : 0
#Observable on RxCachedThreadScheduler-1 : 1
#Obsever on RxComputationThreadPool-1 : 1
#Observable on RxCachedThreadScheduler-1 : 2
#Obsever on RxComputationThreadPool-1 : 2
 */
  • 예제 코들를 보면 Observable은 subscribeOn 연산자를 통해 지정한 IO 스케줄러를 사용하고 있고, Observer는 observeOn 연산자를 통해 지정한 Computation 스케줄러를 사용하고 있는 모습을 확인할 수 있습니다.

스케줄러를 지정하지 못하는 연산자

스케줄러를 지정하지 못하는 몇 개의 연산자가 존재합니다. interval, timer, replay, buffer 등의 연산자는 ReactiveX에서 computation 스케줄러로 이미 고정했기에 다른 스케줄러를 지정하더라도 무시됩니다.

Observable.interval(200, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.io()) // subscribeOn을 통해 Observable을 IO 스케줄러로 지정
    .subscribe {
        println("${Thread.currentThread().name} : $it")
    }

Thread.sleep(1000)

/*
IO 스케줄러로 지정하였지만 무시되고 Computation 스케줄러에서 
아이템을 발행하는 모습을 확인할 수 있다.

RxComputationThreadPool-1 : 0
RxComputationThreadPool-1 : 1
RxComputationThreadPool-1 : 2
RxComputationThreadPool-1 : 3
 */

참조 및 참고
틀린 부분은 댓글로 남겨주시면 바로 수정하겠습니다..!!
2022-09-23에 작성되었습니다.

아키텍처를 알아야 앱 개발이 보인다.
RxJava Docs

profile
되새기기 위해 기록

0개의 댓글