RxJava2

Jini.Dev·2022년 6월 27일
0

반응형 프로그래밍?

데이터의흐름변경사항의 전파에 중점을둔 선언적프로그래밍 패러다임
(모든것을 스트림으로 간주한고 선언적으로 개발하는것)
선언적 프로그래밍 : ‘어떻게’가아닌 ‘무엇을 나타내는지’에 초점을 준
변경사항의 전파 : 어떤데이터를보여줄지 가져와서 보여주는것이아니라 값이 변화할때 마다 데이터를 전달


RxJava2

관찰 가능한 스트림(Observable streams)을 사용하는 비동기 프로그래밍 API이다.

Publisher(데이터를 만들어 통지하는 생산자)와 Subscriber로 구성되어있으며, Subscriber가 Publisher를 구독하면 Publisher가 통보하는 데이터를 받을 수 있다.

  • Observable : 관찰할 수 있는' 데이터 스트림이다.
  • Operators (연산자) : 비동기 흐름을 조합하는 실행 단위로 옵저버블을 처리하는 함수이다. 옵저버블로부터 전달받은 데이터를 가공하여 최종적인 결과 데이터를 만들어낸다.
  • Scheduler : 스레드 관리자
  • Observer(구독자) : Subscriber, Watcher, Reactor등으로도 불리며, 옵저버블이 발행한 데이터를 구독하는 구독자.
    구독하지 않으면 옵저버블이 발행한 데이터를 전달받을 수 없다.

Observable

RxJava에서는 Observable을 구독하는 Observer가 존재한다.
Observable은 데이터 흐름에 맞게 알림을 보내 Observer가 데이터를 사용할 수 있도록 한다.

Observable의 특징

  • Composable
    Observable은 비동기 데이터의 적절한 흐름과 순서를 구성할 수 있다.
  • Flexible
    Observable은 단순한 scalar 값 뿐만 아니라 무한한 스트림을 차례로 발행할 수 있다.
  • Less Opinionated
    Observable은 데이터 소스가 Concurrency이던 Asynchronicity이던 상관없다. Observable은 Thread-pool, Event Loop, Non-Blocking I/O 등 다양한 방식을 통해 구현될 수 있다.

Observable 동작 순서

Observable이 데이터 스트림을 처리하고, 완료되면 데이터를 발행(emit)한다.
데이터를 발행할 때마다 구독하고 있는 모든 Observer가 알림을 받는다.
Observer는 수신한 데이터를 가지고 어떠한 일을 한다.
Observable이 데이터를 발행하고 알림(Event)를 보내면 Observer는 Observable을 구독(Subscribe)해 데이터를 소비(Consume)한다.

Observable Event

Observable이 데이터를 발행(emit)한 후 Emitter라는 인터페이스에 포함된 3가지 알림을 사용하여 동작한다.

  • OnNext() : 데이터가 하나 발행됐음을 알리는 이벤트
    연속된 데이터의 경우, 데이터가 하나씩 차례대로 onNext() 로 발행된다.
  • onComplete() : 단 한번만 발생하며, 데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 OnNext()를 더 호출하지 않음을 나타낸다.
  • onError() : Observable에서 오류가 발생했음을 알리는 이벤트. null은 발행하지 못한다. onError 이벤트가 발생하면 이후에 onNext 및 onComplete 이벤트가 발생하지 않는다.
    ✽✽ Observable은 항상 onComplete() 혹은 onError() 둘 중 하나로만 데이터 발행이 종료되어야 한다.

Observable Subscribe

Observable은 팩토리 함수로 데이터 흐름을 정의한 후 subscribe() 함수를 호출해야 실제로 데이터를 발행한다.
Observer는 subscribe()메소드에서 수신한 각각의 알림에 대해 실행할 내용을 지정한다.

Observable 생성

