[RxJava] Observable 생성하기 고급편

H43RO·2021년 9월 4일
2

Reactive Programming

목록 보기
7/16
post-thumbnail

🔔 앞으로의 Reactive X 시리즈는 RxJava, RxKotlin 기준으로 작성됩니다
공식 문서를 참고하여 작성된 포스팅입니다.

이전 포스팅과 이어집니다.

또 있어...?

이전 포스팅에서 create()just() 와 같이 Observable 을 생성하는 다양한 방법들에 대해서 알아보았는데, 사실 빙산의 일각이었다. 아래 사진처럼 매우 다양하다.

종류가 너무 많다고 암담해하지 말자. 연산자가 다양한 만큼, 더욱 다이나믹하게, 상황에 따라 적절한 비동기 처리를 할 수 있다는 뜻이기도 하니까. 생성 연산자를 다양하게 알면 알 수록 더욱 Reactive 한 코드를 짤 수 있을 것이다!

따라서 오늘은 Observable 을 생성하는 방법 몇 가지를 더 알아보자.


defer() 메소드

'Defer' 의 사전적 정의는 '미루다, 연기하다' 이다. 그와 비슷하게, defer() 연산자는 누군가가 구독할 때까지 Observable 데이터 발행을 미루게 한다. 즉, 어떤 Observer 가 subscribe()를 호출할 때 데이터를 발행하기 시작하는 것이다.

아래 예제 코드를 통해 defer() 의 동작을 이해해보자.

fun getCurrentTime(): String {
    val timeFormat = SimpleDateFormat("mm:ss.SSS", Locale.KOREA)
    return timeFormat.format(System.currentTimeMillis())
}

fun main() {
    val justStream = Observable.just(getCurrentTime())
    val deferStream = Observable.defer {
        Observable.just(getCurrentTime())
    }

    // 5초 경과
    println("[1] : ${getCurrentTime()}")
    Thread.sleep(5000)
    println("[2] : ${getCurrentTime()}")

    println("=========================================")

    justStream.subscribe {
        println("just : $it")
    }

    deferStream.subscribe {
        println("defer : $it")
    }
}

데이터 발행 방식의 차이를 명확하게 이해하기 위해 현재 시각을 출력하는 동작과 함께 just() 와 비교해보도록 하자. 쓰레드를 5초동안 정지했고, just() 을 통해 생성된 Observable 이 발행하는 데이터와 defer() 를 통해 생성된 Observable 이 발행하는 데이터를 자세히 보자.

[1] : 15:44.640
[2] : 15:49.645
=========================================
just : 15:44.572
defer : 15:49.655

just() 를 통해 생성된 녀석은 Observable 이 생성됐을 때의 시간을 발행했고, defer() 를 통해 생성된 녀석은 구독을 시작했을 때의 시간을 발행했다. 차이가 이해되는가?

따라서 구독 당시 가장 최신 데이터를 받아보고 싶을 때 defer() 를 사용하면 된다.


timer() 메소드

지정된 시간만큼 기다렸다가 0L 이라는 데이터를 발행하는 녀석이다. 간단히 이해할 수 있다.

fun main() {
    val stream = Observable.timer(3, TimeUnit.SECONDS)

    println("옵저빙(구독) 시작!")
    stream.subscribe {
        println("[데이터 발행] item : $it, Class: ${it.javaClass}")
    }
    Thread.sleep(3000)
}
옵저빙(구독) 시작!
[데이터 발행] item : 0, Class: class java.lang.Long

empty(), never() 메소드

아무 데이터도 발행하지 않는 (???) 데이터 스트림을 생성한다. 굳이 왜 있는지 잘 모르겠을 수 있으나, 테스트 목적으로 사용하기 좋을 것이다. (아래 다이어그램조차 존재 의의를 궁금하게 할 정도로 기괴하다)

이 두 연산자의 차이점은 명확하다. 아래 코드를 살펴보자. 물론 아무런 데이터도 발행하지 않는다.

fun main() {
    Observable.empty<Any>()
        .doOnTerminate { println("empty() : 저는 먼저 들어가보겠습니다.. 총총") }
        .subscribe()

    Observable.never<Any>()
        .doOnTerminate { println("never() : 하 나는 또 야근이네") }
        .subscribe()
}

doOnTerminate() 는 Observable 이 종료될 때 실행할 동작을 정의한다. 정확히는 onComplete() 가 호출되고 난 뒤 실행되는 구문이다. 실행 결과는 다음과 같다.

empty() : 저는 먼저 들어가보겠습니다.. 총총

즉, never()onComplete() 가 호출되지 않는다. 종료되지 않는 녀석이다.


interval() 메소드

일정 간격 (시간) 을 지정해주면, 해당 간격마다 오름차순으로 정수 데이터를 0부터 차례로 발행하는 Observable 을 생성한다. 유용하게 쓰일 것만 같다.

아래 코드를 살펴보자. '1초에 한 번씩' 이라는 간격을 지정해주었다.

fun main() {
    val disposable = Observable.interval(1, TimeUnit.SECONDS)
        .subscribe { println(it) }

    Thread.sleep(4000)

    disposable.dispose()
}

그러면 실제로 아래와 같이, 정수가 0부터 차례대로 출력되는 것을 확인해볼 수 있다.

0
1
2
3

그런데 한 가지 주의해야 할 포인트가 있다. 해당 Observable 은 영원히 데이터를 발행한다. 따라서, 더이상 해당 스트림의 데이터를 사용하지 않을 때에는 꼭 dispose() 를 해주어야 메모리 릭을 방지할 수 있다.


range() 메소드

이 녀석은 특정 범위의 정수 데이터를 차례로 발행하는 Observable 을 생성한다. interval() 과 얼핏 보기에 비슷하지만, 특정 범위의 정수를 발행하고 발행이 끝나면 데이터 스트림을 종료시킨다는 점에서 다르다.

fun main() {
    Observable.range(3, 5)  // start, count
        .subscribe(System.out::println)
}
3
4
5
6
7

repeat() 메소드

이 녀석은 이름에서 가늠할 수 있다시피, 특정 데이터 스트림을 반복하는 Observable 을 생성하는 녀석이다.

긴 말 필요 없이 코드를 보자. 좀 전에 살펴본 range() 를 사용해서 0, 1, 2 를 발행하는 데이터 스트림을 생성하고, repeat() 를 통해 해당 Observable반복해보자.

fun main() {
    val stream = Observable.range(0, 3)  // start, count

    stream.repeat(3)  // 3회 반복하겠다
        .subscribe(System.out::println)
}
0
1
2
0
1
2
0
1
2

이렇게 0, 1, 2 가 정상적으로 3번 반복되어 출력되는 것을 확인해볼 수 있다.


이번 포스팅에선, 다이나믹하게 비동기 처리를 할 수 있도록 적절한 상황에 사용할 수 있는 각기다른 다양한 Observable 생성 연산자들에 대하여 알아보았다. 다음 포스팅에선 Observable 을 상황에 맞게 적절히 변형시켜주는 몇몇 녀석들을 알아보자.

profile
어려울수록 기본에 미치고 열광하라

0개의 댓글