RX JAVA - Flowable/Observable

Jang Seok Woo·2021년 7월 7일
0

실무

목록 보기
25/136

RxJava에는 Backpressure라는 개념과 이를 처리하는 Flowable class가 존재한다.
Backpressure가 무엇이고 Flowable은 어떻게 쓰는 것인지 알아보자.

배압(Backpressure)

배압이란 데이터 생산과 소비가 불균형적일 때 일어나는 현상이다. 만약 10,000개의 데이터를 0.1초마다 발행하고, 소비는 10초마다 한다면 소비와 관계없이 데이터는 스트림에 계속 쌓이게 된다. Observable이 데이터를 발행하는 속도를 Observer의 소비 속도가 따라가지 못하는 것이다. 이는 결국 메모리가 overflow되고 OutOfMemoryError로 이어져 앱이 터질 것이다. 이러한 현상을 배압(Backpressure)이라고 하며 RxJava에서는 배압 현상을 제어할 수 있는 방법을 제공한다.

Flowable

기존의 Observable이 배압 현상을 제어하지 못하는 반면, Flowable은 배압 현상을 스스로 제어할 수 있다. 다음의 두 코드를 살펴보자.

Observable

Observable.range(1, 10000)
        .doOnNext(integer -> System.out.println("Emit Data : "+integer))
        .observeOn(Schedulers.io())
        .subscribe(integer -> {
            System.out.println("Consume Data : "+integer);
            Thread.sleep(100);
        });
Thread.sleep(100*10000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Emit Data : 5
Consume Data : 1
...
Emit Data : 9998
Emit Data : 9999
Emit Data : 10000
Consume Data : 2
Consume Data : 3
Consume Data : 4
...

Flowable

Flowable.range(1, 10000)
        .doOnNext(integer -> System.out.println("Emit Data : "+integer))
        .observeOn(Schedulers.io())
        .subscribe(integer -> {
            System.out.println("Consume Data : "+integer);
            Thread.sleep(100);
        });
Thread.sleep(100*10000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Emit Data : 5
Consume Data : 1
...
Emit Data : 126
Emit Data : 127
Emit Data : 128
Consume Data : 2
Consume Data : 3
Consume Data : 4
...

두 예제 모두 10,000개의 데이터를 발행하면서, 소비는 100ms의 delay를 두었다.
Observable을 사용한 경우에는 데이터 발행과 소비가 균형적으로 일어나지 않으며 데이터는 소비와 상관없이 스트림에 계속 쌓이게 된다.
반면 Flowable을 사용한 경우에는 데이터가 일정량 누적되면 데이터를 더이상 발행하지 않는 것을 확인할 수 있다.
이와 같이, Flowable은 스트림에 끊임없이 쌓이는 데이터의 양을 제어할 수 있는 Observable의 또다른 형태이다.

When to use Observable? When to use Flowable?

Observable을 사용해야하는 경우
-1,000개 미만의 데이터 흐름이 발생하는 경우
-적은 데이터 소스만을 활용하여 OutOfMemoryException이 발생할 확률이 적은 경우
-마우스 이벤트나 터치 이벤트와 같은 GUI 프로그래밍을 하는 경우 (초당 1,000회 이하의 이벤-트는 Observable의 sample()이나 debounce()로 핸들링 가능)
-동기적인 프로그래밍이 필요하지만 플랫폼에서 Java Streams을 지원하지 않는 경우

Flowable을 사용해야하는 경우
-10,000개 이상의 데이터 흐름이 발생하는 경우
-디스크에서 파일을 읽는 경우 (기본적으로 Blocking/Pull-based 방식)
-JDBC에서 데이터베이스를 읽는 경우 (기본적으로 Blocking/Pull-based 방식)
-네트워크 IO 실행 시
-Blocking/Pull-based 방식을 사용하고 있는데 나중에 Non-Blocking 방식의 Reactive API/드라이버에서 데이터를 가져올 일이 있는 경우

OOME : Out Of Memory Exception(OOME)

예제

create() 연산자를 통해 Flowable을 생성하는 경우 배압 전략을 명시해주어야 한다.

Flowable.create(emitter -> {
    for (int i = 0; i < 10000; i++) emitter.onNext(i);
    emitter.onComplete();
}, BackpressureStrategy.DROP)
        .observeOn(Schedulers.io())
        .subscribe();

onBackPressureBuffer()

BackpressureStrategy.BUFFER 전략을 적용한다. 매개변수로 버퍼의 용량, 버퍼 overflow 발생 시의 동작 등을 함께 전달할 수 있다.

아래 그림은 리퀘스트 시점과 상관없이 이미 요청받은 데이터를 버퍼에 담아두고 하나씩 차례로 처리한다.
1,2,3,4,5 붉은 데이터를 처음에 받았고 그대로 시간이 지나도 하나씩 처리하고 있다.

onBackPressureDrop()

BackpressureStrategy.DROP 전략을 적용한다. 매개변수로 데이터를 버릴 때의 동작을 정의할 수 있다.

데이터 소비속도가 발행속도를 못따라갈 경우 발행된 데이터를 버린다.
첫 리퀘스트가 온 시점에 2,3,4,5 붉은 데이터를 버리고 새로 요청 받은 노란색 1번을 처리한다.

onBackPressureLatest()

BackpressureStrategy.LATEST 전략을 적용한다.

리퀘스트 받은 시점에 가장 최근에 요청받았던 데이터를 제외하고 모두 버린다.

예제

Flowable.range(1, 1000)
        .onBackpressureLatest()
        .doOnNext(integer -> System.out.println("Emit Data : "+integer))
        .observeOn(Schedulers.io())
        .subscribe(integer -> {
            System.out.println("Consume Data : "+integer);
            Thread.sleep(100);
        });
Thread.sleep(100*1000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Consume Data : 1
Emit Data : 5
...
Emit Data : 128
Consume Data : 2
Consume Data : 3
...
Consume Data : 95
Consume Data : 96
Emit Data : 1000
Consume Data : 97
Consume Data : 98
...
Consume Data : 128
Consume Data : 1000
profile
https://github.com/jsw4215

0개의 댓글