post-custom-banner

🔔 앞으로의 Reactive X 시리즈는 RxJava, RxKotlin 기준으로 작성됩니다
공식 문서를 참고하여 작성된 포스팅입니다.

Backpressure 전략이 필요한 이유

지난 포스팅에서 Observable 데이터 스트림에서 발생할 수 있는 '배압 현상' 이라는 것에 대해 알아보았고, 이를 제어할 수 있는 데이터 스트림인 Flowable 에 대해서도 알아보았다. 그러나, Flowable 에서도 이 배압 현상을 제어하지 못하여 오류 (MissingBackpressureException) 가 발생할 수 있는 예외상황이 존재한다고 말하며 글을 마무리 지었었다.

이를테면, Flowableinterval() 을 같이 사용하는 경우에도 오류가 발생한다. 왜냐하면 interval() 메소드는 스케줄러와 상관없이 단지 시간에 의존하여 데이터를 발행하기 때문이다.

따라서 이번 포스팅에선 이러한 예외상황을 대처할 수 있는 다양한 전략에 대해 다뤄보고자 한다.

배압 전략의 종류

BackpressureStrategy.MISSING

배압 전략을 구현하지 않는 것

BackpressureStrategy.ERROR

데이터 소비 속도가 발행 속도를 못 따라가는 경우 MissingBackpressureException 를 발생시킴

BackpressureStrategy.BUFFER

데이터 소비 속도가 발행 속도를 못 따라가는 경우, 발행된 데이터들을 구독자가 모두 소비할 때까지 버퍼에 넣어둠 (버퍼는 제한 크기가 없는 큐 형태지만, 자칫하면 OutOfMemoryException 발생 가능)

BackpressureStrategy.DROP

데이터 소비 속도가 발행 속도를 못 따라가는 경우, 발행된 데이터를 모두 갖다 버림

BackpressureStrategy.LATEST

구독자가 데이터를 모두 소비할 준비가 될 때까지 최신 데이터만 유지하고 나머지는 모두 갖다 버림

사용 방법

create() 를 통해 Flowable 을 생성하는 경우, 위에 열거한 전략 중 한 가지를 mode 파라미터에 반드시 기입해줘야 한다. 아래와 같이 말이다. 이를 기입하지 않으면 오류가 발생한다!

fun main() {
    Flowable.create({ emitter: FlowableEmitter<Int> ->
        for (i in 0..9999) emitter.onNext(i)
        emitter.onComplete()
    }, BackpressureStrategy.DROP)
        .observeOn(Schedulers.io())
        .subscribe(System.out::println)
}

배압 제어 연산자

위와 같이 굳이 create() 를 사용해야 하는 건 아니고, 다른 방법으로 생성된 Flowable 에 배압 전략을 적용할 수 있는 연산자도 3가지 제공한다.

onBackPressureBuffer()

BUFFER 전략을 적용해주는 녀석이다. 파라미터로 버퍼의 용량, 버퍼가 오버플로우 됐을 때의 동작, 버퍼 오버플로우 대응 전략 등을 전달해줄 수 있다.

아래 예제의 경우에는 128개 데이터를 수용할 수 있는 버퍼를 생성하고, 버퍼 오버플로우가 발생하게 된다면 버퍼의 가장 오래된 데이터를 버리도록 하는 전략을 지정해주었다.

fun main() {
    Flowable.range(1, 1000)
        .onBackpressureBuffer(128, {
            // 여기에 오버플로우가 발생했을 때의 동작 정의 가능
        }, BackpressureOverflowStrategy.DROP_OLDEST)
        .observeOn(Schedulers.io())
        .subscribe({
            Thread.sleep(100)
            println("Emit : $it")
        }, { it: Throwable ->
            // 에러 핸들링
        })
    Thread.sleep((100 * 10).toLong())
}
Emit : 1
Emit : 2
Emit : 3
Emit : 4
Emit : 5
Emit : 6
Emit : 7
Emit : 8
Emit : 9

실행결과는 위와 같다. 만약 Thread.sleep((100 * 1000).toLong()) 만큼 실행을 기다렸다면 1000까지 데이터를 모두 발행했을 것이다. 버퍼 오버플로우 시 DROP_OLDEST 전략을 적용했기 때문에, 데이터가 계속 최신 데이터로 갈아끼워지기 때문이다.

적용 가능한 버퍼 오버플로우 전략은 아래와 같은 것들이 있다.

  • ERROR : MissingBackpressureError() 에러 던지고 데이터 스트림 중단
  • DROP_LATEST : 버퍼의 가장 최신 데이터 삭제하고 새로운 데이터 삽입
  • DROP_OLDEST : 버퍼의 가장 오래된 데이터 제거하고 새로운 데이터 삽입

onBackpressureDrop()

해당 FlowableBackpressureStrategy.DROP 전략을 적용하는 것이다. 버퍼가 가득찼을 때, 이후 데이터를 그냥 무시해버린다.


fun main() {
    Flowable.range(1, 50_000_000)
        .onBackpressureDrop()
        .observeOn(Schedulers.io())
        .subscribe({
            Thread.sleep(100)
            println("Emit : $it")
        }, { it: Throwable ->
            // 에러 핸들링
        })

    Thread.sleep(20_000)
}
Emit : 1
Emit : 2
Emit : 3
Emit : 4
...
Emit : 124
Emit : 125
Emit : 126
Emit : 127
Emit : 128

예제 코드를 실행해보면 128 까지 데이터를 소비하고 프로그램이 종료되는 것을 확인할 수 있다. onBackpressureDrop() 메소드를 사용하게 되면, 버퍼에 총 128개의 데이터 (버퍼 용량 기본값) 가 가득찼을 때 부터 나머지 데이터는 무시하게 된다.

onBackPressureLatest()

BUFFER 전략과 DROP 전략을 섞은 기능을 수행한다. 해당 FlowableBackpressureStrategy.LATEST 전략을 적용하는 것이다.

fun main() {
    Flowable.range(1, 1000)
        .onBackpressureLatest()
        .doOnNext { println("Emit : $it") }
        .observeOn(Schedulers.io())
        .subscribe {
						Thread.sleep(100)
            println("Consumer : $it")
        }
    Thread.sleep((100 * 1000).toLong())
}
Emit : 1
Emit : 2
Emit : 3
Emit : 4
Emit : 5
Emit : 6
Consumer : 1
Emit : 7
Emit : 8
...
Emit : 127
Emit : 128
Consumer : 2
Consumer : 3
Consumer : 4
Consumer : 5
Consumer : 6
Consumer : 7
...
Consumer : 95
Consumer : 96
Emit : 1000
Consumer : 97
Consumer : 98
...
Consumer : 126
Consumer : 127
Consumer : 128
Consumer : 1000

예제 코드 실행결과를 보면, 128까지 출력 잘 하다가 갑자기 1000을 출력하는 것을 볼 수 있다. 이는 구독자의 데이터 소비 속도가 너무 느려 128개의 데이터에 대해 버퍼링을 진행했고, 구독자가 이 128개의 데이터를 모두 소비하면 가장 최신 데이터인 '1000'을 출력하는 동작을 수행했기 때문이다.


이번 포스팅에선 Flowable 에 다양한 배압 전략을 적용하는 방법에 대해 알아보았다. 상황에 맞게 적절한 배압 전략을 선택함으로써, 발생할 수 있는 여러 오류 상황에 대응할 수 있다.

Flowable 복습하기

profile
어려울수록 기본에 미치고 열광하라
post-custom-banner

0개의 댓글