Rx Observable -2-

cloud0·2021년 9월 10일
0
post-thumbnail

Observable이란


ReactiveX에서 옵저버는 Observable을 구독한다. Observable이 배출하는 하나 또는 연속된 항목에 옵저버는 반응한다. 이러한 패턴은 동시성 연산을 가능하게 한다. 그 이유는 Observable이 객체(데이터) 를 배출할 때까지 기다릴 필요 없이 어떤 객체(데이터) 가 배출되면 그 시점을 감시하는 관찰자를 옵저버 안에 두고 그 관찰자를 통해 배출 알림을 받으면 되기 때문이다.

이 패턴을 Observer Pattern 이라고 하며, Reactive Programming은 이 Observer Pattern에 기반을 둔다.

Observable Marble diagram

ReactiveX는 기본적으로 비동기와 병렬로 메소드를 호출하지만, Reactive가 아닌 일반적인 메서드 호출은 대게 다음과 같은 흐름으로 진행된다.

  1. 메서드를 호출한다.
  2. 메서드가 리턴한 값을 변수에 저장한다.
  3. 결과 값을 가진 변수를 통해 필요한 연산을 처리한다.

또는, 이렇게 표현하기도 한다.

  1. 메서드를 호출하고, 리턴 값을 'returnVal'에 할당한다.
    returnVal = someMethod(itsParameters)
  1. returnVal을 통해 필요한 작업을 진행한다.

비동기 모델에서는 아래와 같은 흐름대로 코드가 실행된다.

1.비동기 메소드 호출로 결과를 리턴받고 필요한 동작을처리하는 메서드를 정의한다.(이 메서드는 옵저버의 일부가 된다.)
2. Observabel로 비동기 호출을 정의한다.
3. 구독을 통해 옵저버를 Observable 객체에 연결 시킨다.(또한, 동시에 Observable의 동작을 초기화 한다.)
4.필요한 코드를 계속 구현한다. 메소드 호출로 결과가 리턴될 떄마다 옵저버의 메서드는 리턴 값 또는 (Observable이 배출하는) 항목들을 사용해서 연산을 시작한다.

// 옵저버의 onNext 핸들러를 정의한다, 하지만 실행하지는 않는다
// (이 예제에서는, 단순히 옵저버에 onNext 핸들러만 구현한다.)
def myOnNext = {it -> /*필요한연산을 처리한다. */}
// Observable을 정의하지만, 역시 실행하지는 않는다
def myObservable = someObserbable(itsParameters)
//옵저버가 Observable을 구독한다. 그리고 Observable을 실행한다.
myObservable.subscribe(myOnNext)
//필요한 코드를 구현한다.

Observable 구조


1. Observable이 데이터 스트림을 처리하고, 완료되면 데이터를 발행(emit)한다.
2. 데이터를 발행할 떄만다 구독하고 있는 모든 Observer가 알림을 받는다.
3. Observer는 수신한 데이터를 가지고 어떠한 일을 한다.

Subscribe


observable이 데이터를 발행하고 Event을 보내면 Observer는 Observable을 구독(Subscribe) 해 데이터를 소비(Consume) 한다. 실제로는 Observable이 데이터 흐름ㅇ르 정의하고 알림을 보낸 뒤 Observer가 Subscribe를 해야 데이터가 발행되고 소비된다.

Observable의 데이터 발행(onNext, onCompleted, onError)

Subscribe 메서드를 통해 옵저버와 Observable을 연결한다.

  • onNext
    데이터의 발행을 알림 이 메서드는 Observable이 배출하는 항목을 파라미터로 전달 받는다.
  • onError
    오류가 발생했을음 알림, onNextonCompleted는 더이상 호출이 되지 않는다. onError 메서드는 오류 정보를 저장하고 있는 객체를 파라미터로 전달 받는다.
  • onCompleted
    모든 데이터의 발행이 완료되었음을 알림, onNext를 호출한 후 이 메서드를 호출한다.(딱한번 발생 이후에 onNext가 발생하면 안됨)

Observer의 Subscribe

Subscribe이란 단순하게 수신한 데이터를 가지고 할 행동을 정의하는 것이다.

Disposable

dispose()는 Observable에게 더이상 데이터를 발행하지 않도록 구독을 해지하는 함수이다.

Observable 계약(Observable Contract)에 따르면 Observable이 onComplete 알림을 보냈을 때 자동으로 dispose()를 호출해 Observable과 구독자의 관계를 끊는다.

따라서 onComplete 이벤트가 정상적으로 발생했다면 구독자가 별도로 dispose()를 호출할 필요는 없다.

Observable의 특징


  • Observables Are Composable
    Java의 Futures는 비동기 실행을 도와준다. 하지만 여러 제약사항이 존재하여 적절한 흐름을 구성하기 어렵다. 이에 비해 Observable은 비동기 데이터의 적절한 흐름과 순서를 구성할 수 있다.
  • Observables Are Flexible
    Observable은 단순한 scalar값 뿐만 아니라 무한한 스트림을 차례로 발행할 수 있다. 또한 어떠한 유스케이스에서도 사용할 수 있다.
  • Observables Are Less Opinionated
    Observable은 데이터 소스가 Concurrency이던 Asynchronicity이던 상관이없다.Observable은 Thread-pool,Event Loop, Non-Blocking I/O등 다양한 방식을 통해 구현될 수 있다.

Observable 종류


RxJava2와 RxJava3에는 데이터 소스를 나타내는 5가지의 기본 클래스가 있다.

  • Observable : 가장 기본적인 형태,0개 ~ N개의 데이터 발행, BackPressure없음
  • Single : 단한개의 데이터, 혹은 오류 발행
  • Completable : 성공 혹은 실패했다는 결과만 발행
  • Maybe : 0개 또는 1개 완료,오류
  • Flowable :0개~N개의 데이터 발행,BackPressure존재

REFERENCE

profile
이...사...중......

0개의 댓글