[RxJava] Observable 필터링하기

H43RO·2021년 9월 8일
5

Reactive Programming

목록 보기
9/16
post-thumbnail
post-custom-banner

🔔 앞으로의 Reactive X 시리즈는 RxJava, RxKotlin 기준으로 작성됩니다
공식 문서를 참고하여 작성된 포스팅입니다.

이전 포스팅과 이어집니다.

Observable 필터링하기

필터링 연산자는 Observable 이 뱉는 데이터들을 어떤 기준에 의해 거른 후 발행하도록 한다. 이번 포스팅에선 Observable 필터링 연산자 중 자주 사용되는 몇 가지를 알아보며, 개념을 익혀보도록 하자.


debounce() 메소드

이 녀석은 특정 시간동안 더 이상 데이터가 발행되지 않을 때에 데이터을 발행하는 연산자이다.

예를 들어 검색창 입력필드에 사용자가 검색어를 입력할 때 검색버튼 누르지 않아도 검색이 되는 기능을 구현한다고 치자. 이 때 검색어가 변화하는 대로 계속하여 서버에 쿼리 (혹은 DB 쿼리) 를 보내게 된다면, 리소스 낭비이다. 왜냐하면 '사과' 라는 키워드를 완성하기 위해 사용자는 'ㅅ → 사 → 사ㄱ → 사고 → 사과' 의 과정을 거치기 때문에 총 5번의 쿼리를 날려버리게 된다.

따라서 사용자가 검색어를 다 입력함을 감지했을 때 쿼리를 날리면 효율적인 동작을 할 수 있다. 이럴 때 debounce() 를 사용하는 것이다. 아래 코드를 통해 쉽게 이해할 수 있다.

fun main() {
    Observable.create { emitter: ObservableEmitter<Any?> ->
        emitter.onNext("1")
        Thread.sleep(100)
        emitter.onNext("2")
        emitter.onNext("3")
        emitter.onNext("4")
        Thread.sleep(100)
        emitter.onNext("5")
        emitter.onNext("6")
        emitter.onNext("7")
        emitter.onNext("8")
        emitter.onNext("9")
        emitter.onNext("10")
        Thread.sleep(100)
    }
        .debounce(10, TimeUnit.MILLISECONDS)
        .subscribe {
            println(it)
        }
}
1
4
10

debounce() 의 파라미터로 기준 시간을 입력해주고 동작을 살펴보면, Thread.sleep(100) 을 통해 아무 데이터가 발행되지 않을 때 마지막으로 발행된 녀석들만 출력된 것을 볼 수 있다.


distinct() 메소드

간단히 말해서, 중복된 데이터를 발행하지 않는 필터링 연산자이다.

fun main() {
    Observable.just("A", "B", "B", "B", "B", "A", "C")
        .distinct()
        .subscribe(System.out::println)
}
A
B
C

elementAt() 메소드

발행되는 데이터 스트림에서 특정 인덱스에 해당하는 데이터만 발행해주는 녀석이다.

fun main() {
    Observable.just("A", "B", "B", "A", "C", "D")
        .elementAt(4)
        .subscribe(System.out::println)
}
C

4번째 인덱스에 위치한 'C' 가 발행된 모습이다.


filter() 메소드

발행되는 데이터 스트림에서 특정 연산식의 조건에 부합 ( true ) 하는 녀석들만 발행해주는 연산자이다. Observable 필터링 연산자 중 가장 베이직(?) 하다고 할 수 있다.

fun main() {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
        .filter { it % 2 == 0 }
        .subscribe(System.out::println)
}
2
4
6
8

의도대로 짝수 데이터만 발행된 것을 확인할 수 있다.


sample() 메소드

이 메소드는 일정 시간 간격으로, 최근에 Observable이 발행한 아이템들을 발행한다.

fun main() {
    Observable.interval(100, TimeUnit.MILLISECONDS)
        .sample(300, TimeUnit.MILLISECONDS)
        .subscribe(System.out::println)
}
1
4
7
10
13
16
.
.
.

100ms 마다 데이터를 발행하는 interval(100, TimeUnit.MILLISECONDS) 연산자에, 300ms 마다 데이터를 샘플링하는 연산자 sample(300, TimeUnit.MILLISECONDS) 를 붙여 위와 같은 결과가 나오게 된다.


skip() 메소드

우리가 알고 있는 그 스킵의 개념이 맞다. 발행되는 데이터 스트림에서, 지정한 개수만큼 데이터 발행을 스킵하는 연산자이다. 마찬가지로 아래 코드를 통해 쉽게 이해할 수 있다.

fun main() {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
        .skip(5)
        .subscribe(System.out::println)
}
6
7
8

take() 메소드

위에서 소개한 skip() 과 반대로, 지정한 개수만큼만 데이터를 발행한다.

fun main() {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
        .take(5)
        .subscribe(System.out::println)
}
1
2
3
4
5

takeLast() 메소드

위에서 소개한 take() 와 반대 개념이다. 뒤에서부터 셌을 때 몇 개의 데이터를 발행할 지 지정한다.

fun main() {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
        .takeLast(3)
        .subscribe(System.out::println)
}
6
7
8


이번 포스팅에선, Observable 이 발행하는 데이터를 필터링하는 다양한 연산자에 대하여 알아보았다. 다음 포스팅에선 여러 개의 Observable 데이터 스트림을 결합하는 연산자에 대해 알아보려고 한다.

profile
어려울수록 기본에 미치고 열광하라
post-custom-banner

0개의 댓글