Observable을 생성할 때에는 직접 인스턴스를 만들지 않고 정적 팩토리 함수(생성연산자)를 호출한다.

  1. just()
    인자로 넣은 데이터를 있는 그대로 차례로 발행한다. (데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생)
    한 개의 값을 넣을 수도 있고 인자로 여러 개의 값(동일한 타입, 최대 10개)을 넣을 수도 있다.
    데이터를 다르게 변경하고 싶으면 map()과 같은 연산자를 통해 변환해야한다.
  2. create()
    Emitter에 포함되어 있는 onNext(), onComplete(), onError() 같은 알림을 개발자가 직접 호출해야 한다.
    ✽✽ create()사용시 주의점
    • Observable이 dispose될 대 콜백 해지(memory leak)
    • 구독하는 동안에만 onNext, onComplete 호출
    • 에러는 오직 onError에서만
    • BackPressure는 직접 처리
  3. fromXXX()
    특정 타입의 데이터를 Observable로 바꿔주는 메소드이다
    3-1. fromArray()
    배열에 들어있는 데이터를 처리할때사용한다.
    Integer [] array = {1, 2, 3, 4, 5}
    Observable.fromArray(array)
    .subscribe(System.out::println)
    3-2. fromCallable()
    Callable 을 Observable 로 변환하여 비동기적으로 아이템을 발행할 수 있다.
    fun main() {
        val callable: Callable<String> = Callable<String> { "Hello" }
        val source: Observable<String> = Observable.fromCallable(callable)
        source.subscribe(System.out::println)
    }
    3-3. fromFuture()
    Future 객체에서 fromFuture() 함수를 사용해 Observable를 생성
    Future 인터페이스를 지원하는 모든 객체를 Observable Source 로 변환하고, Future.get() 메소드를 통해 호출한 값을 반환
    Future<String> future = Executors.newSingleThreadExecutor().submit(() -> {
        Thread.sleep(1000)
    		return "Hello Future" 
    })
    
    source: Observable<String> = Observable.fromFuture(future)
    source.subscribe { it -> println(it) }
  4. Interval()
    시간 간격을 두고 데이터를 방출하는 Observable 생성

등등 다양한 생성자 들이 있다.

Observable종류

Observable 은 Cold Observable 과 Hot Observable 로 나눌 수 있다.

  1. Cold Observable
    Cold Observable은 Observable을 생성하고 Observer가 subscribe를 호출할 때까지 데이터 발행을 하지 않는다.
    임의로 종료시키지 않는 이상 여러 번 요청에도 각각의 observer(관찰자)마다 다른 observable stream이 생성되기 때문에 처음부터 끝까지 발행하는 것을 보장한다.
    데이터베이스 쿼리, 파일 읽기, API 요청등에 사용된다.

  2. Hot Observable
    Hot Observable은 Observer의 존재 여부와 관계없이 데이터를 발행 해 모든 구독자에게 동시에 같은 데이터를 발행한다.
    Observer는 구독한 시점부터 발행된 데이터를 받기 때문에 데이터 전부를 받는 것을 보장하지 못한다.
    가장 최근의 데이터를 처리하는 마우스 이벤트, 키보드 이벤트, 센서 데이터 등이 있다.

Observable클래스와 파생클래스

  • Observable : 가장 기본적인 형태이다. 0개~N개의 데이터 발행, BackPressure 없다.
  • Single : 단 1개의 데이터, 혹은 오류를 발행한다.
  • Maybe : 0개 또는 1개 완료, 오류를 발행한다.
  • Completable : 성공 혹은 실패했다는 결과만 발행한다.
  • Flowable : 0개~N개의 데이터 발행하며, BackPressure 를 제어할 수 있다.
  • Subject : 관찰 가능한 데이터 스트림과 관찰자(구독자)의 성격을 모두 갖고 있다.
    Cold Observable을 Hot Observable으로 변환한다.
  • ConnectableObservable : Cold Observable을 Hot Observable으로 변환한다.

1. Single

