RxJava Observable(Backpressure) & Flowable

heetaeheo·2022년 8월 30일
0
post-thumbnail

Observable의 경우 생산자 역할을 하고, Observer는 소비자 역할을 합니다.

Observable에서는 생산과 소비가 각각 따로 동작하기 때문에 만약 Observer의 데이터 처리속도가 느린 경우 생산은 대량으로 진행되고 소비는 천천히 일어나는 상황을 마딱드릴 수 있습니다.

따라서 생산되는 데이터를 조절하기 위해 사용하는 채널을 backpressure라고 하며 Flowable은 backpressure를 지원합니다.

Backpressure(배압) 란?

배압은 데이터의 발행과 소비의 균형이 어긋날 때 발생하는 현상을 말합니다. 만약 10,000개의 데이터를 0.1초 마다 발행하고, 소비는 10초마다 한다면 소비와 관계없이 데이터는 스트림에 계속 쌓이게 된다. Observable이 데이터를 발행하는 속도를 Observer의 소비 속도가 따라가지 못하는 것입니다. 이는 결국 메모리가 overflow되고 OutOfMemoryError로 이어져 앱이 터질 것입니다. 이러한 현상을 Backpressure(배압)이라고 하며 RxJava에서는 배압 현상을 제어할 수 있는 방법을 제공해줍니다.

Flowable

기존의 Observable이 배압 현상을 제어하지 못하는 반면, Flowable은 배압 현상을 스스로 제어할 수 있습니다. 코드로 살펴보겠습니다.

Observable

Observable.range(1, 10000)
	.doOnNext(integer -> System.out.println("Emit Data : "+ integer))
    .observeOn(Schedulers.io())
    .subscribe(integer -> {
    	System.out.println("Consume Data : "+integer);
        Thread.sleep(100);
        });
Thread.sleep(100*10000);

[실행결과]

Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Emit Data : 5
Consume Data : 1
...
Emit Data : 9998
Emit Data : 9999
Emit Data : 10000
Consume Data : 2
Consume Data : 3
Consume Data : 4
...

Flowable

Flowable.range(1, 10000)
	.doOnNext(integer -> System.out.println("Emit Data : "+integer))
    .observeOn(Schedulers.io())
    .subscribe(integer -> {
    	System.out.println("Consume Data : "+integer);
        Thread.sleep(100);
    });
Thread.sleep(100*10000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Emit Data : 5
Consume Data : 1
...
Emit Data : 126
Emit Data : 127
Emit Data : 128
Consume Data : 2
Consume Data : 3
Consume Data : 4
...

두 예제 모두 10,000개의 데이터를 발행하면서, 소비는 100ms의 delay를 두었습니다.

Observable을 사용한 경우에는 데이터 발행과 소비가 균형적으로 일어나지 않으며 데이터는 소비와 상관없이 스트림에 계속 쌓이게 됩니다.

반면 Flowable을 사용한 경우에는 데이터가 일정량 누적되면 데이터를 더 이상 발행하지 않는 것을 확인할 수 있습니다.

이와 같이 Flowable은 스트림에 끊임없이 쌓이는 데이터의 양을 제어할 수 있는 Observable의 또 다른 형태입니다.

그렇다면 이제 Flowable을 쓰면 되는건가?

정답은 아닙니다.
공식 문서에서는 목적에 맞게 적절히 선택하여 활용하라고 되어있습니다.

웬만하면 Observable 사용하라

  • 1,000개 미만의 데이터가 발행되는 경우 (즉 데이터 양이 적어 OutOfMemoryException 발생 확률이 거의 없는 경우)
  • GUI 프로그래밍 시(마우스 이벤트, 터치 이벤트 등 -> debounce() 등으로 제어 가능할 때
  • Java Stream API를 사용하지 않는 경우

Flowable을 사용하는 것보다 Observable을 사용하는 것이 오버헤드가 적기 때문에, 만약 Observable을 사용해도 되는 조건이라면 무조건 기본적으로 Observable을 사용하라고 권장합니다.

이럴 경우에 Flowable 사용하라

  • 데이터가 10,000개 이상 처리되는 경우
  • 디스크에서 파일을 읽어들이는 경우
  • JDBC를 통해 DB 쿼리 결과를 가져오는 경우
  • 네트워크 I/O 작업을 하는 경우
  • 다수의, 혹은 Non-Blocking 방식의 API를 요청하는 경우

즉, 한정적인 실행환경에서 방대한 작업량을 문제없이 수행해야 할 때 Flowable을 사용하는 것입니다.

배압 전략

Flowable에도 배압을 제어하지 못해 MissingBackpressureException이 발생할 수 있는 예외상황이 존재합니다. 따라서 Flowable에 배압 전략을 명시함으로써 배압을 제어할 수 있습니다.

  • MISSING(BackpressureStrategy.MISSING) : 배압 전략을 구현하지 않음

  • ERROR(BackpressureStrategy.ERROR) :
    소비 속도가 발행 속도를 따라가지 못하는 경우 MissingBackpressureException 발생

  • BUFFER(BackpressureStrategy.BUFFER) :
    데이터를 소비할 때까지 데이터를 버퍼에 넣어둠, 무한한 크기의 큐이지만 OOME이 발생할 수 있음

  • DROP(BackpressureStrategy.DROP) :
    소비 속도가 발행 속도를 따라가지 못하는 경우 발행된 데이터를 모두 버림

  • LATEST(BackpressureStrategy.LATEST) : 구독자가 데이터를 받을 준비가 될 때까지 최신 데이터만 유지하고 나머지는 버림

references

https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#observable-and-flowable

https://velog.io/@haero_kim/RxJava-%EB%B0%B0%EC%95%95-%ED%98%84%EC%83%81%EA%B3%BC-Flowable

https://tourspace.tistory.com/283

https://4z7l.github.io/2020/12/23/rxjava-7.html

0개의 댓글