[RxJava] RxJava Observable의 정의, 형태, 생성

minnie·2022년 2월 17일
0

RxJava

목록 보기
2/8
post-thumbnail

📝RxJava의 학습 순서
1. Observable 클래스를 명확하게 이해(특히 Hot Observable과 Cold Observable의 개념을 꼭 이해해야함)
2. 변환, 제어, 결합 연산자 등 카테고리별 주요 함수 공부3. 스케줄러의 의미, subscribeOn()과 observeOn()함수의 차이
4. 그 밖의 디버깅, 흐름 제어함수


1. Observable이란?

Observable의 사전 뜻을 찾아보면 관찰할 수 있는, 식별 가능한 이러한 의미를 가지고 있다.
즉, Observable은 데이터의 흐름에 맞게 알림을 보내 observable을 구독하는 Observer가 데이터를 사용할 수 있도록 한다.
이러한 패턴을 Observer Pattern이라고 하며, Reactive Programming은 이 패턴에 기반을 둔다.

Disposable
Disposable이란 사전적 정의로는 사용 후 버리게 되어있는, 일회용의 라는 의미로 1회용 리소스를 나타낸다.

  • void dispose() : 리소스를 삭제한다.
  • boolean isDisposed() : 이 리소스가 삭제된 경우 true를 리턴

만약 Observable이 발행하는 아이템 개수가 정해져 있다면 모두 발행된 후 onComplete()가 호출되고 안전하게 종료될 것이다. 하지만 아이템을 무한 발행하거나 오래 실행되는 Observable의 경우, 제대로 종료하지 않으면 메모리 누수가 발생할 수 있다. 더 이상 Observable의 구독이 필요없을 때는 이를 폐기(dispose)하는 게 효율적이다. Disposable.dispose()를 호출해 언제든 아이템 발행을 중단할 수 있다.

🧶 Observable 동작 순서

  1. Observable이 데이터 스트림을 처리하고, 완료되면 데이터를 발행(emit)한다.
  2. 데이터를 발행할 때마다 구독하고 있는 모든 Observer가 알림을 받는다.
  3. Observer는 수신한 데이터를 가지고 어떠한 일을 한다.

Observable이 데이터를 발행하고 알림(Event)를 보내면 Observer는 Observable을 구독(Subscribe)해 데이터를 소비(Consume)한다.

🧶 Observable Event

Observable이 데이터를 발행 한 후 보내는 알림에는 3가지 Event를 사용하여 동작한다.
Emitter라는 인터페이스에 의해 아래 이벤트들이 선언된다.

  • onNext : 한 번에 하나씩 순차적으로 데이터를 발행한다.
  • onComplete : 데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 onNext()를 더 호출하지 않음을 나타낸다.
  • onError : 오류가 발생했음을 Observer에 전달한다. onError 발생 이후에는 onNext와 onComplete가 발생하지 않는다

데이터나 오류 내용을 발행 할 때 null은 발행할 수 없다.
만약 아무런 데이터를 발행하지 않는 빈 Observable을 만들고 싶다면 Observable.empty() 연산자를 사용하면 된다.

🧶 Observable Subscribe

구독(Subscribe)란 단순하게 수신한 데이터를 가지고 할 행동을 정의하는 것이다.
Observer는 subscribe()메소드에서 수신한 각각의 알림에 대해 실행할 내용을 지정한다.

Disposable class는 구독의 정상적인 해지를 돕는다.
onComplete 이벤트가 발생하면 dispose()를 호출해 Observable이 더 이상 데이터를 발행하지 않도록 구독을 해지한다. 또한 isDisposed()를 통해 구독이 해지되었는지 확인할 수 있다.


2. Observable 종류

다양한 스트림을 처리하기에 Observable 하나만으로는 부족하다는 의견이 있어서 Rx2에서는 다양한 데이터 스트림에 따라 사용할 수 있는 Observable들이 늘어나게 되었다.
Observable 스트림 외에도 Single, Maybe, Completable,Flowable 처럼 특별한 스트림이 있다.

2-1.Single

