RxJava에서는 Observable을 구독하는 Observer가 존재하고, Observable이 순차적으로 발행하는 데이터에 대해서 반응합니다. ReactiveX 에서는 Observable은 Emitter 인터페이스에 포함되어 있는 3가지 이벤트를 활용하여 Observer에게 무언가 알릴 수 있습니다.
- onNext() : 하나의 소스 Observable에서 Observer까지 한 번에 하나씩 순차적으로 데이터를 발행합니다.
- onComplete() : 데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 onNext()를 더 호출하지 않음을 나타냅니다.
- onError() : 오류가 발생했음을 Observer에 전달합니다.
Rx에서는 연산자(Operator)를 통해 기존 데이터를 참조, 변형하여 Observable을 생성할 수 있습니다. 이 중 자주 쓰이는 연산자를 살펴보겠습니다.
그리고 데이터나 오류 내용을 발행할 때 null은 발행할 수 없습니다.
fun main() {
Observable.create<String> { emitter ->
emitter.onNext("Hello")
emitter.onNext("World")
emitter.onComplete()
}
.subscribe({ value ->
println(value)
}, { error ->
println("Error: ${error.message}")
}, {
println("Completed")
})
}
결과
Hello
World
Completed
위 코드는 "Hello"와 "World"를 출력하고, 마지막으로 "Completed"를 출력합니다.
먼저 Observable.create 메서드를 호출하여 Observable을 생성합니다. 이때, 데이터 타입으로 String을 사용하도록 지정했습니다. 그리고 onNext 메서드를 호출하여 데이터를 전달합니다. 마지막으로 onComplete 메서드를 호출하여 이벤트 전달을 마무리합니다.
이후 subscribe 메서드를 호출하여 Observer를 등록합니다. 이때, onNext 콜백 함수와 onError 콜백 함수, onComplete 콜백 함수를 차례로 전달합니다. 이렇게 등록된 Observer는 Observable이 발생시키는 이벤트를 처리합니다.
fun main() {
val source: Observable<String> =
Observable.create { // it: ObservableEmitter<String>
it.onNext("Hello")
it.onComplete();
it.onNext("World");
}
}
결과
Hello
onComplete 이후에는 아이템이 더 발행되더라도 구독자는 데이터를 받지 못합니다. 위에서 본 예제에서 두 번째 onNext와 onComplete의 위치를 바꾸면 두 번째 데이터인 “World”는 받을 수 없습니다.
val source: Observable<String> =
Observable.create {
it.onNext("Hello")
it.onError(Throwable())
it.onNext("World")
}
}
source.subscribe(System.out::println) { // it: Throwable
println("Error!") // 에러 발생 시 처리하는 구문
}
https://reactivex.io/documentation/operators
https://blog.yena.io/studynote/2020/10/23/Android-RxJava(2).html