RxJava 입문하기 (Kotlin) - 기본1

CmplxN·2020년 7월 26일
4

앱 개발자로 취업준비를 하면, 요구사항이나 우대사항에 RxJava(iOS는 RxSwift)를 자주 본다.
그래서 Kotlin 언어로 RxJava를 사용하는 방법을 익혀보자.
RxKotlin, RxAndroid 등 좀더 특화된 라이브러리도 있지만, 기본은 RxJava다.

개인적으로 처음부터 완벽히 이해하기보다는 예제를 통해 점차 이해도를 높이는 방식을 선호한다.
뭐라는지 잘 몰라도 결국 예제를 따라하다 보면 조금씩 이해할 수 있을 것이다.
그래서 쉬운 예제 무작정 따라하기 ==> 점차 이해도 높이기 방식으로 공부해보자.

RxJava 시작하기

사전 지식

  • Kotlin 문법의 이해 (또는 Java)
  • 옵저버 패턴의 이해

개발환경 설치

  • intelliJ 설치 (Community Edition이면 충분하다.)
  • jdk 설치 및 경로 설정 (검색하면 잘 나와있다.)
  • Gradle - Kotlin/JVM으로 프로젝트를 생성
    프로젝트 설정
  • build.gradle파일의 dependency를 설정하자.
    rxkotlin 자체에 rxjava가 의존성이 있으므로, 아래 코드를 gradle에 추가한다.
dependencies {
    ...
    implementation "io.reactivex.rxjava2:rxkotlin:2.4.0"
    ...
}

Hello RxJava

  • 그러면 정상적으로 세팅되었는지 Hello RxJava를 찍어보자.
fun main() {
    Observable // 생산자 : 데이터를 생산하여 전달
        .just("Hello?", "RxJava!?") // 생성 연산자
        .map { it.dropLast(1) } // 변환 연산자 : 데이터의 가공 (뒷자리 하나 제거)
        .subscribe(::println) // 소비자 : 데이터를 받아서 처리 (println)
}

결과
Hello RxJava
1. 전달한 "Hello?", "RxJava!?"에서
2. dropLast(1)하여 "Hello"와 "RxJava!"가 전달되어
3. 전달된 두 문자열을 println함수를 통해 출력한 모습이다.

생산자와 소비자

공통

  • 생산자는 데이터를 생산해서 전달하는 역할을 한다.
  • 소비자를 등록하는 방법은 크게 두가지가 있다.
  1. Observer 방식
    Observer 인터페이스를 구현한 객체를 subscribe해서 소비자를 추가한다.
    subscribe의 return type은 Unit이다.
val observer = object : Observer<Int> {
        override fun onComplete() {
            // Observable이 완료된 경우
        }
        override fun onSubscribe(d: Disposable) {
            // Observable이 데이터 전달할 준비가 되었을 때.
            // 작업 취소를 위한 Disposable에 대한 레퍼런스를 여기서 받음
        }
        override fun onNext(t: Int) {
            // Observable이 데이터를 전달할 때 호출
        }
        override fun onError(e: Throwable) {
            // Observable이 에러를 전달할 때 호출. Error시 Complete없이 종료다.
        }
    }
    Observable.just(1, 2, 3, 4).subscribe(observer)
  1. Consumer 방식
    각각의 Consumer를 subscribe해서 소비자를 추가한다. (주로 이것을 사용한다고 한다.)
    Consumer는 메소드 한개짜리 자바 인터페이스이므로 SAM을 통해 람다로 표현할 수 있다.
    subscribe의 return type이 Disposable이다.
val disposable: Disposable = Observable.just(1, 2, 3, 4)
        .subscribe(
            { println("onNext $it") }, // onNext: Consumer
            { println("onError") }, // onError: Consumer
            { println("onComplete") }, // onComplete: Consumer
            { println("onSubscribe") } // onSubscribe: Consumer
        )

rst

Observable

  • 0개에서 n개의 데이터를 전달하는 생산자다.
  • 기본적인 생산자로 단건(0 or 1)이 아니면 대부분 Observable을 쓴다.
  • observer 방식으로 consumer 등록시 Observer를 구현해 전달한다.
  • consumer 방식을 사용할 시 onNext, onComplete, onError와 onSubscribe가 있다.
val observer = Observable.just(11, 12, 13)
        .map {
            if (it == 12) throw IllegalStateException() // 12에 에러
            else it
        }
    observer.subscribe(
        { println("onNext $it") },
        { println("onError") },
        { println("onComplete") },
        { println("onSubscribe") })

