🔔 앞으로의 Reactive X 시리즈는 RxJava, RxKotlin 기준으로 작성됩니다
공식 문서를 참고하여 작성된 포스팅입니다.
지난 포스팅에서 Observable
데이터 스트림에서 발생할 수 있는 '배압 현상' 이라는 것에 대해 알아보았고, 이를 제어할 수 있는 데이터 스트림인 Flowable
에 대해서도 알아보았다. 그러나, Flowable
에서도 이 배압 현상을 제어하지 못하여 오류 (MissingBackpressureException) 가 발생할 수 있는 예외상황이 존재한다고 말하며 글을 마무리 지었었다.
이를테면, Flowable
에 interval()
을 같이 사용하는 경우에도 오류가 발생한다. 왜냐하면 interval()
메소드는 스케줄러와 상관없이 단지 시간에 의존하여 데이터를 발행하기 때문이다.
따라서 이번 포스팅에선 이러한 예외상황을 대처할 수 있는 다양한 전략에 대해 다뤄보고자 한다.
BackpressureStrategy.MISSING
배압 전략을 구현하지 않는 것
BackpressureStrategy.ERROR
데이터 소비 속도가 발행 속도를 못 따라가는 경우
MissingBackpressureException
를 발생시킴
BackpressureStrategy.BUFFER
데이터 소비 속도가 발행 속도를 못 따라가는 경우, 발행된 데이터들을 구독자가 모두 소비할 때까지 버퍼에 넣어둠 (버퍼는 제한 크기가 없는 큐 형태지만, 자칫하면
OutOfMemoryException
발생 가능)
BackpressureStrategy.DROP
데이터 소비 속도가 발행 속도를 못 따라가는 경우, 발행된 데이터를 모두 갖다 버림
BackpressureStrategy.LATEST
구독자가 데이터를 모두 소비할 준비가 될 때까지 최신 데이터만 유지하고 나머지는 모두 갖다 버림
create()
를 통해 Flowable
을 생성하는 경우, 위에 열거한 전략 중 한 가지를 mode
파라미터에 반드시 기입해줘야 한다. 아래와 같이 말이다. 이를 기입하지 않으면 오류가 발생한다!
fun main() {
Flowable.create({ emitter: FlowableEmitter<Int> ->
for (i in 0..9999) emitter.onNext(i)
emitter.onComplete()
}, BackpressureStrategy.DROP)
.observeOn(Schedulers.io())
.subscribe(System.out::println)
}
위와 같이 굳이 create()
를 사용해야 하는 건 아니고, 다른 방법으로 생성된 Flowable
에 배압 전략을 적용할 수 있는 연산자도 3가지 제공한다.
onBackPressureBuffer()
BUFFER 전략을 적용해주는 녀석이다. 파라미터로 버퍼의 용량, 버퍼가 오버플로우 됐을 때의 동작, 버퍼 오버플로우 대응 전략 등을 전달해줄 수 있다.
아래 예제의 경우에는 128개 데이터를 수용할 수 있는 버퍼를 생성하고, 버퍼 오버플로우가 발생하게 된다면 버퍼의 가장 오래된 데이터를 버리도록 하는 전략을 지정해주었다.
fun main() {
Flowable.range(1, 1000)
.onBackpressureBuffer(128, {
// 여기에 오버플로우가 발생했을 때의 동작 정의 가능
}, BackpressureOverflowStrategy.DROP_OLDEST)
.observeOn(Schedulers.io())
.subscribe({
Thread.sleep(100)
println("Emit : $it")
}, { it: Throwable ->
// 에러 핸들링
})
Thread.sleep((100 * 10).toLong())
}
Emit : 1
Emit : 2
Emit : 3
Emit : 4
Emit : 5
Emit : 6
Emit : 7
Emit : 8
Emit : 9
실행결과는 위와 같다. 만약 Thread.sleep((100 * 1000).toLong())
만큼 실행을 기다렸다면 1000까지 데이터를 모두 발행했을 것이다. 버퍼 오버플로우 시 DROP_OLDEST
전략을 적용했기 때문에, 데이터가 계속 최신 데이터로 갈아끼워지기 때문이다.
적용 가능한 버퍼 오버플로우 전략은 아래와 같은 것들이 있다.
ERROR
:MissingBackpressureError()
에러 던지고 데이터 스트림 중단DROP_LATEST
: 버퍼의 가장 최신 데이터 삭제하고 새로운 데이터 삽입DROP_OLDEST
: 버퍼의 가장 오래된 데이터 제거하고 새로운 데이터 삽입
onBackpressureDrop()
해당 Flowable
에 BackpressureStrategy.DROP
전략을 적용하는 것이다. 버퍼가 가득찼을 때, 이후 데이터를 그냥 무시해버린다.
fun main() {
Flowable.range(1, 50_000_000)
.onBackpressureDrop()
.observeOn(Schedulers.io())
.subscribe({
Thread.sleep(100)
println("Emit : $it")
}, { it: Throwable ->
// 에러 핸들링
})
Thread.sleep(20_000)
}
Emit : 1
Emit : 2
Emit : 3
Emit : 4
...
Emit : 124
Emit : 125
Emit : 126
Emit : 127
Emit : 128
예제 코드를 실행해보면 128 까지 데이터를 소비하고 프로그램이 종료되는 것을 확인할 수 있다. onBackpressureDrop()
메소드를 사용하게 되면, 버퍼에 총 128개의 데이터 (버퍼 용량 기본값) 가 가득찼을 때 부터 나머지 데이터는 무시하게 된다.
onBackPressureLatest()
BUFFER 전략과 DROP 전략을 섞은 기능을 수행한다. 해당 Flowable
에 BackpressureStrategy.LATEST
전략을 적용하는 것이다.
fun main() {
Flowable.range(1, 1000)
.onBackpressureLatest()
.doOnNext { println("Emit : $it") }
.observeOn(Schedulers.io())
.subscribe {
Thread.sleep(100)
println("Consumer : $it")
}
Thread.sleep((100 * 1000).toLong())
}
Emit : 1
Emit : 2
Emit : 3
Emit : 4
Emit : 5
Emit : 6
Consumer : 1
Emit : 7
Emit : 8
...
Emit : 127
Emit : 128
Consumer : 2
Consumer : 3
Consumer : 4
Consumer : 5
Consumer : 6
Consumer : 7
...
Consumer : 95
Consumer : 96
Emit : 1000
Consumer : 97
Consumer : 98
...
Consumer : 126
Consumer : 127
Consumer : 128
Consumer : 1000
예제 코드 실행결과를 보면, 128까지 출력 잘 하다가 갑자기 1000을 출력하는 것을 볼 수 있다. 이는 구독자의 데이터 소비 속도가 너무 느려 128개의 데이터에 대해 버퍼링을 진행했고, 구독자가 이 128개의 데이터를 모두 소비하면 가장 최신 데이터인 '1000'을 출력하는 동작을 수행했기 때문이다.
이번 포스팅에선 Flowable
에 다양한 배압 전략을 적용하는 방법에 대해 알아보았다. 상황에 맞게 적절한 배압 전략을 선택함으로써, 발생할 수 있는 여러 오류 상황에 대응할 수 있다.