이전의 글에서 create()와 just()을 이용하여Observable을 생성하는 방법에 대해서 알아보았습니다.
하지만 RxJava에는 그것보다 더 많은 생성방법이 존재했습니다.
그렇기에 조금 더 알아보는 글을 작성해보려고 합니다.
defer()로 생성된 Observable은 create()나 just()와 다르게 observer가 구독하기전까지 Observable를 생성하지 않습니다. 즉 어떤 observer가 subscribe()를 호출할 때 까지 발행하기 시작한다는 것입니다.
코드를 통해 defer()를 알아보겠습니다.
fun getCurrentTime() : String {
val timeFormat = SimpleDateFormat("mm:ss.SSS", Locale.KOREA)
return timeFormat.format(System.currentTimeMillis())
}
fun main() {
val justStream = Observable.just(getCurrentTime())
val deferStream = Observable.defer {
Observable.just(getCurrentTime())
}
println("[1] : ${getCurrentTime()}")
Thread.sleep(5000)
println("[2] : ${getCurrentTime()}")
println("=========================")
justStream.subscribe {
println("just : $it")
}
deferStream.subscribe {
println("defer : $it")
}
}
다음 코드의 결과는 다음과 같습니다.
[1] : 15:44.640
[2] : 15:49.645
=========================================
just : 15:44.572
defer : 15:49.655
just()의 경우에는 Observable이 생성되을 때의 시간을 발행했고 defer()를 통해 생성된 Observer의 경우 구독을 시작했을 때의 시간을 발행했습니다. 이렇게 defer()는 위에서 설명한 것처럼 누군가 subscribe를 하기 전까지 기다리는 특성을 가지고 있습니다.
일정 간격 (시간)을 지정해주면, 해당 간격마다 오름차순으로 정수 데이터를 0부터 차례대로 발행하는 Observable을 생성합니다.
코드에서 1초에 한 번씩이라는 간격을 설정해주겠습니다.
fun main() {
val disposable = Observable.interval(1, TimeUnit.SECONDS)
.subscribe { println(it) }
Thread.sleep(4000)
disposable.dispose()
}
결과는 다음과 같습니다.
0
1
2
3
정수가 0부터 출력되는 것을 알 수 있습니다.
interval을 사용할 때 주의해야할 부분이 있습니다. 해당 Observable은 영원히 데이터를 발행합니다. 즉 해당 스트림의 데이터를 더 이상 사용하지않을 때는 dispose()를 해주어야지 Memory leak을 방지할 수 있습니다.
timer()는 interval 함수와 유사하지만 한번만 실행하는 함수입니다. 일정 시간 지난 후 한번만 발행하고 onComplete 이벤트를 발생시킵니다.
바로 코드로 보겠습니다.
fun main() {
val stream = Observable.timer(3, TimeUnit.SECONDS)
println("옵저빙(구독) 시작!")
stream.subscribe {
println("[데이터 발행] item : $it, Class: ${it.javaClass}")
}
Thread.sleep(3000)
}
옵저빙(구독) 시작!
[데이터 발행] item : 0, Class: class java.lang.Long
empty()의 경우 항목은 없지만 정상적으로 종료되는 Observable을 생성해줍니다.
never()의 경우 항목을 방출하지 않고 종료하지 않는 Observable을 생성해줍니다.
fun main() {
Observable.empty<Any>()
.doOnTerminate { println("empty() : 비어있지만 동작은 합니다.") }
.subscribe()
Observable.never<Any>()
.doOnTerminate { println("never() : 아무것도 안나옵니다.") }
.subscribe()
}
doOnTerminate()는 Observable이 종료될 때 실행할 동작을 정의합니다. 정확히는 onComplete()가 호출되고 난 뒤 실행되는 구문입니다.
결과는 다음과 같습니다.
empty() : 비어있지만 동작은 합니다.
range()는 주어진 값(n)부터 m개의 integer 객체를 발행합니다. (interval과 timer 함수는 Long 객체를 발행합니다.)
fun main(){
val source = Observable.range(1,10).filter { num -> num%2==0 }
source.subscribe{it -> System.out.println(it)}
}
결과는 다음과 같습니다.
2
4
6
8
10
repeat()의 경우 특정 데이터 스트림을 반복하는 Observable을 생성하는 메서드입니다.
fun main() {
val stream = Observable.range(0, 3)
stream.repeat(3)
.subscribe(System.out::println)
}
위의 코드를 수행하면 결과는 다음과 같습니다.
0
1
2
0
1
2
0
1
2
0,1,2의 출력이 3번 반복되는 것을 확인할 수 있습니다.
이렇게 이번 글에서는 create()와 just()뿐만 아니라 각기 다른 다양한 Observable을 생성하는 법에 대해 알아봤습니다. 읽어주셔서 감사합니다.
http://reactivex.io/documentation/operators
https://velog.io/@haero_kim/RxJava-Observable-생성하기-고급편
https://jeongupark-study-house.tistory.com/82
https://jeongupark-study-house.tistory.com/94?category=820719