obs

Single

  • 오직 1개의 데이터를 전달하는 생산자다.
  • Http GET Request와 같이 결과가 1개의 데이터 or 실패인 경우 사용한다.
  • observer 방식으로 consumer 등록시 SingleObserver를 구현해 전달한다.
  • consumer 방식을 사용할 시 onSuccess와 onError만 있다.
Single.just(1)
        .subscribe(
            { println("onSuccess $it") }, 
            { println("onError") }
        )

Completable

  • 0개의 데이터를 전달하는 생산자다.
  • db에 insert, update와 같이 데이터가 필요 없이 성공 or 실패인 경우 사용한다.
  • observer 방식으로 consumer 등록시 CompletableObserver를 구현해 전달한다.
  • consumer 방식을 사용할 시 onComplete와 onError만 있다.
Completable.complete()
        .subscribe(
            { println("onComplete") },
            { println("onError") }
        )

Maybe

  • 0개 또는 1개의 데이터를 전달하는 생산자다.
  • 예 / 아니오 선택과 같이 (둘 중 하나 + 예외 경우)에 쓸 수 있다.
  • observer 방식으로 consumer 등록시 MaybeObserver를 구현해 전달한다.
  • consumer 방식을 사용할 시 onSuccess, onComplete와 onError가 있다.
Maybe.empty<Unit>()
        .subscribe(
            { println("onSuccess $it") },
            { println("onComplete") },
            { println("onError") }
        )

Flowable

  • 데이터의 발행 속도가 구독자의 처리속도보다 크게 빠를 때 사용 (BackPressure Issue)
  • observer 방식으로 consumer 등록시 FlowableSubscriber를 구현해 전달한다.
  • BackPressure Issue를 처리하는 방법을 설정할 수 있다.
  • LiveDataReactiveStreams을 사용해 AAC LiveData와 연계할 수 있다.
  • 언제 Observable을 쓰고 언제 Flowable을 써야할까?
Flowable.just(1, 2, 3, 4)
        .subscribe(
            { println("onNext $it") }, // onNext: Consumer,
            { println("onError") }, // onError: Consumer,
            { println("onComplete") }, // onComplete: Consumer,
            { println("onSubscribe") } // onSubscribe: Consumer
        )

Subjects

공통

  • Observable과 Observer의 성격을 둘 다 가지고 있다.
  • 즉 subscribe를 달 수 있으며 동시에 onNext, onComplete 등을 달 수 있다.
  • 다음 코드를 Subject 종류만 다르게 해서 한 결과를 보면서 이해하자.
    val xSubject = 종류별Subject.create<Int>()
    xSubject.subscribe { println("첫번째 $it") }
    xSubject.onNext(1)
    Thread.sleep(1000L)
    xSubject.subscribe { println("----두번째 $it") }
    xSubject.onNext(2)
    xSubject.onNext(3)
    Thread.sleep(1000L)
    xSubject.subscribe { println("********세번째 $it") }
    xSubject.onNext(4)
    xSubject.onComplete()

PublishSubject

  • 구독한 시점부터 새로운 데이터를 가져오는 subject
    ps

BehaviorSubject

  • 구독한 시점 직전 데이터부터 가져오는 subject
    bs
  • PublishSubject와 달리 직전 데이터도 가져온다. (두번째 2, 세번째 3)

ReplaySubject

  • 지금까지 발행된 데이터 모두들 가져오는 subject
    rs

AsyncSubject

  • complete되었을 때 가장 마지막 데이터를 받는 subject
    as

Disposable

  • 여러 이유로 작업을 취소해야할 수 있다. 이때 Disposable을 dispose()함으로 작업을 취소할 수 있다.
  • 여러 Disposable들을 일일히 dispose하면 귀찮으므로 CompositeDisposable을 사용한다.
  • 기본적으로 add로 disposable을 등록하고, clear로 등록된 작업을 취소한다.
    val compositeDisposable = CompositeDisposable()
    compositeDisposable.addAll( // 한개만 등록시 add
        Observable.just(1).subscribe(),
        Single.just(1).subscribe(),
        Maybe.just(1).subscribe()
    )
    compositeDisposable.clear() // 작업이 도중 취소된다.
    // dispose()도 있지만 그러면 CompositeDisposable을 더이상 못쓴다.

다음 시간에는 여러가지 연산자들을 알아보고, 가능하면 간단한 안드로이드 적용 예제를 살펴보겠다.

profile
Android Developer

0개의 댓글