🔔 앞으로의 Reactive X 시리즈는 RxJava, RxKotlin 기준으로 작성됩니다
공식 문서를 참고하여 작성된 포스팅입니다.
이전 포스팅과 이어집니다.
필터링 연산자는 Observable 이 뱉는 데이터들을 어떤 기준에 의해 거른 후 발행하도록 한다. 이번 포스팅에선 Observable 필터링 연산자 중 자주 사용되는 몇 가지를 알아보며, 개념을 익혀보도록 하자.
이 녀석은 특정 시간동안 더 이상 데이터가 발행되지 않을 때에 데이터을 발행하는 연산자이다.
예를 들어 검색창 입력필드에 사용자가 검색어를 입력할 때 검색버튼 누르지 않아도 검색이 되는 기능을 구현한다고 치자. 이 때 검색어가 변화하는 대로 계속하여 서버에 쿼리 (혹은 DB 쿼리) 를 보내게 된다면, 리소스 낭비이다. 왜냐하면 '사과' 라는 키워드를 완성하기 위해 사용자는 'ㅅ → 사 → 사ㄱ → 사고 → 사과' 의 과정을 거치기 때문에 총 5번의 쿼리를 날려버리게 된다.
따라서 사용자가 검색어를 다 입력함을 감지했을 때 쿼리를 날리면 효율적인 동작을 할 수 있다. 이럴 때 debounce()
를 사용하는 것이다. 아래 코드를 통해 쉽게 이해할 수 있다.
fun main() {
Observable.create { emitter: ObservableEmitter<Any?> ->
emitter.onNext("1")
Thread.sleep(100)
emitter.onNext("2")
emitter.onNext("3")
emitter.onNext("4")
Thread.sleep(100)
emitter.onNext("5")
emitter.onNext("6")
emitter.onNext("7")
emitter.onNext("8")
emitter.onNext("9")
emitter.onNext("10")
Thread.sleep(100)
}
.debounce(10, TimeUnit.MILLISECONDS)
.subscribe {
println(it)
}
}
1
4
10
debounce()
의 파라미터로 기준 시간을 입력해주고 동작을 살펴보면, Thread.sleep(100)
을 통해 아무 데이터가 발행되지 않을 때 마지막으로 발행된 녀석들만 출력된 것을 볼 수 있다.
간단히 말해서, 중복된 데이터를 발행하지 않는 필터링 연산자이다.
fun main() {
Observable.just("A", "B", "B", "B", "B", "A", "C")
.distinct()
.subscribe(System.out::println)
}
A
B
C
발행되는 데이터 스트림에서 특정 인덱스에 해당하는 데이터만 발행해주는 녀석이다.
fun main() {
Observable.just("A", "B", "B", "A", "C", "D")
.elementAt(4)
.subscribe(System.out::println)
}
C
4번째 인덱스에 위치한 'C' 가 발행된 모습이다.
발행되는 데이터 스트림에서 특정 연산식의 조건에 부합 ( true
) 하는 녀석들만 발행해주는 연산자이다. Observable 필터링 연산자 중 가장 베이직(?) 하다고 할 수 있다.
fun main() {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.filter { it % 2 == 0 }
.subscribe(System.out::println)
}
2
4
6
8
의도대로 짝수 데이터만 발행된 것을 확인할 수 있다.
이 메소드는 일정 시간 간격으로, 최근에 Observable이 발행한 아이템들을 발행한다.
fun main() {
Observable.interval(100, TimeUnit.MILLISECONDS)
.sample(300, TimeUnit.MILLISECONDS)
.subscribe(System.out::println)
}
1
4
7
10
13
16
.
.
.
100ms
마다 데이터를 발행하는 interval(100, TimeUnit.MILLISECONDS)
연산자에, 300ms
마다 데이터를 샘플링하는 연산자 sample(300, TimeUnit.MILLISECONDS)
를 붙여 위와 같은 결과가 나오게 된다.
우리가 알고 있는 그 스킵의 개념이 맞다. 발행되는 데이터 스트림에서, 지정한 개수만큼 데이터 발행을 스킵하는 연산자이다. 마찬가지로 아래 코드를 통해 쉽게 이해할 수 있다.
fun main() {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.skip(5)
.subscribe(System.out::println)
}
6
7
8
위에서 소개한 skip()
과 반대로, 지정한 개수만큼만 데이터를 발행한다.
fun main() {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.take(5)
.subscribe(System.out::println)
}
1
2
3
4
5
위에서 소개한 take()
와 반대 개념이다. 뒤에서부터 셌을 때 몇 개의 데이터를 발행할 지 지정한다.
fun main() {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.takeLast(3)
.subscribe(System.out::println)
}
6
7
8
이번 포스팅에선, Observable 이 발행하는 데이터를 필터링하는 다양한 연산자에 대하여 알아보았다. 다음 포스팅에선 여러 개의 Observable 데이터 스트림을 결합하는 연산자에 대해 알아보려고 한다.