[안드로이드] RxKotlin(RxJava) #7 - Backpressure(배압)와 Flowable

hee09·2022년 9월 23일
0

RxKotlin

목록 보기
7/7
post-custom-banner

Backpressure와 Flowable

Observable.range(1, Int.MAX_VALUE)
    .map { item ->
        println("아이템 발행 : $item")
        item
    }
    .subscribe { item ->
        Thread.sleep(100)
        println("아이템 소비 : $item")
    }

/*
아이템 발행 : 1
아이템 소비 : 1
아이템 발행 : 2
아이템 소비 : 2
아이템 발행 : 3
아이템 소비 : 3
...
아이템 발행 : 100
아이템 소비 : 100
아이템 발행 : 101
아이템 소비 : 101
...
*/

위의 예제는 동일한 스레드로 아이템을 발행하고 소비해서 균형적으로 생산과 소비가 이루어지고 있습니다. 모든 생산과 소비가 균형적으로 이루어지면 좋지만, 현실은 그렇지 않습니다. 즉, 생산과 소비에 불균형이 발생할 수 있습니다.

생산과 소비의 균형이 맞지 않으면 문제가 발생합니다. 이러한 개념을 Backpressure라고 하고, 이를 해결하기 위해서 Flowable이 존재합니다. 이번 글에서는 이 개념들이 무엇이고 어떻게 사용하는지 알아보도록 하겠습니다.


배압(Backpressure)

배압은 데이터 생산과 소비가 불균형일 때 발생하는 현상 입니다. 예를 들어보도록 하겠습니다. 일정한 배차 간격을 갖는 버스가 있고, 이 버스틑 타려고 사람들은 정류장에 줄을 섭니다. 이때, 버스가 오지 않거나 만차라면 버스를 타려는 사람들의 대기 줄은 점점 길어집니다. 이 대기줄이 점점 길어지면 정류장에 사람이 넘칠 것입니다.

이런 현상을 Observable에 대입하여 생각해봅니다. Observable에서 100개의 데이터를 0.5초마다 발행하고, Observer는 10초마다 소비하여, 발행된 데이터를 빠르게 소비하지 못한다면 소비되지 못한 아이템들은 메모리에 누적됩니다. 결국 메모리는 overflow가 되고 OutOfMemoryError 예외를 포함한 많은 문제가 발생할 것입니다.


Flowable

배압을 직접 제어할 수도 있지만, 이미 RxKotlin(RxJava)에서는 스트림이 쌓이는 아이템의 양을 제어할 수 있는 솔루션을 제공합니다. 배압이 발생하는 코드에서 Observable과 Flowable을 사용해보도록 하겠습니다.

Observable

Observable.range(1, Int.MAX_VALUE)
    .map { item ->
        println("아이템 발행 : $item")
        item
    }
    .observeOn(Schedulers.io())
    .subscribe { item ->
        Thread.sleep(100)
        println("아이템 소비 : $item")
    }

Thread.sleep(30 * 100)

/*
...
아이템 발행 : 175797
아이템 발행 : 175798
아이템 발행 : 175799
아이템 소비 : 6
아이템 발행 : 175800
아이템 발행 : 175801
아이템 발행 : 175802
...
 */

Flowable

Flowable.range(1, Int.MAX_VALUE)
    .map { item ->
        println("아이템 발행 : $item")
        item
    }
    .observeOn(Schedulers.io())
    .subscribe { item ->
        Thread.sleep(100)
        println("아이템 소비 : $item")
    }

Thread.sleep(30 * 1000)

/*
...
아이템 발행 : 124
아이템 발행 : 125
아이템 발행 : 126
아이템 발행 : 127
아이템 발행 : 128
아이템 소비 : 1
아이템 소비 : 2
아이템 소비 : 3
아이템 소비 : 4
아이템 소비 : 5
...
 */
  • Observable과 Flowable 모두 Int의 MAX_VALUE 만큼 데이터를 발행하면서, 소비는 100ms의 delay를 주었습니다.
  • Observable을 사용한 결과를 보시면 아시겠지만 데이터 발행과 소비가 균형적으로 일어나지 않으며, Observer의 소비량과 상관없이 아이템을 계속해서 발행합니다.
  • Flowable을 사용한 경우 발행된 데이터가 일정량 누적되면 더는 아이템을 발행하지 않는 것을 확인할 수 있습니다. 즉, Flowable은 배압을 스스로 조절합니다.

Which type to use??

