[Android] RxJava (2) - Observable, Scheduler

Oxong·2021년 7월 6일
0

21.07.06
공부한 것을 정리하는 용도의 글이므로 100% 정확하지 않을 수 있습니다.
참고용으로만 봐주시고, 내용이 부족하다고 느끼신다면 다른 글도 보시는 것이 좋습니다.
+ 틀린 부분, 수정해야 할 부분은 언제든지 피드백 주세요. 😊
                                            by. ryalya



들어가기 전

Reactive(반응형) UI를 만드는데 도움을 주는 RxJava.

지난 글인 [Android]RxJava란? 에서 Reactive Programming과 RxJava의 개념과 기본 사용법, Marble Diagram(마블 다이어그램)을 정리했다.

오늘은 아주 조오오오금만 더 깊게 들어가보려 한다.

개념, 예제 등을 찾다보면 수박 겉핥기식으로 작성되어있는 글도 많지만, 굉장히 꼼꼼하게 정리되어있는 글도 있다.

이런 곳 발견하면 약간 노다지 찾은 기분? 😁

바로 사용하고 적용하기에는 어려울 수도 있지만 천리길도 한걸음부터라니까... 조금씩 조금씩 이해해보려 노력해야지🙂



Observable이란?

RxJava의 가장 핵심적 요소는 Observable이다.

살짝 리마인딩해보자면 Observable Class는 옵저버(Observer) 패턴을 구현하는데, 옵저버 패턴은 객체의 상태 변화를 관찰하는 관찰자(옵저버) 목록을 객체에 등록한다.

음~ RxJava에 대해 1도 모르는 사람이라면 이렇게 설명한 글을 만나면 '아니..; 지만 이해한거야 모야...' 라는 생각이 들 것이다. 🙂

아래 그림을 보며 이해해보자.

  1. Observer(옵저버)는 Observable을 subscribe(구독)한다.

  2. Observable이 데이터스트림을 처리하고 완료되면 데이터를 발행(emit)한다.

  3. 데이터를 발행할 때, Observable은 Event(알림)를 Observer에게 보낸다.

  4. Observable을 subscribe하고 있는 Observer들은 데이터를 소비(Consumer)하여 Do Something을 한다.

(사실 subscribe가 먼저 이루어지는 것이 아니고, 알림을 보낸 후, Observer가 Subscribe해야 데이터가 발행되고 소비된다고 하는데 이것보다 위의 느낌으로 이해하는 Observer의 관찰자가 그 순간을 감지하고 준비된 연산을 실행시켜 결과를 리턴하는 메커니즘 때문에, observable을 구독한다고 표현하는것이 나는 더 쉬워서 우선 그렇게 이해하고, 더 공부하면서 변경해나갔다.)

즉, 정리하면
Observable은 '관찰가능한'이라는 뜻에서 알 수 있듯이, 어떤 데이터를 Observer가 처리할 수 있도록 데이터를 가지고 있는 상자? 정도로 생각하자.


그렇다면 Subscribe란? 정확히 무엇일까?


Subscribe

observer와 Observable을 연결하는 메소드.

위에서 언급한 것 처럼 Observable은 구독(subscribe)을 해야 데이터가 발행된다.
just() 함수만 호출하면 데이터를 발행하지 않는다.

따라서 observer는 Observable을 구독하여 데이터를 발행하면, 수신한 데이터를 원하는 방식으로 사용하는데, 수신한 데이터를 가지고 할 행동을 정의하는 것을 subscribe라고 한다.

onNext

: Observable은 새로운 항목을 방출할 때마다 이 메소드를 호출한다.

이 메소드는 Observable이 배출하는 항목을 파라미터로 받는다.

onError

: Observable은 오류가 발생하거나, 예상 데이터를 생성하지 못했을 때 이 메소드를 호출한다.

이렇게하면 Observable이 중지되고, onNext 또는 onCompleted를 더 이상 호출하지 않는다.

