[RxJava] Observable 생성하기

H43RO·2021년 8월 24일
4

Reactive Programming

목록 보기
3/16
post-thumbnail

🔔 앞으로의 Reactive X 시리즈는 RxJava, RxKotlin 기준으로 작성됩니다.
이전 포스팅 [Observable 개념 날먹하기] 과 이어집니다.

RxJava 에서는 다양한 형식으로 Observable 을 생성할 수 있다. 기존의 데이터를 Observable 형태로 만들 수도 있고, 네트워킹 작업 자체를 Observable 로 만들 수도 있다.

오늘은 Observable 을 생성하는 다양한 방법에 대해 알아보자. 사실 방법이 매우 많기 때문에, 그 중 자주 쓰이는 것들을 살펴본다.


create() 메소드

Observable.create() 를 통해 Emitter 를 이용하여 직접 어떤 데이터를 순차적으로 발행할 수 있고, 데이터 발행 완료 및 오류 이벤트를 직접 설정해줄 수 있다.

fun main() {
   val stream: Observable<String> =
       Observable.create {  // it: ObservableEmitter<String>
            it.onNext("H43RO")
	    it.onError(Throwable())
	    it.onNext("Velog")
	}
} 

create() 호출 시 Lambda 를 활용하게 되면, it 키워드로 Emitter 를 사용할 수 있다. 이 Emitter 녀석을 통해 다양한 이벤트를 발생한다.

이전 시간에 다뤘었던 Emitter 인터페이스를 다시 복습해보자.

onNext()

  • 데이터하나 발행됐음을 알리는 이벤트
  • 연속된 데이터의 경우, 데이터가 하나씩 차례대로 onNext() 로 발행됨

onComplete()

  • 데이터 발행이 모두 완료되었음을 알리는 이벤트
  • onComplete() 가 호출된 이후에는 더이상 onNext() 호출이 안됨

onError()

  • 오류가 발생했음을 알리는 이벤트
  • 마찬가지로 onError() 가 호출된 이후에는 더이상 onNext() 호출이 안됨

⛔️ 알아둬야 할 포인트

  • Observable 에서 데이터, 오류 등을 발행할 때 null 발행은 안 된다.
  • 항상 onComplete() 혹은 onError() 둘 중 하나로만 데이터 발행이 종료되어야 함

따라서 위 예제 같은 경우는 "H43RO" 와 "Velog" 라는 데이터를 순차적으로 발행하고, onComplete() 를 통해 데이터 발행이 끝났음을 알리는 것이다.

데이터 발행이 모두 끝났다면, onComplete()꼭 호출해야 한다.

이러한 메소드들을 통해 이벤트를 발생하는데, 이 이벤트들을 수신하는 주체는 어떻게 정할까? 바로 subscribe() 라는 녀석을 통해 감시자 (및 소비자) 를 지정한다.

아래 예제에서는, 시스템 출력 메소드 자체를 Observer 로 둔다.

stream.subscribe(System.out::println)

따라서, 출력 결과는 다음과 같다. 순차적으로 "H43RO", "Velog" 가 발행됐다.

H43RO
Velog

만일 데이터 발행 시 오류가 발생한다면, 에러를 처리해줘야 한다.

val stream: Observable<String> =
    Observable.create {  // it: ObservableEmitter<String>
        it.onNext("H43RO")
	it.onError(Throwable())
	it.onNext("Velog")
    }
}

stream.subscribe(System.out::println) {  // it: Throwable!
    println("Error!")  // 에러 발생 시 처리하는 구문
}

따라서 출력 결과는 이렇게 된다.

H43RO
Error!

이렇듯 create() 를 통해 Observable 객체를 직접 생성하면, 데이터들을 순차적으로 직접 발행하고 데이터 발행 완료 처리하는 것까지 모두 개발자의 몫이기 때문에 주의해서 사용해야 한다. Observable 을 더 이상 사용하지 않을 때 등록해두었던 콜백 메소드들을 다 해제하지 않으면 메모리 릭이 발생하기 때문이다.


just() 메소드

데이터를 있는 그대로 발행하는 Observable 을 생성한다.

fun main() {
    val stream: Observable<String> =
        Observable.just("H43RO", "Velog", "Kotlin")
    
    stream.subscribe(System.out::println)
}

