RxJava: Observable을 중심으로

이현우·2021년 5월 17일
2

Deep Dive: Android

목록 보기
5/10
post-thumbnail

Observable

데이터 흐름에 맞게 알림(Based on Observer Pattern)을 보내 Subscriber가 데이터를 처리할 수 있도록 한다

Observable Class

Observable은 옵저버 패턴을 구현한다. 객체의 상태 변화를 관찰하는 관찰자 목록을 객체에 등록하고,

상태 변화가 있을 때마다 메서드를 호출 -> 관찰자에게 변화를 통지

Why Observable?

Observed가 관찰을 통해서 얻은 결과를 의미한다면

Observable은 현재는 관찰되지 않았지만 이론을 통해서 앞으로 관찰할 가능성을 의미한다.

  • The Function of Reason(이성의 기능)

Observable의 종류

  • Observable
  • Maybe
    • 데이터가 발행될 수 있거나 발행되지 않아도 완료되는 경우
  • Flowable
    • Back Pressure에 대응하는 기능을 추가로 제공(Observable에서 데이터를 무자비하게 빠르게 발행할 때)

Observable's Notification

  • onNext
    • 데이터 발행을 알려줍니다
  • onComplete
    • 모든 데이터의 발행이 완료되었음을 알려줍니다
    • 단 한번만 발생
    • 더 이상 onNext가 발생안되게 해야됨
  • onError
    • 에러가 발생할 때
    • 이후 onNext, onComplete가 실행이 안 됨

Observable Factory Method

Observable은 객체를 생성하지 않고 Factory 패턴으로 생성

  • create(), just(), (Deprecated) from()
  • fromArray(), fromIterable(), fromCallable(), fromFuture(), fromPublisher()
  • interval(), range(), timer(), defer()

just()

    public static <T> Observable<T> just(T item1, ...,  T item10) {
        ObjectHelper.requireNonNull(item1, "item1 is null");
        ...
        ObjectHelper.requireNonNull(item10, "item10 is null");

        return fromArray(item1, item2, item3, item4, item5, item6, item7, item8, item9, item10);
    }

(에반데) 위와 같이 순서대로 데이터들을 받아오고 Observable Timeline(마블 다이어그램 위쪽 선)으로 발행

create() vs just()

  • just: onNext, onComplete, onError 커스텀할 필요없음
  • create: 개발자가 직접 콜백을 설정
        val taskObservableCreate = Observable
            .create { emitter: ObservableEmitter<Task> ->
                emitter.onNext(Task(description = "", isComplete = false, priority = 0))
                // 끝내려면 onComplete 호출
                emitter.onComplete()
            }
            .subscribe { Log.d(TAG, "$it") }

주의점

  • Observable이 dispose될 대 콜백 해지(memory leak)
  • 구독하는 동안에만 onNext, onComplete 호출
  • 에러는 오직 onError에서만
  • BackPressure는 직접 처리

TMI) RxJava is Declarative

내가 코드로 작성하는 것이 어떻게 작동되는 지를 명세하는 것이 아니라,

무엇인지 명세해주는 방식, 이 객체가 어떤 객체인지 정의함

subscribe() 함수

Observable은 Factory로 객체를 정의하고 subscribe 함수로 데이터를 발행시킨다

Disposable disposable() -> onError 이벤트 발생 시 Exception throw, 디버깅할 때 사용
Disposable subscribe(
    Consumer <? super T> onNext,
    Consumer <? super java.lang.Throwable> onError,
    Action onComplete
) -> 이벤트 처리

from~ : Observable Factory Method

  • fromArray, fromIterable
    • Array, ArrayList, Queue 등 Java(Kotlin) Collections에서 제공하는 자료구조 활용 Observable 생성
  • fromCallable
    • 비동기 처리 Wrapper Class인 Callable의 return 값을 활용하여 Observable 생성
  • fromFuture
    • future의 return 값을 활용하여 Observable 생성
  • fromPublisher
    • publisher(Java 9) 활용하여 Observable 생성

Disposable

onComplete 이벤트가 발생되었다면 dispose는 호출할 필요가 없는데...

-> 만약 처리가 안되면 메모리 릭 발생이 되니 CompositeDisposable Class 활용해서 객체가 destroy될 때 관계 해제

Single: Special Type of Observable

Observable은 데이터 무한 발행 쌉가능,

But, Single은 오로지 하나!

How to create it

// 1. Observable -> Single
Single.fromObservable(source)
    // Single<T>
    .subscrbe(...)

// 2. single()
Observable.just("Hello! World!")
    .single("Default value")
    // Single<T>
    .subscribe(...)

// 3. first()
Observable.fromArray(source)
    .first("default")
    // Single<T>
    .subscribe(...)

// 4. empty() -> single()
Observable.empty()
    .single("default")
    // Single<T>
    .subscribe(...)

// 5. take(1) -> single()
Observable.fromArray(source)
    .take(1)
    .single("default")
    // Single<T>
    .subscribe(...)

Maybe = Single + onComplete

Maybe 클래스는 데이터를 하나만 발행하지만 0개도 발행할 수 있음!

즉, onComplete 메서드를 하나 더 추가해서 구현하는 형식

Cold Observavbles vs Hot Observables

Cold Observavbles

  • subscribe 함수를 호출하여 구독을 해야 데이터가 발행됨
  • 웹/DB/서버 요청과 같은 URL(데이터) 지정하고 요청을 보내서 결과를 받아오는 로직은 Cold Observable로 구현