create() 를 사용할 때 Emitter 를 사용하여 데이터를 발행한다.
데이터 하나가 발행과 동시에 종료되기 때문에 onSuccess() 라는 이벤트만으로 데이터 발행이 완료됨을 알린다. (물론 오류 처리의 경우 동일하게 onError() 를 사용한다.

Single
.just(1)
.subscribe { it -> println(it) }

//
fun main() {
    Single.create<String> {  // it: SingleEmitter<String!>
        it.onSuccess("Hello")
    }.subscribe { it ->
        println(it)
    }
}

1. Maybe

Single과 비슷하지만, 아이템을 발행하거나 발행하지 않을 수도 있다는 점에서 차이가 있다. Single 클래스에 onComplete 이벤트가 추가된 형태이다.
아이템을 발행했을 때에는 onSuccess()를 호출하고, 발행하지 않을 때에는 onComplete()를 호출한다.

2. Completable

값이 없고 완료 또는 에러 신호만 있는 지연된 계산을 나타내기 때문에 onNext(), onSuccess()은 사용하지 않고 onComplete() 와 onError()만 사용한다.

3. Flowable

데이터 스트림에 쌓이는 데이터의 양을 제어할 수 있는 데이터 스트림으로 Backpressure 현상을 제어할 수 있다.
대량 데이터(예를 들어 10,000건 이상의 네트워크 통신이나 DB 등의 I/O)를 처리를 할 때 사용한다.
*) Backpressure (배압) 이란?
데이터의 소비 속도가 데이터 발행 속도를 못 따라갈때 발생하는 현상이다.

4. Subject

관찰 가능한 데이터 스트림과 관찰자(구독자)의 성격을 모두 갖고 있다. Observable 과 Observer 를 모두 구현한 추상 타입으로, 하나의 소스로부터 다중의 구독자에게 멀티 캐스팅이 가능하다.
Observer 를 구현한다는 특징때문에, onNext(), onError(), onComplete() 등의 이벤트를 수동으로 발생하여 구독자들에게 전달해줄 수 있다.

4-1. AsyncSubject

onComplete() 가 발생하면 가장 마지막에 발행된 데이터를 전달한다.

4-2. BehaviorSubject

구독 후 가장 최근 값 혹은 발행된 데이터가 없다면 기본값(pink)을 넘겨주는 클래스이다.

4-3. PublishSubject

Hot Observable 특성을 갖고 있어 subscribe() 함수를 호출하면 값을 발행하기 시작한다.
데이터가 모두 발행되고 난 뒤 구독을 하면 아무 데이터도 받아볼 수 없게 된다.

4-4. ReplaySubject

항상 데이터의 처음부터 끝까지 발행하는 것을 보장해준다. 구독자가 생기면 이전에 발행했던 데이터들을 모두 해당 구독자에게 전달한다.

4-5. UnicastSubject

구독전까지 발행된 데이터들을 계속 버퍼에 저장해두었다가 구독을 시작할 때 버퍼의 데이터들을 모두 발행하고 버퍼를 비운다. 만약 두 개 이상 구독자가들어올 시 IllegalStateException을 발생시킨다.

Disposable, CompositeDisposable

  1. Disposable

    Disposable 인터페이스는 dispose()와 isDisposed() 두개의 메소드로 정의되어있다.
    Observable을 팩토리 함수로 데이터의 흐름을 정의 한 후 subscribe()함수를 호출해 아이템을 발행한다.
    subscribe()를 호출한 후에는 Disposable 인터페이스의 객체가 반환된다.

    onComplete()가 호출되면 데이터 발행이 종료되지만, 아이템을 무한하게 발행하거나 오래 실행되는 Observable의 경우, 제대로 종료하지 않으면 메모리 누수가 발생할 수 있기때문에 Observable의 구독이 필요없을 때는 Disposable.dispose()를 호출해서 아이템의 발행을 중단하는 것이 효율적이다.
    제대로 폐기되었는지 확인하려면 Disposable.isDisposed()를 통해 체크할 수 있다. (onComplete()가 호출된 이후라면 dispose()를 호출할 필요가 없다.)

  2. CompositeDisposable

    Disposable객체들을 한 번에 관리할 수 있는 객체이다.
    CompositeDisposable 에 Disposable 객체들을 추가해두고 필요에 따라 한 번에 처리한다.

Operators

RxJava의 연산자는 특정 input에 대해 항상 동일한 output을 반환하는 부수효과가 없는 순수함수로, 비동기 프로그램에 필요한 주요 로직을 만들 수 있다.

https://reactivex.io/documentation/operators.html