Single은 단 하나의 아이템만을 발행할 수 있다. create()를 사용하는 경우 Emitter를 사용해 데이터를 발행한다. 데이터를 한 번만 발행하기 때문에 onNext(), onComplete() 대신 onSuccess()를 사용해 데이터 발행이 완료됨을 알려준다. 오류 처리는 Observable의 Emitter와 동일하게 onError()를 이용해 구독자에게 알려준다. 주로 한 번의 데이터만 발행하는 API 통신, Client 요청에 대응하는 서버의 응답에 많이 사용한다.

// Single.just()
Single.just("Single Test - Single.just()")
        .subscribe(System.out::println);

// Single.create()
Single<String> createdSingle = Single.create(new SingleOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull SingleEmitter<String> emitter) {
        emitter.onSuccess("Single Test - Single.create()");
    }
});
createdSingle.subscribe(System.out::println);
Single Test - Single.just()
	Single Test - Single.create()

Observable을 Single로 변환할 수 있다. 단, 여러 데이터를 발행하는 Observable을 Single로 변환하면 에러가 발생할수 있다.

// Observable → Single 변환
// fromObservable() 사용
Observable<String> observable = Observable.just("Single Test : from.observable()");
Single.fromObservable(observable)
        .subscribe(System.out::println);

// Observable → Single 변환
// single() 사용
Observable.just("Single Test : just()")
        .single("Single Test : default Value") //default value
        .subscribe(System.out::println);

2-2.Maybe

Maybe는 Single과 비슷하게 최대 하나의 데이터를 가질 수 있지만, 아이템을 발행하거나 발행하지 않을 수도 있다는 점에서 차이가 있다. (Single은 1개 완료, Maybe는 0 or 1개 완료) 그래서 아이템을 발행했을 때에는 onSuccess()를 호출하고, 발행하지 않을 때에는 onComplete()를 호출한다. onSuccess()이후에 다시 onComplete()를 호출할 필요는 없다.

Maybe.just("Hello World")
        .delay(10, TimeUnit.SECONDS, Schedulers.io())
        .subscribeWith(new DisposableMaybeObserver<String>() {
            @Override public void onSuccess(String value) {
                // 성공하여 값을 발생시켰음
                System.out.println("Maybe Test >>> Success: " + value);
            }
            @Override public void onError(Throwable error) {
                // 에러
                error.printStackTrace();
            }
            @Override public void onComplete() {
                // 성공했지만 값이 없을 때
                System.out.println("Maybe Test >>> complete : Null");
            }
        });

2-3.Completable

Completable은 데이터를 발행하는 Observable, Single, Maybe와 달리 아이템을 발행하지 않고, 정상적으로 실행이 종료되었는지에 대해 확인할 때 사용한다. 아이템 발행을 하지 않기 때문에 onNext(), onSuccess()는 쓰지 않고 onComplete()와 onError()만을 사용한다.
Room 데이터 베이스 사용에서와 같이 백그라운드에서 동작해야 되는 함수가 필요한 경우, 서버에 객체를 업데이트하는 요청을 보낸 경우 사용할 수 있다.

// Completable
Completable.create(emitter ->{
    emitter.onComplete();
}).subscribe(()->System.out.println("Completable Test >>> completed 1"));

Completable.fromRunnable(() ->{
}).subscribe(() -> System.out.println("Completable Test >>> completed 2"));

2-4.Flowable

Flowable은 연속적으로 데이터가 흐르는 스트림을 생성한다. 데이터가 흐름에 따라 화면을 갱신시키는 코드에 사용될 수 있다. 통신, DB, 대량의 데이터 처리에 Flowable을 사용하는 것을권장한다. 0개~N개의 데이터를 발행할 수 있다는 점에서 Observable과 유사하지만, Observable은 Backpressure이 없는 반면에 Flowable은 Backpressure가 존재한다.

BackPressure란 배압이라는 뜻으로 데이터 생산과 소비가 불균형적일 때 일어나는 현상이다.
만약 10,000개의 데이터를 0.1초마다 발행하고, 소비는 10초마다 한다면 소비와 관계없이 데이터는 스트림에 계속 쌓이게 된다.

