Observable이란 observer에 구독되며, observer는 Observable이 emit하는 item이나 sequence of item에 react한다.
이러한 패턴은 동시성
을 향상시켜준다.
Observable
== Observable Sequence
== Sequence
Observable은 일정기간동안 이벤트를 생성하며, emit
한다.
Marble Diagram
으로 이해하는 방법이 쉽다.
/// A type-erased `ObservableType`.
///
/// It represents a push style sequence.
public class Observable<Element> : ObservableType {
init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
rxAbstractMethod()
}
public func asObservable() -> Observable<Element> { self }
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
}
생성자 구현체
extension ObservableType {
/**
Creates an observable sequence from a specified subscribe method implementation.
- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.Element>) -> RxSwift.Disposable) -> RxSwift.Observable<Self.Element>
}
ReactiveX에서는 작성된코드 순서에 따라 실행이 되고 완료되지 않고, Observer에 의해 임의의 순서에 따라 병렬로 실행되며 결과는 나중에 연산된다.
즉 메서드를 호출하는 방식이 보다는 Observable 안에서 데이터를 조회하고 변환하는 메커니즘을 정의한 후, Observable이 이벤트를 발생시키는 순간 준비된 연산을 실행해 결과를 return하는 메커니즘으로 실행된다.
이는 전 글에서 살펴봤던 선언형 프로그래밍 패러다임에 부합한다.
next
이벤트를 방출할 수 있다.error
이벤트를 방출하여 스트림을 종료할 수 있다.complete
이벤트를 방출하여 스트림을 종료할 수 있다.이러한 이벤트 타입은 Enum으로 정의되어있다.
/// Represents a sequence event.
///
/// Sequence grammar:
/// **next\* (error | completed)**
@frozen public enum Event<Element> {
/// Next element is produced.
case next(Element)
/// Sequence terminated with an error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
extension Event: CustomDebugStringConvertible {
/// Description of event.
public var debugDescription: String {
switch self {
case .next(let value):
return "next(\(value))"
case .error(let error):
return "error(\(error))"
case .completed:
return "completed"
}
}
}
.next
는 Observable이 배출하는 항목(Element)을 파라미터로 전달 받는다.
.error
는 기대하는 데이터가 생성되지 않았거나 다른 이유로 오류가 발생할 경우 Swift.Error 를 전달받는다.
onError가 호출된 경우 onNext나 onComplete가 더이상 호출되지 않는다.
.complete
는 오류가 호출되지 않았을 경우 마지막 onNext를 호출 한 뒤 이벤트를 종료시킨다.
Observable에 따라 event를 emit하는 시점이 다르다.
Hot Observable은 생성되자마자 event를 emit하기도 한다.
따라서 옵저버가 구독을 하지 않더라도 계속 event가 방출되고, 옵저버가 중간에 구독하면 배출되는 중간부터 Observable을 구독할 수 있다.
Cold Observable은 옵저버가 구독할 때까지 event를 방출하지 않는다.
옵저버가 구독을 시작했을 경오 event전체를 구독할 수 있도록 보장한다.
ReactiveX의 구현 코드 중에는 “연결 가능한(Connectable)” Observable이라고 불리는 Observable 객체가 존재하는데, 이 Observable은 옵저버의 구독 여부와는 상관 없이 자신의 Connect 메서드가 호출되기 전까지 항목들을 배출하지 않는다.
ReactiveX 구현체 중에는 Subscriber라는 특별한 옵저버 인터페이스가 있는데 이 인터페이스는 unsubscribe라는 메서드를 제공한다.
현재 구독 중인 Observable 중, 옵저버가 더 이상 구독을 원하지 않는 경우에는 이 메서드를 호출해서 구독을 해지할 수 있다.
만약 더 이상 관심있는 다른 옵저버가 존재하지 않는다면 Observable들은 새로운 항목들을 배출하지 않는다.
unsubscribe는 연산자 체인
을 통해 옵저버가 구독하고 있었던 Observable들이 더 이상 항목들을 배출하지 못하도록 체인 안에 연결된 링크들을 끊어 버린다.
옵저버블은 onCompleted나 onError가 발생하면 onDisposed가 호출되며 Disposable을 방출한다. 이 값은 DisposeBag에 담아두고 언제든 dispose시킬 수 있게 준비해둔다.