타입이 같은 여러 데이터를 넣어둘 수 있다.

Emitter 설명 시 언급했듯이, RxJava 는 null 을 허용하지 않기 때문에 just() 의 데이터에도 null 을 담아선 안된다.


toObservable() 메소드

RxKotlin 에서만 지원한다. (RxJava 기준 fromArray, fromIterable 등)

메소드명에서 알 수 있듯, 기존 Iterable 한 녀석들을 Observable 로 바꿔준다.

fun main() {
    val data = arrayOf(100, 200, 300)
    val stream = data.toObservable()

    stream.subscribe(System.out::println)
}

출력 결과는 다음과 같다.

100
200
300

toObservable() 은 기존 RxJava 의 fromArray(), fromIterable() 등 파편화된 메소드들을 RxKotlin 에서는 코틀린 답게 더욱 편리하게 사용할 수 있도록 만들어진 것이다.

fun main() {
    val data = listOf("H43RO", "Velog", "Kotlin")
    val stream = data.toObservable()

    stream.subscribe(System.out::println)
}
H43RO
Velog
Kotlin

배열 또는 iterable<E> 인터페이스로 구현되는 모든 객체들은 toObservable() 로 손쉽게 Observable 을 생성할 수 있고, 내부 구현은 fromArray(), fromIterable() 등으로 이루어져있다. toObservable()타입을 알아서 지정해주는 것이다.


fromFuture() 메소드

Future 인터페이스는 자바 5 에 추가된 API 로, 연산 결과를 얻을 때 사용한다.

fromFuture() 메소드는 Future 인터페이스를 지원하는 모든 객체를 Observable Source 로 변환하고, Future.get() 메소드를 통해 호출한 값반환해준다.

보통 Executors 를 이용하여, 비동기 연산 및 작업 후 결과를 얻을 때 많이 사용된다. Emitter 는 Observable 내부에서 Future.get() 을 호출하고, Future 의 작업이 모두 끝날 때 까지 Thread 는 블로킹된다.

fun main() {
    val future =
        Executors.newSingleThreadExecutor().submit<String> {
            Thread.sleep(1000)
            "Hello, Velog!"
        }

    println("Hello, H43RO!")
    val source = Observable.fromFuture(future)
    source.subscribe(System.out::println)
    println("Wait, I was blocked!")  // 블로킹됨
}
Hello, H43RO!
Hello, Velog!
Wait, I was blocked!

Reactive X 에서는 Executor 를 직접 다루기보다는, Reactive X 에서 제공하는 스케줄러를 사용하는 것을 적극 권장한다.


fromPublisher() 메소드

Publisher 는 Java 9 표준 API 로, 잠재적으로 데이터 발행을 제공하는 생산자이다! Subscriber 로부터 요청을 받아 데이터를 발행하게 된다. fromPublisher() 연산자는 PublisherObservable 로 변환해준다.

fun main() {
    val publisher: Publisher<String> =
        Publisher<String> {  // it: Subscriber<in String!>!
            it.onNext("Hello")
            it.onNext("My name is")
            it.onNext("H43RO")
            it.onComplete()
        }
    val source: Observable<String> = Observable.fromPublisher(publisher)
    source.subscribe(System.out::println)
}

fromCallable() 메소드

Callable 인터페이스는 Runnable 인터페이스처럼 메소드가 하나고 인자가 없다는 점에서 비슷비슷하지만, 실행결과를 '반환'한다는 점이 Runnable 과 조금 다르다. 그리고 위에서 등장했던 Executor 인터페이스의 인자로 활용되기 때문에, 다른 쓰레드에서 실행되는 것을 암시한다.

fromCallable() 메소드를 통해 CallableObservable 로 변환하여 비동기적으로 아이템을 발행할 수 있다.

fun main() {
    val callable: Callable<String> = Callable<String> { "H43RO" }
    val source: Observable<String> = Observable.fromCallable(callable)
    source.subscribe(System.out::println)
}

이번 포스팅에는 Observable 을 생성하는 다양한 방법들에 대하여 알아보았다. 다음 포스팅에서는 특별한 형태의 Observable 들에 대하여 알아보겠다.

profile
어려울수록 기본에 미치고 열광하라

0개의 댓글