Observable이 데이터를 발행하는 속도를 Observable의 소비 속도가 따라가지 못하는 것이다.
이는 결국 메모리가 overflow되고 OutOfMemoryError로 이어져 앱이 터질 것이다.
이러한 현상을 배압(Backpressure)이라고 하며 RxJava에서는 배압 현상을 제어할 수 있는 방법을 제공한다.

🎈 Observable을 사용해야하는 경우

  • 1,000개 미만의 데이터 흐름이 발생하는 경우
  • 적은 데이터 소스만을 활용하여 OutOfMemoryException이 발생할 확률이 적은 경우
  • 마우스 이벤트나 터치 이벤트와 같은 GUI 프로그래밍을 하는 경우 (초당 1,000회 이하의 이벤트는 Observable의 sample()이나 debounce()로 핸들링 가능)
  • 동기적인 프로그래밍이 필요하지만 플랫폼에서 Java Streams을 지원하지 않는 경우

🎈Flowable을 사용해야하는 경우

  • 10,000개 이상의 데이터 흐름이 발생하는 경우
  • 디스크에서 파일을 읽는 경우 (기본적으로 Blocking/Pull-based 방식)
  • JDBC에서 데이터베이스를 읽는 경우 (기본적으로 Blocking/Pull-based 방식)
  • 네트워크 IO 실행 시
  • Blocking/Pull-based 방식을 사용하고 있는데 나중에 Non-Blocking 방식의 Reactive API/드라이버에서 데이터를 가져올 일이 있는 경우

3. Observable 생성 연산자

RxJava에서는 연산자(Operator)를 통해 기존 데이터를 참조, 변형하여 Observable을 생성할 수 있다.

3-1.create()

create를 사용하면 Emitter를 이용하여 직접 아이템을 발행하고, 아이템 발행의 완료나 오류(Complete/Error)의 알림을 직접 설정할 수 있다.
해당 Observable을 구독하기 위해서 subscribe()를 호출해서 Observer나 Consumer를 추가해준다. 그리고 아이템의 발행이 끝났다면 반드시 onComplete()를 호출해야 한다.
onComplete가 호출된 후라면 아이템이 더 발행되더라도 구독자는 데이터를 받지 못한다.

private void createObservable(){
	source = Observable.create(emitter -> {
         emitter.onNext("Hello");
         emitter.onNext("Hi");
         emitter.onComplete();
    });
    source.subscribe(System.out::println);
}

실제로 create()연산자는 개발자가 직접 Emitter를 제어하기 때문에 주의해서 사용해야 한다.
예를 들어 Observable을 더 이상 사용하지 않을 때에는 등록된 Callback을 모두 해제하지 않으면 메모리 릭이 발생하고, BackPressure를 직접처리해야한다.

3-2. just()

just()는 해당 아이템을 그대로 발행하는 Observable을 생성해준다. just()연산자의 인자로 넣은 아이템을 차례로 발행하며, 한 개의 아이템을 넣을 수도 있고, 타입이 같은 여러 아이템을 넣을 수도 있다.

private void justObservable(){
	Observable<String> source = Observable.just("Hello","Hi");
    source.subscribe(System.out::println);
}

3-3. fromXXX()

여러 데이터를 다뤄야 하는 경우 사용한다. 정의된 메소드의 종류는 다음과 같으며 특정 타입의 데이터를 Observable로 바꿔주는 메소드이다. Array, Iterable, Single, Maybe...등 다양한 특정 타입의 데이터로 바꿀 수 있다.

3-4. 그 외

이 외에도 다른 생성 연산자들이 많다.

Method설명
Interval시간 간격을 두고 데이터를 발행하는 Observable 생성
Range특정 범위 내 Integer 형태의 아이템을 발행하는 Observable 생성
Repeat아이템을 지정한 횟수만큼, 혹은 무한히 반복하여 발행
Start연산 후 특정 값을 반환, 함수처럼 작용함
Timer지정한 시간 delay 이후 아이템 발행

참고 :https://blog.yena.io/studynote/2020/10/23/Android-RxJava(2).html
https://4z7l.github.io/2020/12/23/rxjava-7.html
https://velog.io/@ryalya/Android-RxJava-4-Single-Maybe-Completable

profile
Android Developer

0개의 댓글