배압을 조절하기에 Flowable만 사용해도 될 것 같지만, Observable을 사용하는 것이 Flowable보다 일반적으로 overhead가 적습니다. 이러한 이유로 ReactiveX/RxJava/wiki에는 언제 Observable을 사용하고 언제 Flowable을 사용할지에 대한 기준이 나와있습니다.


When to use Observable

  • 최대 1000개 이하의 데이터가 흐름이 발행되는 경우 사용.
    즉, 적은 데이터를 발행하여 OOME(OutOfMemoryException)이 발생활 확률이 적은 경우.

  • 마우스 이동 또는 터치 이벤트와 같은 GUI 이벤트를 처리하는 경우(초당 1000회 이하의 이벤트는 sample()이나 debounce() 연산자로 처리 가능)

  • 데이터 흐름이 본질적으로 동기(synchronous) 방식이지만, 플랫봄이 자바 Stream을 제공하지 않는 경우 사용


When to use Flowable

  • 10,000개 이상의 데이터 흐름이 발행되는 경우

  • 디스크에서 파일을 읽는 경우(blocking이 일어나고 내가 원하는 만큼 가져오는 방식(pull-based)인 것)

  • JDBC에서 데이터베이스를 읽는 경우(blocking이 일어나고 pull-based인 것)

  • 네트워크 IO인 경우

  • 많은 blocking이 일어나고 pull-based인 데이터 소스를 non-blocking 방식으로 제공할 경우


배압 전략