onError메서드는 오류정보를 저장하고 있는 객체를 파라미터로 받는다.

onCompleted

:데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 onNext()를 더 호출하지 않음을 나타낸다.



Scheduler

스케줄러(Scheduler)란 다중 프로그래밍을 가능하게 하는 운영 체제의 동작 기법을 말한다.

  • RxJava의 스케줄러(Scheduler)는 코드가 어느 스레드에서 실행될 것인지 지정 하는 역할.

  • 스케줄러를 통해 스레드를 분리해주어야 비동기 작업이 가능.

  • 자바 비동기 프로그래밍 시, 자바 스레드를 만들면서 경쟁 조건이나 synchronized와 달리 코드가 간결함.

  • subscribeOn() 함수와 onserveOn() 함수를 모두 지정하면, [Observable에서 데이터 흐름이 발행하는 스레드]와 [처리된 결과를 구독자에게 발행하는 스레드]를 분리할 수 있다.

  • subscribeOn() 함수만 호출하면 Observable의 모든 흐름이 동일한 스레드에서 실행됨

  • 스케줄러를 별도로 지정하지 않으면 현재(main)스레드에서 동작을 실행

스케줄러의 지정은 subscribeOnObserveOn 연산자를 통해 가능하다.

subscribeOn()

→ Observable이 데이터 흐름을 발생시키고 연산하는 스레드(작업을 시작하는 스레드)를 지정 (= Observable연산을 사용하기위해 처음 사용할 스레드를 지정)

→ 중복해서 적을 경우, 가장 마지막에 적은 스레드에서 시작.

→ 구독(subscribe)에서 사용할 스레드를 지정.
도중 ObserveOn이 호출되어도 SubscribeOn의 스레드 지정에는 영향을 끼치지 않는다.


observeOn()

→ Observable이 Observer에게 알림을 보내는 스레드(Observable이 다음 처리를 진행할때 사용할 스레드)를 지정.

→ ObserveOn이 선언된 후 처리가 진행뒤 다른 ObserveOn이 선언시 다른 ObserveOn에서 선언한 스레드로 변경되어 이후 처리를 진행


아래 이미지를 보자.

  1. 중간 정도에 subscribeOn(▷)이 파란색 삼각형으로 설정되어있는 것을 확인할 수 있다.

  2. 이를 통해 subscribeOn의 경우, 연산자 체인 중 아무곳에서 호출해도 문제가 되지 않는 다는 것을 알 수 있다. (Operator 어디서든지 subscribeOn을 사용하여 시작할 스케줄러를 정할 수 있다.)

  3. 파란색 subscribeOn 연산자에 의해 데이터가 처음 발행되는 스레드는 맨 위의 파란색 스레드임을 알 수 있다.

  4. 그 후, ObserveOn 연산자에 의해 주황색 스레드가 실행이 되는데, map 연산이 주황색 스레드에서 이루어진다.

  5. 그리고 마지막 ObserveOn 연산자에 의해 핑크색 스레드가 실행된다.



Observable 연산자

Observable은 생성, 변환, 필터, 결합, 오류 처리, 조건 등 다양한 연산자들을 제공한다.

오늘은 간단하게 몇가지의 대표적인 연산자들만 확인해보자.

create()

→ onNext, onComplete, onError등의 알림을 개발자가 직접 설정(호출)해야 하는 Observable.

just() 함수는 데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생하지만, create() 함수는 onNext, onComplete, onError 같은 알림을 개발자가 직접 호출해야 한다.


defer()

→ Observer가 구독할 때까지 기다렸다가 구독하면 그 때 Observable 생성


just()

→ 아이템을 그대로 발행하는 Observable을 생성. just()연산자의 인자로 넣은 아이템을 차례로 발행하며, 한 개의 아이템을 넣을 수도 있고, 타입이 같은 여러 아이템(최대 10개)을 넣을 수도 있다.


from()

