🔔 앞으로의 Reactive X 시리즈는 RxJava, RxKotlin 기준으로 작성됩니다
공식 문서를 참고하여 작성된 포스팅입니다.
이전 포스팅과 이어집니다.
이전 포스팅에서 create()
나 just()
와 같이 Observable
을 생성하는 다양한 방법들에 대해서 알아보았는데, 사실 빙산의 일각이었다. 아래 사진처럼 매우 다양하다.
종류가 너무 많다고 암담해하지 말자. 연산자가 다양한 만큼, 더욱 다이나믹하게, 상황에 따라 적절한 비동기 처리를 할 수 있다는 뜻이기도 하니까. 생성 연산자를 다양하게 알면 알 수록 더욱 Reactive 한 코드를 짤 수 있을 것이다!
따라서 오늘은 Observable
을 생성하는 방법 몇 가지를 더 알아보자.
'Defer' 의 사전적 정의는 '미루다, 연기하다' 이다. 그와 비슷하게, defer()
연산자는 누군가가 구독할 때까지 Observable 데이터 발행을 미루게 한다. 즉, 어떤 Observer 가 subscribe()
를 호출할 때 데이터를 발행하기 시작하는 것이다.
아래 예제 코드를 통해 defer()
의 동작을 이해해보자.
fun getCurrentTime(): String {
val timeFormat = SimpleDateFormat("mm:ss.SSS", Locale.KOREA)
return timeFormat.format(System.currentTimeMillis())
}
fun main() {
val justStream = Observable.just(getCurrentTime())
val deferStream = Observable.defer {
Observable.just(getCurrentTime())
}
// 5초 경과
println("[1] : ${getCurrentTime()}")
Thread.sleep(5000)
println("[2] : ${getCurrentTime()}")
println("=========================================")
justStream.subscribe {
println("just : $it")
}
deferStream.subscribe {
println("defer : $it")
}
}
데이터 발행 방식의 차이를 명확하게 이해하기 위해 현재 시각을 출력하는 동작과 함께 just()
와 비교해보도록 하자. 쓰레드를 5초동안 정지했고, just()
을 통해 생성된 Observable 이 발행하는 데이터와 defer()
를 통해 생성된 Observable 이 발행하는 데이터를 자세히 보자.
[1] : 15:44.640
[2] : 15:49.645
=========================================
just : 15:44.572
defer : 15:49.655
just()
를 통해 생성된 녀석은 Observable 이 생성됐을 때의 시간을 발행했고, defer()
를 통해 생성된 녀석은 구독을 시작했을 때의 시간을 발행했다. 차이가 이해되는가?
따라서 구독 당시 가장 최신 데이터를 받아보고 싶을 때 defer()
를 사용하면 된다.
지정된 시간만큼 기다렸다가 0L
이라는 데이터를 발행하는 녀석이다. 간단히 이해할 수 있다.
fun main() {
val stream = Observable.timer(3, TimeUnit.SECONDS)
println("옵저빙(구독) 시작!")
stream.subscribe {
println("[데이터 발행] item : $it, Class: ${it.javaClass}")
}
Thread.sleep(3000)
}
옵저빙(구독) 시작!
[데이터 발행] item : 0, Class: class java.lang.Long
아무 데이터도 발행하지 않는 (???) 데이터 스트림을 생성한다. 굳이 왜 있는지 잘 모르겠을 수 있으나, 테스트 목적으로 사용하기 좋을 것이다. (아래 다이어그램조차 존재 의의를 궁금하게 할 정도로 기괴하다)
이 두 연산자의 차이점은 명확하다. 아래 코드를 살펴보자. 물론 아무런 데이터도 발행하지 않는다.
fun main() {
Observable.empty<Any>()
.doOnTerminate { println("empty() : 저는 먼저 들어가보겠습니다.. 총총") }
.subscribe()
Observable.never<Any>()
.doOnTerminate { println("never() : 하 나는 또 야근이네") }
.subscribe()
}
doOnTerminate()
는 Observable 이 종료될 때 실행할 동작을 정의한다. 정확히는 onComplete()
가 호출되고 난 뒤 실행되는 구문이다. 실행 결과는 다음과 같다.
empty() : 저는 먼저 들어가보겠습니다.. 총총
즉, never()
은 onComplete()
가 호출되지 않는다. 종료되지 않는 녀석이다.
일정 간격 (시간) 을 지정해주면, 해당 간격마다 오름차순으로 정수 데이터를 0부터 차례로 발행하는 Observable 을 생성한다. 유용하게 쓰일 것만 같다.
아래 코드를 살펴보자. '1초에 한 번씩' 이라는 간격을 지정해주었다.
fun main() {
val disposable = Observable.interval(1, TimeUnit.SECONDS)
.subscribe { println(it) }
Thread.sleep(4000)
disposable.dispose()
}
그러면 실제로 아래와 같이, 정수가 0부터 차례대로 출력되는 것을 확인해볼 수 있다.
0
1
2
3
그런데 한 가지 주의해야 할 포인트가 있다. 해당 Observable 은 영원히 데이터를 발행한다. 따라서, 더이상 해당 스트림의 데이터를 사용하지 않을 때에는 꼭 dispose()
를 해주어야 메모리 릭을 방지할 수 있다.
이 녀석은 특정 범위의 정수 데이터를 차례로 발행하는 Observable 을 생성한다. interval()
과 얼핏 보기에 비슷하지만, 특정 범위의 정수를 발행하고 발행이 끝나면 데이터 스트림을 종료시킨다는 점에서 다르다.
fun main() {
Observable.range(3, 5) // start, count
.subscribe(System.out::println)
}
3
4
5
6
7
이 녀석은 이름에서 가늠할 수 있다시피, 특정 데이터 스트림을 반복하는 Observable 을 생성하는 녀석이다.
긴 말 필요 없이 코드를 보자. 좀 전에 살펴본 range()
를 사용해서 0, 1, 2
를 발행하는 데이터 스트림을 생성하고, repeat()
를 통해 해당 Observable 을 반복해보자.
fun main() {
val stream = Observable.range(0, 3) // start, count
stream.repeat(3) // 3회 반복하겠다
.subscribe(System.out::println)
}
0
1
2
0
1
2
0
1
2
이렇게 0, 1, 2
가 정상적으로 3번 반복되어 출력되는 것을 확인해볼 수 있다.
이번 포스팅에선, 다이나믹하게 비동기 처리를 할 수 있도록 적절한 상황에 사용할 수 있는 각기다른 다양한 Observable 생성 연산자들에 대하여 알아보았다. 다음 포스팅에선 Observable 을 상황에 맞게 적절히 변형시켜주는 몇몇 녀석들을 알아보자.