Hot Observables

  • 구독자가 있던 없던 데이터가 계속 발행되는 Observable
    • 여러 구독자를 동시에 구독할 수 있음
  • 주식, 마우스(키보드) Event, 센서(주식) 데이터 등
    • 실시간으로 계속 받아와서 표시할 때에는 Hot Observable로 구현
  • 배압(BackPressure)을 무조건 고려해야됨
    • 발행속도와 구독속도의 차이가 클 때 발생

Switch Cold to Hot

  • Subject 클래스
  • ConnectableObservable

Subject 클래스

Subject 클래스는 구독자와 Cold Observable의 특성이 모두 공존한다

  • 데이터를 발행할 수도 있고, 발행된 데이터를 바로 처리할 수도 있음

  • AsyncySubject

  • BehaviorSubject

  • PublishSubject

  • ReplaySubject

AsyncSubject

완료되기 전 마지막 데이터에만 관심, 이전 데이터는 무시

// AsyncSubject 객체 생성
val subject = AsyncSubject.create<String>()

// Subject의 구독자 설정
subject.subscribe { Log.d(TAG, it) }

// Subject 발행(이건 무시)
subject.onNext("Hi")
subject.onNext("Hello")

// Subject의 구독자 설정
subject.subscribe { Log.d(TAG, "${it + " Second}") }

// Subject 발행(이건 무시)
subject.onNext("HyunWoo")
subject.onComplete()
subject.onNext("Fake")
subject.subscribe { Log.d(TAG, "${it + " Third}") }

// 최종 결과, onComplete가 호출되기 직전의 마지막 결과만 처리, 이후 onNext는 Fake
// onComplete가 호출된 이후의 subscriber는 최종값만 가져옴
TAG: HyunWoo
TAG: HyunWoo Second
TAG: HyunWoo Third

BehaviorSubject

구독을 하면 가장 최근 값 혹은 기본 값을 넘겨 받음

// BehaviorSubject 생성
val subject = BehaviorSubject.createDefault<String>("DEFAULT")

// 디폴트 값 출력됨(처음에는)
subject.subscribe { Log.d(TAG, "First Subscriber -> $it") }

// 발행
subject.onNext("1")
subject.onNext("3")

// 구독자 하나 더 추가
subject.subscribe { Log.d(TAG, "Second Subscriber -> $it") }

// 발행
subject.onNext("5")
subject.onComplete()

// 결과
TAG: First Subscriber -> 6
TAG: First Subscriber -> 1
TAG: First Subscriber -> 3
TAG: Second Subscriber -> 3
TAG: First Subscriber -> 5
TAG: Second Subscriber -> 5

PublisherSubject

가장 일반적인 Subject 클래스, subscribe() 함수를 호출하면 값을 발행하기 시작한다.
이후에 구독을 해도 최근의 값을 받아오지 않음(BehaviorSubject와 다른 점)

val subject = PublisherSubject.create<String>()

// 구독 시작
subject.subscribe { Log.d(TAG, "First Subscriber -> $it") }

// 발행
subject.onNext("1")
subject.onNext("3")

// 구독자 하나 더 추가
subject.subscribe { Log.d(TAG, "Second Subscriber -> $it") }

// 발행
subject.onNext("5")
subject.onComplete()

// 결과
TAG: First Subscriber -> 1
TAG: First Subscriber -> 3
TAG: First Subscriber -> 5
TAG: Second Subscriber -> 5

ReplaySubject : 취급 주의

데이터의 처음부터 끝가지 발행하는 것을 보장해줌
모든 데이터 저장할 때 메모리 릭이 나는 가능서옫 염두해야 됨

val subject = ReplaySubject.create<String>()

// 구독 시작
subject.subscribe { Log.d(TAG, "First Subscriber -> $it") }

// 발행
subject.onNext("1")
subject.onNext("3")

// 구독자 하나 더 추가
subject.subscribe { Log.d(TAG, "Second Subscriber -> $it") }

// 발행
subject.onNext("5")
subject.onComplete()

// 결과
TAG: First Subscriber -> 1
TAG: First Subscriber -> 3
TAG: Second Subscriber -> 1
TAG: Second Subscriber -> 3
TAG: First Subscriber -> 5
TAG: Second Subscriber -> 5

ConnectableObservable

Cold Observable이긴 한데 여러명의 구독자들에게 쏴주고 싶을 때!

val dataList = listOf("1", "3", "5")

// Cold Observable 만들기
val observableData = Observable.interval(100L, TimeUnit.MILLISECONDS)
    .map(Long::intValue)
    .map { dataList[it] }
    .take(dataList.length)

// ConnectableObservable 만들기
val dataSource = observableData.publish()

// subscribe(ConnectableObservable, 얘에다가 연결만)
dataSource.subscribe { Log.d(TAG, "First Subscriber -> $it") }
dataSource.subscribe { Log.d(TAG, "Second Subscriber -> $it") }

// ConnectableObservable과 연결
dataSource.connect()

// Hot Observable의 특성 -> 일정 시간이 지나면 그 이후로 발행된 데이터만 받아올 수 있음
Thread.sleep(250L)
dataSource.subscribe { Log.d(TAG, "Third Subscriber -> $it") }

// 결과
TAG: First Subscriber -> 1
TAG: Second Subscriber -> 1
TAG: First Subscriber -> 3
TAG: Second Subscriber -> 3
TAG: First Subscriber -> 5
TAG: Second Subscriber -> 5
TAG: Third Subscriber -> 5

한 판 정리

데이터 발행자(DataSource)

  • Observable
  • Sigle
  • Maybe
  • Subject
  • Completable

데이터 수신자

  • 구독자(Subscriber)
  • 옵저버(Observer)
  • 소비자(Consumer)
    • Java 8과 같은 이름을 사용하기 위해
profile
이현우의 개발 브이로그

0개의 댓글