📝RxJava의 학습 순서
1. Observable 클래스를 명확하게 이해(특히 Hot Observable과 Cold Observable의 개념을 꼭 이해해야함)
2. 변환, 제어, 결합 연산자 등 카테고리별 주요 함수 공부
3. 스케줄러의 의미, subscribeOn()과 observeOn()함수의 차이
4. 그 밖의 디버깅, 흐름 제어함수
RxJava에서의 Scheduler는 RxJava 비동기 프로그래밍을 위한 쓰레드 관리자이며
즉, 스케쥴러를 이용해서 어떤 쓰레드에서 무엇을 처리할 지에 대해서 제어할 수 있다.
RxJava만 사용한다고 해서 비동기 처리가되는 것이 아닌 Scheduler를 통해 쓰레드를 분리해주어야 비동기 작업이 가능한 것이다.
Scheduler를 이용해서 데이터를 통지하는 쪽과 데이터를 처리하는 쪽 쓰레드를 별도로 지정해서 분리할 수 있고 쓰레드를 위한 코드의 간결성과 쓰레드의 복잡함을 줄일 수 있다.
스케줄러를 지정하기 위해서 생산자쪽의 데이터 흐름을 제어하기 위해서는 subscribeOn(), 구독자쪽에서 전달받은 데이터 처리를 제어하기 위해서는 observeOn() 연산자를 사용한다.
subscribeOn은 여러번 호출되더라도 맨 처음의 호출만 영향을 주며 어디에 위치하든 상관 없고, observeOn은 여러번 호출될 수 있으며 이후에는 실행되는 연산에 영향을 주므로 위치가 중요하다.
Observable.fromIterable(shapes) // shapes 리스트는 (Red,Ball), (Green, Ball), (Blue,Ball) 이렇게 구성되어 있습니다.
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.doOnSubscribe(data -> MyUtil.printData("doOnSubscribe")) // printData System.out.println(""+Thread.currentThread().getName()+" | "+message+" | ")
.doOnNext(data -> MyUtil.printData("doOnNext", data))
.observeOn(Schedulers.newThread())
.map(data -> {data.shape = "Square"; return data;})
.doOnNext(data -> MyUtil.printData("map(Square)", data))
.observeOn(Schedulers.newThread())
.map(data -> {data.shape = "Triangle"; return data;})
.doOnNext(data -> MyUtil.printData("map(Triangle)", data))
.observeOn(Schedulers.newThread())
.subscribe(data -> MyUtil.printData("subscribe", data));
subscribeOn의 경우 여러번 호출되더라도 맨 처음의 호출만 영향을 주기 때문에 computation 스케줄러에서 데이터 흐름이 발생되고 , 그 후 observeOn(Schedulers.newThread())에 의해 연산이 new thread에서 실행된다.
main | doOnSubscribe |
RxComputationThreadPool-1 | doOnNext | MyShape{color='Red', shape='Ball'}
RxComputationThreadPool-1 | doOnNext | MyShape{color='Green', shape='Ball'}
RxComputationThreadPool-1 | doOnNext | MyShape{color='Blue', shape='Ball'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Red', shape='Square'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Green', shape='Square'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Blue', shape='Square'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Red', shape='Triangle'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Green', shape='Triangle'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Blue', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Red', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Green', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Blue', shape='Triangle'}
Scheduler의 종류로는 single, computation, io, trampoline, new_thread가 존재하고,
RxJava에서는 이 중 Computation, IO, Trampoline 세 가지의 Scheduler를 권장합니다.
public final class Schedulers {
@NonNull
static final Scheduler SINGLE;
@NonNull
static final Scheduler COMPUTATION;
@NonNull
static final Scheduler IO;
@NonNull
static final Scheduler TRAMPOLINE;
@NonNull
static final Scheduler NEW_THREAD;
}
Scheduler | 생성 방법 | 내용 |
---|---|---|
SINGLE | Schedulers.single() | 단일 스레드를 생성해 계속 재사용 |
COMPUTATION | Schedulers.computation() | 내부적으로 스레드 풀 생성, 스레드 개수 = 프로세서 개수 |
IO | Schedulers.io() | 필요할 때 마다 스레드를 계속 생성 |
TRAMPOLINE | Schedulers.trampoline() | 현재 스레드에 무한한 크기의 대기 큐 생성 |
NEW_THREAD | Schedulers.newThread() | 매번 새로운 스레드 생성 |
단일 스레드를 계속 재사용합니다. RxJava 내부에서 스레드를 별로도 생성하며, 한 번 생성된 스레드로 여러 작업을 처리한다. 비동기 처리를 지향한다면 Single 스레드 스케쥴러를 사용할 일은 거의 없다.
CPU에 대응하는 계산용 스케줄러이며 IO 작업을 하지 않고 일반적인 계산/연산 작업을 할 때 사용한다.
파일 입출력 등의 IO 작업을 하거나 네트워크 요청 처리 시에 사용하는 스케쥴러이다. Computation 스케쥴러와 다르게 필요할 때 마다 스레드를 계속 생성한다.
새로운 스레드를 생성하지 않고 사용하고 있는 현재 스레드에 무한한 크기의 대기 큐를 생성한다.
다른 스케쥴러와 달리 요청을 받을 때 마다 매번 새로운 스레드를 생성한다.