연산자 분류

  1. 생성(Creating) 연산자
    Observable, Single 클래스 등으로 데이터의 흐름을 만들어내는 함수이다. 위 Observable생성함수를 포함해 defer(), range(), timer() 등 이 있다.
  2. 변환(Transforming) 연산자
    map(), flatMap(), buffer() 등 어떤 입력을 받아 원하는 출력의 결과를 나타내는 함수이다.
  3. 필터(Filtering) 연산자
    filter(), first(), take()등 입력데이터 중 원하는 데이터만 걸러내는 함수이다.
  4. 합성(Combining) 연산자
    join(), merge(), zip()등 Observable을 조합하는 함수이다.
  5. 오류 처리(Error Handling) 연산자
    catch(), retry() 등이 있다.
  6. 관찰 가능한 유틸리티(Observable Utility) 연산자
    Observable 작업을 위한 함수로 Subscribe(), SubscribeOn(), ObserveOn() 등이 있다.
  7. 조건과 불린(Conditional and Boolean) 연산자
    Observable의 흐름을 제어하는 함수이다. all()등이 있다.
  8. 수학과 집계(Mathematical and Aggregate) 연산자
    수학 함수와 연관있는 연산자이다. sum(), max(), average()등이 있다.
  9. 배압(Backpressure) 연산자
    배압 이슈에 대응하는 함수로 onBackpressureBuffer(), onBackpressureDrop(), onBackpressureLatest()가 있다.

Scheduler

스케줄러를 사용해 데이터 흐름이 발생하는 스레드와 처리된 결과를 구독자에게 전달하는 스레드를 분리할 수 있다.

Scheduler 종류

  1. Single Thread Scheduler (Schedulers.single())
    단일 스레드를 계속 재사용한다. RxJava 내부에서 스레드를 별도로 생성하며, 한 번 생성된 스레드로 여러 작업을 처리한다.
  2. Computation Thread Scheduler (Schedulers.computation())
    CPU에 대응하는 계산용 스케줄러로, IO 작업을 하지 않고 일반적인 계산/연산 작업을 할 때 사용한다. 내부적으로 스레드 풀을 생성하며 생성된 스레드 개수는 프로세서의 개수와 동일하다.
  3. IO Thread Scheduler (Schedulers.io()
    네트워크 요청 처리하거나 각종 I/O작업을 위한 스케줄러이다.
    필요할 때마다 스레드를 계속 생성하지만, 이전에 생성했던 쓰레드가 존재한다면 이를 재사용 한다.
  4. Trampoline Thread Scheduler (Schedulers.trampoline())
    새로운 스레드를 생성하지 않고, 현재 스레드에 무한한 크기의 대기행렬(Queue)를 생성하는 스케줄러 이다. 모든 작업을 들어온 순서대로 (순차적으로) 실행하는 것을 보장(FIFO)
  5. New Thread Scheduler (Schedulers.newThread())
    요청을 받을 때 마다 매번 새로운 스레드를 생성한다.
  6. Main Thread Scheduler (AndroidSchedulers.mainThread())
    RxAndroid 에만 포함되어 있고, 안드로이드 메인 쓰레드를 지정하는 스케줄러를 제공한다.

Scheduler를 활용하기위한 연산자

subscribeOn, observeOn

subscribeOn : Observable이 데이터 흐름을 발생시키고 연산하는 스레드를 지정할 수 있다. 여러번 호출되더라도 맨 처음의 호출만 영향을 주며 어디에 위치하든 상관없다.

observeOn : 발행되는 데이터를 구독하는 쓰레드를 지정할 수 있다.
여러번 호출될 수 있으며, Observable 데이터 스트림에서 발행한 데이터를 가로채서 지정한 스케줄러에서 이를 구독하기 때문에 위치가 중요하다.

*)subscribeOn이 체이닝되어 있는데 observeOn이 체이닝되지 않은 경우
구독하는 쓰레드도 동일한 쓰레드에서 동작 한다.
**)체이닝 기법 : 선택한 요소에 메서드를 연속적으로 사용하는 것

Android에서는 네트워크 통신 및 DB쿼리 결과 데이터를 subscribeOn()로 IO Thread Scheduler를 지정해 발행하고,  observeOn을 통해 AndroidSchedulers.mainThread()를 지정해 비동기 동작의 결과물을 메인 쓰레드(UI 쓰레드) 에서 UI 를 갱신하는 형태로 사용된다.

profile
정신 차려보니 개발자가 되어있었다.

0개의 댓글