데이터의흐름과 변경사항의 전파에 중점을둔 선언적프로그래밍 패러다임
(모든것을 스트림으로 간주한고 선언적으로 개발하는것)
선언적 프로그래밍 : ‘어떻게’가아닌 ‘무엇을 나타내는지’에 초점을 준
변경사항의 전파 : 어떤데이터를보여줄지 가져와서 보여주는것이아니라 값이 변화할때 마다 데이터를 전달
관찰 가능한 스트림(Observable streams)을 사용하는 비동기 프로그래밍 API이다.
Publisher(데이터를 만들어 통지하는 생산자)와 Subscriber로 구성되어있으며, Subscriber가 Publisher를 구독하면 Publisher가 통보하는 데이터를 받을 수 있다.
RxJava에서는 Observable을 구독하는 Observer가 존재한다.
Observable은 데이터 흐름에 맞게 알림을 보내 Observer가 데이터를 사용할 수 있도록 한다.
Observable이 데이터 스트림을 처리하고, 완료되면 데이터를 발행(emit)한다.
데이터를 발행할 때마다 구독하고 있는 모든 Observer가 알림을 받는다.
Observer는 수신한 데이터를 가지고 어떠한 일을 한다.
Observable이 데이터를 발행하고 알림(Event)를 보내면 Observer는 Observable을 구독(Subscribe)해 데이터를 소비(Consume)한다.
Observable이 데이터를 발행(emit)한 후 Emitter라는 인터페이스에 포함된 3가지 알림을 사용하여 동작한다.
* Observable은 팩토리 함수로 데이터 흐름을 정의한 후 subscribe() 함수를 호출해야 실제로 데이터를 발행한다.
Observer는 subscribe()메소드에서 수신한 각각의 알림에 대해 실행할 내용을 지정한다.
Observable을 생성할 때에는 직접 인스턴스를 만들지 않고 정적 팩토리 함수(생성연산자)를 호출한다.
Integer [] array = {1, 2, 3, 4, 5}
Observable.fromArray(array)
.subscribe(System.out::println)
3-2. fromCallable()fun main() {
val callable: Callable<String> = Callable<String> { "Hello" }
val source: Observable<String> = Observable.fromCallable(callable)
source.subscribe(System.out::println)
}
3-3. fromFuture()Future<String> future = Executors.newSingleThreadExecutor().submit(() -> {
Thread.sleep(1000)
return "Hello Future"
})
source: Observable<String> = Observable.fromFuture(future)
source.subscribe { it -> println(it) }
등등 다양한 생성자 들이 있다.
Observable 은 Cold Observable 과 Hot Observable 로 나눌 수 있다.
Cold Observable
Cold Observable은 Observable을 생성하고 Observer가 subscribe를 호출할 때까지 데이터 발행을 하지 않는다.
임의로 종료시키지 않는 이상 여러 번 요청에도 각각의 observer(관찰자)마다 다른 observable stream이 생성되기 때문에 처음부터 끝까지 발행하는 것을 보장한다.
데이터베이스 쿼리, 파일 읽기, API 요청등에 사용된다.
Hot Observable
Hot Observable은 Observer의 존재 여부와 관계없이 데이터를 발행 해 모든 구독자에게 동시에 같은 데이터를 발행한다.
Observer는 구독한 시점부터 발행된 데이터를 받기 때문에 데이터 전부를 받는 것을 보장하지 못한다.
가장 최근의 데이터를 처리하는 마우스 이벤트, 키보드 이벤트, 센서 데이터 등이 있다.
create() 를 사용할 때 Emitter 를 사용하여 데이터를 발행한다.
데이터 하나가 발행과 동시에 종료되기 때문에 onSuccess() 라는 이벤트만으로 데이터 발행이 완료됨을 알린다. (물론 오류 처리의 경우 동일하게 onError() 를 사용한다.
Single
.just(1)
.subscribe { it -> println(it) }
//
fun main() {
Single.create<String> { // it: SingleEmitter<String!>
it.onSuccess("Hello")
}.subscribe { it ->
println(it)
}
}
Single과 비슷하지만, 아이템을 발행하거나 발행하지 않을 수도 있다는 점에서 차이가 있다. Single 클래스에 onComplete 이벤트가 추가된 형태이다.
아이템을 발행했을 때에는 onSuccess()를 호출하고, 발행하지 않을 때에는 onComplete()를 호출한다.
값이 없고 완료 또는 에러 신호만 있는 지연된 계산을 나타내기 때문에 onNext(), onSuccess()은 사용하지 않고 onComplete() 와 onError()만 사용한다.
데이터 스트림에 쌓이는 데이터의 양을 제어할 수 있는 데이터 스트림으로 Backpressure 현상을 제어할 수 있다.
대량 데이터(예를 들어 10,000건 이상의 네트워크 통신이나 DB 등의 I/O)를 처리를 할 때 사용한다.
*) Backpressure (배압) 이란?
데이터의 소비 속도가 데이터 발행 속도를 못 따라갈때 발생하는 현상이다.
관찰 가능한 데이터 스트림과 관찰자(구독자)의 성격을 모두 갖고 있다. Observable 과 Observer 를 모두 구현한 추상 타입으로, 하나의 소스로부터 다중의 구독자에게 멀티 캐스팅이 가능하다.
Observer 를 구현한다는 특징때문에, onNext(), onError(), onComplete() 등의 이벤트를 수동으로 발생하여 구독자들에게 전달해줄 수 있다.
onComplete() 가 발생하면 가장 마지막에 발행된 데이터를 전달한다.
구독 후 가장 최근 값 혹은 발행된 데이터가 없다면 기본값(pink)을 넘겨주는 클래스이다.
Hot Observable 특성을 갖고 있어 subscribe() 함수를 호출하면 값을 발행하기 시작한다.
데이터가 모두 발행되고 난 뒤 구독을 하면 아무 데이터도 받아볼 수 없게 된다.
항상 데이터의 처음부터 끝까지 발행하는 것을 보장해준다. 구독자가 생기면 이전에 발행했던 데이터들을 모두 해당 구독자에게 전달한다.
구독전까지 발행된 데이터들을 계속 버퍼에 저장해두었다가 구독을 시작할 때 버퍼의 데이터들을 모두 발행하고 버퍼를 비운다. 만약 두 개 이상 구독자가들어올 시 IllegalStateException을 발생시킨다.
Disposable
Disposable 인터페이스는 dispose()와 isDisposed() 두개의 메소드로 정의되어있다.
Observable을 팩토리 함수로 데이터의 흐름을 정의 한 후 subscribe()함수를 호출해 아이템을 발행한다.
subscribe()를 호출한 후에는 Disposable 인터페이스의 객체가 반환된다.
onComplete()가 호출되면 데이터 발행이 종료되지만, 아이템을 무한하게 발행하거나 오래 실행되는 Observable의 경우, 제대로 종료하지 않으면 메모리 누수가 발생할 수 있기때문에 Observable의 구독이 필요없을 때는 Disposable.dispose()를 호출해서 아이템의 발행을 중단하는 것이 효율적이다.
제대로 폐기되었는지 확인하려면 Disposable.isDisposed()를 통해 체크할 수 있다. (onComplete()가 호출된 이후라면 dispose()를 호출할 필요가 없다.)
CompositeDisposable
Disposable객체들을 한 번에 관리할 수 있는 객체이다.
CompositeDisposable 에 Disposable 객체들을 추가해두고 필요에 따라 한 번에 처리한다.
RxJava의 연산자는 특정 input에 대해 항상 동일한 output을 반환하는 부수효과가 없는 순수함수로, 비동기 프로그램에 필요한 주요 로직을 만들 수 있다.
https://reactivex.io/documentation/operators.html
스케줄러를 사용해 데이터 흐름이 발생하는 스레드와 처리된 결과를 구독자에게 전달하는 스레드를 분리할 수 있다.
subscribeOn, observeOn
subscribeOn : Observable이 데이터 흐름을 발생시키고 연산하는 스레드를 지정할 수 있다. 여러번 호출되더라도 맨 처음의 호출만 영향을 주며 어디에 위치하든 상관없다.
observeOn : 발행되는 데이터를 구독하는 쓰레드를 지정할 수 있다.
여러번 호출될 수 있으며, Observable 데이터 스트림에서 발행한 데이터를 가로채서 지정한 스케줄러에서 이를 구독하기 때문에 위치가 중요하다.
*)subscribeOn이 체이닝되어 있는데 observeOn이 체이닝되지 않은 경우
구독하는 쓰레드도 동일한 쓰레드에서 동작 한다.
**)체이닝 기법 : 선택한 요소에 메서드를 연속적으로 사용하는 것
Android에서는 네트워크 통신 및 DB쿼리 결과 데이터를 subscribeOn()로 IO Thread Scheduler를 지정해 발행하고, observeOn을 통해 AndroidSchedulers.mainThread()를 지정해 비동기 동작의 결과물을 메인 쓰레드(UI 쓰레드) 에서 UI 를 갱신하는 형태로 사용된다.