Flowable.interval(10, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .map { item ->
        Thread.sleep(2000)
        println("아이템 발행 : $item")
        item
    }
    .subscribeBy(
        onNext = { item ->
            println("아이템 소비 : $item")
        },
        onError = { throwable ->
            throwable.printStackTrace()
        }
    )

Thread.sleep(30 * 1000)

/*
아이템 발행 : 0
아이템 소비 : 0
io.reactivex.rxjava3.exceptions.MissingBackpressureException
 */

interval 연산자와 Flowable을 같이 사용하면 위와 같이 문제가 발생할 수 있습니다. interval과 같은 연산자들은 스케줄러의 설정과 관계없이 시간을 기반으로 충실히 아이템을 발행합니다. 그러므로 생산하는 쪽에서 블로킹 이슈가 발생 하면 이와 같이 Exception이 발생합니다. 즉, Flowable에서 제대로 배압을 제어하지 못하는 경우에 MissingBackpressureException이 발생할 수 있는 것입니다. 이러한 이유로 Flowable에서는 배압 전략을 명시하여 배압을 제어할 수 있습니다. 종류는 다음과 같습니다.

  • BackpressureStrategy.MISSING
    • 기본적으로 배압 전략을 구현하지 않습니다.
    • 오버플로를 다루려면 배압 제어 연산자를 사용해야 합니다.
  • BackpressureStrategy.ERROR
    • 소비자가 생산자를 따라가지 못하는 경우 MissiongBackpressureException 예외를 발생시킵니다.
  • BackpressureStrategy.BUFFER
    • 구독자가 아이템을 소비할 때까지 발행한 아이템들을 버퍼에 넣어 둡니다.
    • 이 버퍼는 제한이 없는 큐지만, 가용 메모리를 벗어나는 경우 OOME를 발생시킬 수 있습니다.
  • BackpressureStrategy.DROP
    • 구독자가 아이템을 소비하느라 바빠서 생산자를 못 따라가는 경우 발행된 아이템을 모두 무시하고 버립니다.
  • BackpressureStrategy.LATEST
    • 구독자가 아이템을 받을 준비가 될 때까지 가장 최신의 발행된 아이템들만 유지하고 이전 아이템은 버립니다.

Flowable 생성과 배압 전략

Flowable.create()는 Observable.create()와 비슷한데, Flowable의 경우 위에서 살펴 본 EmitterBackpressureStrategy(배압 전략)를 설정해야 합니다.

Flowable.create<Int>({ emitter ->
    for (i in 0..10000) {
        if (emitter.isCancelled) return@create
        emitter.onNext(i)
    }
    emitter.onComplete()
}, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribeBy(
        onNext = { item ->
            println(item)
        },
        onError = { throwable ->
            throwable.printStackTrace()
        }
    )

Thread.sleep(3000)

/*
1
2
3
...
9999
10000
 */

배압 제어 연산자

이미 만들어진 Flowable에 대해 배압에 대한 전략을 설정할 수 있도록 도와주는 배압 제어 연산자를 적용할 수 있습니다.


onBackPressureBuffer 연산자

  • onBackPressureBuffer 연산자를 사용하면 배압 구현이 되지 않은 Flowable에 대해 BackpressureStrategy.BUFFER를 적용합니다.

  • 마블 다이어그램을 보면 Observable은 다양한 색의 데이터들을 발행하였습니다. 하지만 Observer가 소비하는 속도가 이를 따라가지 못하자 이를 Buffer에 담아두고 요청할 때마다 하나씩 데이터를 방출하고 있습니다.

  • onBackPressureBuffer 연산자는 다양한 매개 변수(용량, 지연, 오버플로 콜백)를 받을 수 있도록 오버로딩되어 있습니다. 아래의 예제에서는 아무런 인자도 넘기지 않는 기본 연산자를 알아보도록 하겠습니다.

Flowable.interval(10, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer()
    .observeOn(Schedulers.io())
    .map { item ->
        Thread.sleep(2000)
        println("아이템 발행 : $item")
        item
    }
    .subscribeBy(
        onNext = { item ->
            println("아이템 소비 : $item")
        },
        onError = { throwable ->
            throwable.printStackTrace()
        }
    )

Thread.sleep(30 * 1000)

/*
아이템 발행 : 0
아이템 소비 : 0
아이템 발행 : 1
아이템 소비 : 1
아이템 발행 : 2
아이템 소비 : 2
 */

onBackpressureLatest 연산자

  • onBackpressureLatest 연산자는 스트림 버퍼가 가득 차면 최신의 아이템을 버퍼에 유지하려고 오래된 아이템을 버리는 연산자입니다.

  • 마블 다이어그램을 보면 이미 버퍼가 다 찬 상태에서 1~5의 값이 들어왔을 때, 모든 아이템을 버리고 가장 최근의 아이템인 5값을 가진 데이터가 발행된 것을 확인할 수 있습니다.

Flowable.interval(10, TimeUnit.MILLISECONDS)
    .onBackpressureLatest()
    .observeOn(Schedulers.io())
    .subscribeBy(
        onNext = { item ->
            Thread.sleep(100)
            println("아이템 소비 : $item")
        },
        onError = { throwable ->
            throwable.printStackTrace()
        }
    )

Thread.sleep(30 * 1000)

/*
아이템 소비 : 0
아이템 소비 : 1
아이템 소비 : 2
...
아이템 소비 : 125
아이템 소비 : 126
아이템 소비 : 127
아이템 소비 : 983
아이템 소비 : 984
...
 */
  • 예제 코드를 보면 1부터 127까지의 값을 소비하다가 갑자기 983으로 뛰는 것을 확인할 수 있습니다. 그 이유는 소비가 너무 느려서 128개의 데이터에 대해 버퍼링을 진행했고, 구독자가 이 128개의 데이터를 모두 소비한 후에는 무시된 값들 중 가장 최신의 값인 983을 방출한 것입니다.

onBackpressureDrop 연산자

  • onBackpressureDrop 연산자는 버퍼가 가득 찬 상태에서 버퍼에 든 아이템을 소비하는 쪽이 바쁘다면 발행된 아이템을 버립니다.

  • 마블 다이어그램을 보면 빨간색 1이 들어와서 방출한 후, (2,3,4,5)가 들어왔지만 이미 버퍼가 가득 찼기에 모두 버리는 모습을 확인할 수 있습니다. 그 후 계속해서 같은 과정이 반복되는 것입니다.

Flowable.range(1, 300)
    .onBackpressureDrop { item ->
        println("아이템 버림 : $item")
    }
    .observeOn(Schedulers.io())
    .subscribeBy(
        onNext = { item ->
            Thread.sleep(10)
            println("아이템 소비 : $item")
        },
        onError = { throwable ->
            throwable.printStackTrace()
        }
    )
Thread.sleep(30 * 1000)

/*
결과
아이템 버림 : 129
아이템 버림 : 130
아이템 버림 : 131
...
아이템 소비 : 1
아이템 소비 : 2
아이템 소비 : 3
...
아이템 소비 : 127
아이템 소비 : 128
 */
  • 예제 코드를 보면 128 이후부터는 출력되지 않고 모두 아이템 버림으로 출력된 것을 확인할 수 있습니다. 그 이유는 128개가 버퍼에 모두 가득찬 후, Observer가 이를 소비하고 있을 때 129부터 300까지 모든 아이템이 버려졌기 때문입니다.

참조 및 참고
틀린 부분은 댓글로 남겨주시면 바로 수정하겠습니다..!!
2022-09-23에 작성되었습니다.

아키텍처를 알아야 앱 개발이 보인다.
RxJava Docs

profile
되새기기 위해 기록
post-custom-banner

0개의 댓글