🔔 앞으로의 Reactive X 시리즈는 RxJava, RxKotlin 기준으로 작성됩니다.
이전 포스팅 [Observable 개념 날먹하기] 과 이어집니다.
RxJava 에서는 다양한 형식으로 Observable 을 생성할 수 있다. 기존의 데이터를 Observable 형태로 만들 수도 있고, 네트워킹 작업 자체를 Observable 로 만들 수도 있다.
오늘은 Observable 을 생성하는 다양한 방법에 대해 알아보자. 사실 방법이 매우 많기 때문에, 그 중 자주 쓰이는 것들을 살펴본다.
Observable.create()
를 통해 Emitter 를 이용하여 직접 어떤 데이터를 순차적으로 발행할 수 있고, 데이터 발행 완료 및 오류 이벤트를 직접 설정해줄 수 있다.
fun main() {
val stream: Observable<String> =
Observable.create { // it: ObservableEmitter<String>
it.onNext("H43RO")
it.onError(Throwable())
it.onNext("Velog")
}
}
create() 호출 시 Lambda 를 활용하게 되면, it
키워드로 Emitter
를 사용할 수 있다. 이 Emitter 녀석을 통해 다양한 이벤트를 발생한다.
이전 시간에 다뤘었던 Emitter
인터페이스를 다시 복습해보자.
onNext()
onNext()
로 발행됨onComplete()
onComplete()
가 호출된 이후에는 더이상 onNext()
호출이 안됨onError()
onError()
가 호출된 이후에는 더이상 onNext()
호출이 안됨⛔️ 알아둬야 할 포인트
- Observable 에서 데이터, 오류 등을 발행할 때
null
발행은 안 된다.- 항상
onComplete()
혹은onError()
둘 중 하나로만 데이터 발행이 종료되어야 함
따라서 위 예제 같은 경우는 "H43RO" 와 "Velog" 라는 데이터를 순차적으로 발행하고, onComplete()
를 통해 데이터 발행이 끝났음을 알리는 것이다.
데이터 발행이 모두 끝났다면,
onComplete()
를 꼭 호출해야 한다.
이러한 메소드들을 통해 이벤트를 발생하는데, 이 이벤트들을 수신하는 주체는 어떻게 정할까? 바로 subscribe()
라는 녀석을 통해 감시자 (및 소비자) 를 지정한다.
아래 예제에서는, 시스템 출력 메소드 자체를 Observer 로 둔다.
stream.subscribe(System.out::println)
따라서, 출력 결과는 다음과 같다. 순차적으로 "H43RO", "Velog" 가 발행됐다.
H43RO
Velog
만일 데이터 발행 시 오류가 발생한다면, 에러를 처리해줘야 한다.
val stream: Observable<String> =
Observable.create { // it: ObservableEmitter<String>
it.onNext("H43RO")
it.onError(Throwable())
it.onNext("Velog")
}
}
stream.subscribe(System.out::println) { // it: Throwable!
println("Error!") // 에러 발생 시 처리하는 구문
}
따라서 출력 결과는 이렇게 된다.
H43RO
Error!
이렇듯 create()
를 통해 Observable
객체를 직접 생성하면, 데이터들을 순차적으로 직접 발행하고 데이터 발행 완료 처리하는 것까지 모두 개발자의 몫이기 때문에 주의해서 사용해야 한다. Observable
을 더 이상 사용하지 않을 때 등록해두었던 콜백 메소드들을 다 해제하지 않으면 메모리 릭이 발생하기 때문이다.
데이터를 있는 그대로 발행하는 Observable
을 생성한다.
fun main() {
val stream: Observable<String> =
Observable.just("H43RO", "Velog", "Kotlin")
stream.subscribe(System.out::println)
}
타입이 같은 여러 데이터를 넣어둘 수 있다.
Emitter 설명 시 언급했듯이, RxJava 는
null
을 허용하지 않기 때문에just()
의 데이터에도null
을 담아선 안된다.
RxKotlin 에서만 지원한다. (RxJava 기준 fromArray, fromIterable 등)
메소드명에서 알 수 있듯, 기존 Iterable 한 녀석들을 Observable
로 바꿔준다.
fun main() {
val data = arrayOf(100, 200, 300)
val stream = data.toObservable()
stream.subscribe(System.out::println)
}
출력 결과는 다음과 같다.
100
200
300
toObservable()
은 기존 RxJava 의 fromArray()
, fromIterable()
등 파편화된 메소드들을 RxKotlin 에서는 코틀린 답게 더욱 편리하게 사용할 수 있도록 만들어진 것이다.
fun main() {
val data = listOf("H43RO", "Velog", "Kotlin")
val stream = data.toObservable()
stream.subscribe(System.out::println)
}
H43RO
Velog
Kotlin
배열 또는
iterable<E>
인터페이스로 구현되는 모든 객체들은toObservable()
로 손쉽게Observable
을 생성할 수 있고, 내부 구현은fromArray()
,fromIterable()
등으로 이루어져있다.toObservable()
이 타입을 알아서 지정해주는 것이다.
Future
인터페이스는 자바 5 에 추가된 API 로, 연산 결과를 얻을 때 사용한다.
fromFuture()
메소드는 Future
인터페이스를 지원하는 모든 객체를 Observable Source
로 변환하고, Future.get()
메소드를 통해 호출한 값을 반환해준다.
보통
Executors
를 이용하여, 비동기 연산 및 작업 후 결과를 얻을 때 많이 사용된다. Emitter 는 Observable 내부에서Future.get()
을 호출하고, Future 의 작업이 모두 끝날 때 까지 Thread 는 블로킹된다.
fun main() {
val future =
Executors.newSingleThreadExecutor().submit<String> {
Thread.sleep(1000)
"Hello, Velog!"
}
println("Hello, H43RO!")
val source = Observable.fromFuture(future)
source.subscribe(System.out::println)
println("Wait, I was blocked!") // 블로킹됨
}
Hello, H43RO!
Hello, Velog!
Wait, I was blocked!
Reactive X 에서는 Executor 를 직접 다루기보다는, Reactive X 에서 제공하는 스케줄러를 사용하는 것을 적극 권장한다.
Publisher
는 Java 9 표준 API 로, 잠재적으로 데이터 발행을 제공하는 생산자이다! Subscriber 로부터 요청을 받아 데이터를 발행하게 된다. fromPublisher()
연산자는 Publisher
를 Observable
로 변환해준다.
fun main() {
val publisher: Publisher<String> =
Publisher<String> { // it: Subscriber<in String!>!
it.onNext("Hello")
it.onNext("My name is")
it.onNext("H43RO")
it.onComplete()
}
val source: Observable<String> = Observable.fromPublisher(publisher)
source.subscribe(System.out::println)
}
Callable
인터페이스는 Runnable
인터페이스처럼 메소드가 하나고 인자가 없다는 점에서 비슷비슷하지만, 실행결과를 '반환'한다는 점이 Runnable
과 조금 다르다. 그리고 위에서 등장했던 Executor
인터페이스의 인자로 활용되기 때문에, 다른 쓰레드에서 실행되는 것을 암시한다.
fromCallable()
메소드를 통해 Callable
을 Observable
로 변환하여 비동기적으로 아이템을 발행할 수 있다.
fun main() {
val callable: Callable<String> = Callable<String> { "H43RO" }
val source: Observable<String> = Observable.fromCallable(callable)
source.subscribe(System.out::println)
}
이번 포스팅에는 Observable 을 생성하는 다양한 방법들에 대하여 알아보았다. 다음 포스팅에서는 특별한 형태의 Observable 들에 대하여 알아보겠다.