= Observable.fromCallable()
→ 구독이 발생할때 call()함수가 호출되는 lazy initailze(지연초기화) 함수.


interval()

→ 특정 시간별로 연속된 정수형을 배출하는 Observable을 생성한다.


range()

→ 특정 범위의 시작점부터 원하는 갯수만큼의 정수형(Integer) 아이템을 emit하는 Observable 생성.


timer()

→ 특정 시간 이후에(지정 시간 delay이후) 정수형 0의 아이템을 발행.



Scheduler 종류

RxJava와 RxAndroid에서는 다양한 스케쥴러를 제공한다.

간단하게 몇가지만 정리해보자.


SINGLE

  • 생성 방법 : Schedulers.single()
  • RxJava 내부에서 단일 스레드를 별도로 생성하여 구독작업을 처리한다.
    생성된 스레드는 여러 번 구독 요청이 오면 계속해서 재사용한다.
    하지만 reactive Programming은 비동기 프로그래밍을 지향하므로, 이 스케쥴러는 활용성이 낮다.

COMPUTATION

  • 생성 방법 : Schedulers.computation()
  • CPU에 대응하는 계산용 스케줄러.
  • 계산 작업(입출력(I/O) 작업을 하지 않는)을 할 때는 대기 시간 없이 빠르게 결과를 도출하는 것이 중요하다.
  • 내부적으로 스레드 풀 생성, 스레드 개수=프로세서 개수.

IO

  • 생성 방법 : Schedulers.io()
  • 네트워크 상의 요청처리 or 각종 입,출력 작업 실행을 위한 스케줄러.
  • 필요할 때마다 스레드를 계속 생성

Computation 스케줄러는 CPU개수만큼 스레드를 생성하지만, IO 스케줄러는 필요할 때마다 스레드를 생성한다.

Computation 스케줄러 → 일반적인 계산 작업
IO 스케줄러 → 네트워크상의 요청, 파일 입출력, DB 쿼리 등


TRAMPOLINE

  • 생성 방법 : Schedulers.trampoline()
  • 새로운 스레드를 생성하지 않고, 현재 스레드에 무한한 크기의 대기 큐(Queue) 생성.
  • 새로운 스레드를 생성하지 않고 main 스레드에서 모든 작업을 처리하며,
    Queue에 작업을 넣은 후 1개 씩 꺼내서 동작하기 때문에 순서가 바뀌는 경우가 발생하지 않는다.

NEW_THREAD

  • 생성 방법 : Schedulers.newThread()
  • 요청을 받을 때마다 새로운 스레드 생성.

FROM(EXECUTOR)

  • 생성방법 : Schedulers.from(executor)
  • 자바에서는 java.util.current 패키지에서 제공하는 실행자(Executor)를 변환하여 스케줄러를 생성할 수 있다.
  • 하지만 Executor 클래스와 스케줄러의 동작 방식이 다르므로 추천방법은 아니며,
    기존에 사용하던 Executor 클래스를 재사용하기 때문에 활용도는 낮다.

RxJava는 Computation, IO, Trampoline 스케줄러의 사용을 권장한다.




마무리하며

지난 글 까지는 RxJava 생각보다 재밌네! 라고 생각했었는데 조금 세세하게 하게 들어오니까 조금 어렵다...

모든 내용을 정리하는 것은 불가능하더라도 어느 정도까지는 최대한 나누지 않고 한 번에 정리하고 싶었는데.... 양이 생각보다 많았다...

다음 글에서는 아래의 내용을 정리하려 한다. (최대한 한 번에 다 정리할 수 있기를...😅)

  • ⭐ Hot Observable & Cold Observable개념

  • map(), filter(), reduce(), flatMap() 사용법 이해.

  • Lambda 함수 적용



Reference

Reactivex document

RxJava, Observable 알아보기

RxJava 이해하기 - 2. Observable

RxJava 이해하기 - 5. 스케줄러

0개의 댓글