RxJava 입문하기 (Kotlin) - 기본2

CmplxN·2020년 7월 31일
1

지난 시간에 이어 RxJava의 연산자에 대해 알아보자.
kotlin의 collection 함수들을 써봤다면 익숙한 함수들을 많이 만날 수 있다.

생성 연산자

create

  • 함수 내부에서 emitter가 직접 onNext, onComplete등으로 데이터를 전달하는 연산자
Observable.create<String> { emitter ->
        emitter.onNext("Hello")
        emitter.onNext("RxJava")
        emitter.onComplete()
    }.subscribe { println(it) }

defer

  • ObservableSource를 리턴하는 Callable을 받는 연산자.
Observable.defer {
    Observable.create<String> { emitter ->
        emitter.onComplete()
    }
 }.subscribe(::println)

from

  • Array, Iterable, Callable로부터 Observable을 만드는 연산자
val items = arrayOf("Hello", "World")
    Observable.fromArray(*items).subscribe(::println)

interval

  • 주어진 주기대로 0부터 1씩 증가된 값을 만드는 연산자
Observable.interval(100, TimeUnit.MILLISECONDS)
        .subscribe(::println)
Thread.sleep(555)

just

  • 최대 10개의 데이터를 전달하는 연산자
Observable.just(1, 2, 3).subscribe(::println)

range

  • range(start, cnt) : start부터 cnt만큼 1씩 증가한 데이터를 전달하는 연산자
Observable.range(3, 2).subscribe(::println)

repeat

  • Observable을 지정한 횟수만큼 반복시키는 연산자 (subscribe 포함)
val observable = Observable.just("Hello", "World")
    .repeat(2)
observable.subscribe(::println)

timer

  • 정해진 시간 후 0을 전달하는 Observable을 반환
println("start : ${System.currentTimeMillis()}")
Observable.timer(1000, TimeUnit.MILLISECONDS)
    .subscribe {
        println("start : ${System.currentTimeMillis()} + $it")
    }

변환 연산자

buffer

  • buffer(count: Int, skip: Int) 형태로 count만큼 데이터가 모이면 한번에 전달한다.
  • count까지 포함해서 skip만큼의 data는 버린다. (count < skip이면 차이만큼 데이터를 무시)
  • 마지막 남는 데이터는 count만큼 차지 않아도 전달한다.
bservable.fromIterable(0..8)
    .buffer(2,4)
    .subscribe(::println)

map

  • 데이터를 변환하는 연산자
Observable.fromIterable(0..3)
    .map { "RxJava : $it" }
    .subscribe(::println)

xxxMap

  • 공통 : Observable을 받아 새로운 Observable을 만드는 연산자
  • flatMap : 데이터를 병렬적으로 처리
  • concatMap : 데이터를 직렬적으로 처리
  • switchMap : 중간에 데이터가 들어오면 무시
  • 아래 예제를 실행해보면 어떤 느낌인지 알 수 있을 것이다.
Observable.fromIterable(listOf(1, 2, 3, 4, 5, 6))
    .xxxMap { original: Int ->
        Observable.just("$original plusplus")
            .delay(Random.nextLong(5), TimeUnit.SECONDS)
    }
    .subscribe(::println)

flatMap : 랜덤한 순서 출력
concatMap : 순서대로 출력
switchMap : 6 출력

scan

  • 이전 데이터와 현재 데이터를 조합하여 데이터를 전달하는 연산자.
  • 첫 데이터는 그대로 전달한다. reduce로 이해하면 된다.
Observable.fromIterable(0..3)
    .scan { t1, t2 -> t1 + t2 }
    .subscribe(::println)

필터링 연산자

debounce

  • 일정시간 다른 아이템이 생성되지 않으면 데이터를 전달하는 연산자
Observable.interval(250,TimeUnit.MILLISECONDS)
    .debounce(333, TimeUnit.MILLISECONDS)
    .subscribe(::println) // 출력 없음
Thread.sleep(1000)

distinct / distinctUntilChange

  • distinct : 중복되지 않는 데이터만 전달
  • distinctUntilChange : 연속하여 같은 값은 전달하지 않음
data class T(val a: Int, val b: Int)
Observable.just(T(2, 3), T(2, 4), T(1, 5), T(7, -1), T(2, 2))
    .distinct { it.a + it.b } // key selector
    .subscribe(::println)

filter / ofType

  • filter : 특정 조건에 맞는 데이터만 전달
  • ofType : 특정 타입에 맞는 데이터만 전달. 전달시 typecasting이 되어있음
Observable.just(11,true,"Hello","Rx",false)
    .ofType(String::class.java)
    .filter { it.length == 2 }
    .subscribe(::println)

ignoreElements

  • 모든 데이터를 무시하고 Completable을 리턴 (데이터 받긴 하지만 성공여부만 알고싶음)
Observable.just(6,4,2)
    .ignoreElements()
    .subscribe(::println)

throttleFirst / throttleLast

  • 특정 시간동안 들어오는 데이터 중 첫번째 / 마지막 데이터만 전달
