RxJava기록 - 스케줄러의 종류

dada·2022년 1월 13일
0

RxJava

목록 보기
11/12

뉴 스레드 스케줄러 - Schedulers.newThread()

  • 새로운 스레드를 생성하고 새로운 스레드를 만들어 어떤 동작을 실행하고 싶을 때 Schedulers.newThread()를 인자로 넣어주면 된다.
  • 뉴 스레드 스케줄러는 새로운 스레드를 생성하여 내가 원하는 동작을 처리하는 방법이다. 하지만 적극적으로 추천하는 방법은 아니다. RxJava에는 뉴 스레드 스케줄러보다 활용도가 높은 계산 스케줄러와 IO 스케줄러와 같은 다른 스케줄러를 제공하기 때문이다.

계산 스케줄러

  • CPU에 대응하는 계산용 스케줄러이며 대기 시간 없이 빠르게 결과를 도출하는 것이 중요

💊interval() 함수의 원형

@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Observable<Long> interval(
    long period, TimeUnit unit, Scheduler scheduler)
  • CUSTOM은 개발자가 원하는 스케줄러를 지정할 수 있다는 의미
  • 계산 스케줄러는 입출력 작업을 하지 않는 스케줄러
String[] orgs = {"1","3","5"};
Observable<String> source = Observable.fromArray(args)
	.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a,b) -> a);

//구독#1
source.map(item -> "<<" + item + ">>")
	.subscrbieOn(Schedulers.computation())
	.subscribe(Log::i);
//구독#2
source.map(item -> "##" + item + "##")
	.subscrbieOn(Schedulers.computation())
	.subscribe(Log::i);
CommonUtils.sleep(1000);

//결과값
RxComputationThreadPool-3 | value = <<1>>
RxComputationThreadPool-4 | value = ##1##
RxComputationThreadPool-3 | value = <<3>>
RxComputationThreadPool-4 | value = ##3##
RxComputationThreadPool-3 | value = <<5>>
RxComputationThreadPool-4 | value = ##5##
  • zipWith() : 데이터와 시간을 합성할 수 있음
  • 첫번째 구독과 두번째 구독이 거의 동시에 이루어지기 때문에 RxJava 내부에서 동일한 스레드에 작업을 할당

IO 스케줄러

  • 네트워크상의 요청을 처리하거나 각종 입출력 작업을 실행하기 위한 스케줄러
  • 계산 스케줄러와 달리 기본으로 생성되는 스레드 개수가 다름
    • 계산 스케줄러는 CPU개수만큼 스레드를 생성하지만 IO스케줄러는 필요할 때마다 스레드를 계속 생성
💡 계산 스케줄러 : 일반적인 계산 작업 IO 스케줄러 : 네트워크상의 요청, 파일 입출력, DB 쿼리 등

트램펄린 스케줄러

  • 새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 대기 행렬을 생성하는 스케줄러
String[] orgs = {"1", "3", "5"};
Observable<String> source = Observable.fromArray(orgs);

//구독 #1 
source subscribeOn(Schedulers.trampoline())
	.map(data -> "<<" + data ">>")
	.subscribe(Log::i);

//구독 #2
source.subscribeOn(Schedulers.trampoline())
	.map(data -> "##" + data + "##")
	.subscribe(Log::i)
CommonUtils.sleep(500);

//결과값 
main | value = <<1>>
main | value = <<3>>
main | value = <<5>>
main | value = ##1##
main | value = ##3##
main | value = ##5##

👉🏻새로운 스레드를 생성하지 않고 main스레드에서 모든 작업을 실행

큐에 작업을 넣은 후 1개씩 꺼내어 동작하므로 첫 번째 구독과 두 번째 구독의 실행 순서가 바뀌는 경우는 없음

싱글 스레드 스케줄러

  • RxJava내부에서 단일 스레드를 별도로 생성하여 구독 작업을 처리
  • 생성된 스레드는 여러 번 구독 요청이 와도 공통으로 사용
  • 리액티브 프로그래밍이 비동기 프로그래밍을 지향하기 때문에 싱글 스레드 스케줄러를 활용할 확률은 낮음

스케줄러를 활용하여 콜백 지옥 벗어나기

CommonUtils.exampleStart();
Observable<String> first = Observable.just(FIRST_URL)
	.subscribeOn(Schedulers.io())
	.map(OkHttpHelper::get);
Observable<String> second = Observable.just(SECOND_URL)
	.subscribeOn(Schedulers.io())
	.map(OkHttpHelper::get);

Observable.zip(first, second, (a,b) -> ("\n>> " + a + "\n>> " + b))
	.subscribe(Log::it);

CommonUtils.sleep(5000);
  • 첫번째 Url의 response가 성공하면 두번째 Url를 호출 ⇒ zip을 이용하여 두개의 Observable 이용
profile
기록하기

0개의 댓글