Observable.interval(0, 100, TimeUnit.MILLISECONDS)
    .throttleLast(250, TimeUnit.MILLISECONDS)
    .subscribe {
        println(System.currentTimeMillis())
        println(it)
    }
Thread.sleep(1000)

skip / take

  • skip : 앞부터 n개의 데이터를 생략하고 전달
  • take : 앞부터 n개의 데이터만 전달
  • last, while, until 등 응용할 수 있다.
Observable.just(1,2,3,4,5,6,7,8,9,10)
    .take(9)
    .skip(2)
    .skipLast(2)
    .takeLast(3)
    .subscribe(::println)

결합 연산자

combineLatest

  • 각각 Observable에 데이터 생성될 때 데이터를 조합해서 전달하는 연산자
val observable1 = Observable.interval(1000L, TimeUnit.MILLISECONDS)
val observable2 = Observable.interval(750L, TimeUnit.MILLISECONDS).map { it + 1000 }
Observable.combineLatest<Long, Long, String>(
    observable1, observable2,
    BiFunction { t1, t2 -> // RxKotlin은 BiFunction없이 람다를 쓸 수 있다.
        "$t1 $t2"
    }
).subscribe {
    println(System.currentTimeMillis())
    println(it)
}
Thread.sleep(3000)

merge

  • 각각 Observable을 단순히 합치는 연산자
  • mergeWith으로 이어 붙일 수도 있다.
val observable1 = Observable.interval(0, 1000, TimeUnit.MILLISECONDS).map { "1:$it" }
val observable2 = Observable.interval(0, 500, TimeUnit.MILLISECONDS).map { "2:$it" }
val observable = Observable.merge(observable1, observable2)
observable.subscribe {
    println(it)
}

zip

  • Observable에서 생성한 데이터 순서에 맞게 조합하는 연산자
val observable1 = Observable.just(1, 2, 3, 4, 5, 6, 7)
val observable2 = Observable.just("a", "b", "c", "d", "e", "f")
val observable = Observable.zip(
    observable1, observable2,
    BiFunction<Int, String, String> { t1, t2 -> // RxKotlin시 람다로 가능
        "$t1 and $t2"
    })
observable.subscribe(::println)

startWith

  • Observable에 첫번째 데이터를 추가한다.
Observable.just(1,2,3,4,5)
    .startWith(500)
    .subscribe(::println)

기타 유용한 것

delay

  • 정해진 시간만큼 데이터를 늦게 전달하는 연산자
println(System.currentTimeMillis())
Observable.just("Hello", "World")
    .delay(1000, TimeUnit.MILLISECONDS)
    .subscribe {
        println(System.currentTimeMillis())
    }
Thread.sleep(1111)

do

  • do : doOnXX와 doAfterXX로 되어있으며 XX 전/후의 동작을 정한다.
  • 디버깅하거나 stream중에 intercept할 때 쓰면 된다.
Observable.just("Hello", "RxJava")
    .doOnNext { println("doOnNext") }
    .doOnSubscribe { println("doOnSubscribe") }
    .doAfterNext { println("doAfterNext") }
    .doAfterTerminate { println("doAfterTerminate") }
    .doOnEach { println("doOnEach") } // 모든 이벤트에 걸림 complete까지
    .doFinally { println("Finally") }
    .doOnComplete { println("doOnComplete") }
    .doOnDispose { println("doOnDispose") }
    .doOnError { println("doOnError") }
    .doOnTerminate { println("doOnTerminate") }
    .subscribe { println("- $it") }

observeOn / subscribeOn

  • subscribeOn : 구독할 스케쥴러를 결정
  • observeOn : onNext, onError 등의 동작을 실행할 스케쥴러 설정. 체이닝하여 바꿀 수 있다.
Observable.just(1,2)
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread()) // Android용
    .subscribe(::println)

timeout

  • 일정시간 데이터를 전달하지 못하면 Exception을 onError로 전달하게 하는 연산자
Observable.just("Hello", "World")
    .delay(3000L, TimeUnit.MILLISECONDS)
    .timeout(2000L, TimeUnit.MILLISECONDS)
    .subscribe((::println), (::println))
Thread.sleep(4000)

retry

  • onError 발생시 retry할 수 있는 연산자. retryWhen, retryUntil 등으로 상세한 조건을 줄 수 있다.
Observable.just(1, 2, 3, 4, 5)
    .map { if (it > 4) throw IOException() else it}
    .retry { cnt, t2 ->
        cnt <= 1
    }
    .subscribe((::println), (::println))

andThen

  • Completable들을 체이닝하여 하나로 만들어주는 연산자
Completable.complete()
    .andThen(Completable.complete())
    .subscribe { println("Everything Success") }

마무리

글 두개로 RxJava의 기본 of 기본을 알아봤다. 공식문서, 홈페이지에서 마블 다이어그램과 함께 더 좋은 설명과 다양한 연산자들을 배울 수 있다.
필자도 이제 입문하는 입장이므로, 새로운 사용 예시를 알게될 때 마다 이곳에 정리해볼 생각이다.

profile
Android Developer

1개의 댓글

comment-user-thumbnail
2021년 8월 25일

잘 정리해주셔서 감사합니다 많은 도움이 됐습니다

답글 달기