[Rxswift] 2교시 - Subscribe에 대해서

어흥·2024년 6월 1일

Swift

목록 보기
18/28

복습

Rxswift는 비동기 프로그래밍을 관찰 가능한 순차적 형태와 함수 형태의 연산자를 통해 처리하게끔 도와줍니다.

  • 관찰 가능한 순차적 형태: Observable
  • 이벤트를 관찰하여 전파받는 역할: Observer

ReactiveX에서 observer는 observable 을 구독한다. observer는 observable이 방출하는 모든 아이템(들)에 대해 반응한다.

서론

  1. create : Observable 생성하기

    public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.Element>) 
    -> RxSwift.Disposable) -> RxSwift.Observable<Self.Element>
  2. subscribe

    observable.subscribe(onNext: { (element) in // 
            print(element)
      })
❓ 그럼 Observer은 어디에?

답은 subscribe 에 있습니다.

✏️ subscribe

Subscribe operate upon the emissions and notifications from an Observable

The Subscribe operator is the glue that connects an observer to an Observable. In order for an observer to see the items being emitted by an Observable, or to receive error or completed notifications from the Observable, it must first subscribe to that Observable with this operator.

Subscribe는 Observable로부터 발생한 notification과 emission에 따라 동작한다.

Subscribe는 Observable에 Observer를 연결하는 접착제다. Observer가 Observable로 부터 방출된 아이템이나 에러, 완료 알림을 받기 위해서는 반드시 처음에 이 연산자를 Observable에 사용해 구독해야만 가능하다.

1️⃣ subscribe in [Observable.swift]

  • 파라미터로 Observer를 받고 있습니다.

But, 보통 사용하는 subscribe

  • 파라미터로 onNext, onError 클로저를 전달했었습니다.
_ = downloadJson(MEMBER_LIST_URL)
            .subscribe(onNext: {print($0)}, onError: {err in print(err)}, onCompleted: {print("com")})
⁉️ Observable.swift 파일에서는 `subscribe` 메서드 파라미터로 Observer를 전달했는데, 위에서 사용하는 메서드에서는 Observer가 어디있지?

2️⃣ subscribe in [ObservableType+Extensions.swift]

  • ObservableTypeObservable 클래스가 따르는 프로토콜
  • onNext, onError, onCompleted, onDisposed 를 파라미터로 받습니다. (클로저 형태)
  • 반환 타입은 Disposable입니다. Disposable 을 반환하는 이유는 Observable 이 관찰가능한 형태이기 때문에 생성해서 구독하게 되면 구독을 해제할 수 있어야 합니다. (즉, 더이상 Observable 에 대해서 이벤트를 받고 싶지 않을 때 사용)
public func subscribe(
        onNext: ((Element) -> Void)? = nil,
        onError: ((Swift.Error) -> Void)? = nil,
        onCompleted: (() -> Void)? = nil,
        onDisposed: (() -> Void)? = nil
    ) -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            // *************  중요 코드 1 *************
            let observer = AnonymousObserver<Element> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            // ************* 중요 코드 2  *************
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }

1. Observer 자체적으로 생성

subscribe(onNext:...) 메서드 내부에서 AnonymousObserver를 자체적으로 생성해서 구독합니다.

let observer = AnonymousObserver<Element> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }

AnonymousObserver 구성

  1. 생성자

    eventHandler 프로퍼티에 파라미터로 전달된 클로저( switch 문을 통해 event 처리 구현 내용 포함) 저장

  2. onCore 메서드

    eventHandler 실행

final class AnonymousObserver<Element>: ObserverBase<Element> {
    typealias EventHandler = (Event<Element>) -> Void
    
    private let eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
        self.eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
        self.eventHandler(event)
    }
}

2. Observer에 대한 subscription

 self.asObservable().subscribe(observer) // 여기서 self는 Observable

1) asObservable

ObservableType을 Observable(정확히 말하면 AnonymousObservable)로 변환해주는 메서드

  • ObservableType → Observable
  • ObservableType의 extension에 정의되어 있습니다.
  • more asObservable하는 이유: 지금은 Observable이어서 상관없지만 ObservableType 프로토콜을 채택하고 Observable가 아닌 타입인 경우(예를 들어, ControlEvent) 문제가 발생하지 않고 정상적으로 Observable을 구독을할 수 있기 위해서
extension ObservableType {
    
    /// Default implementation of converting `ObservableType` to `Observable`.
    public func asObservable() -> Observable<Element> {
        // temporary workaround
        //return Observable.create(subscribe: self.subscribe)
        Observable.create { o in self.subscribe(o) }
    }
}

2) subscribe()

observer를 받아서 그에 대한 subscription을 반환한다는 것

public class Observable<Element> : ObservableType {
	public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}

정리

 self.asObservable().subscribe(observer)
 
  return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )

observable.subscribe 할 때마다 만들어지는 observer에 대한 Disposable 타입의 subscription이다.

마무리

_ = downloadJson(MEMBER_LIST_URL)
            .subscribe(onNext: {print($0)}, onError: {err in print(err)}, onCompleted: {print("com")})

우리가 사용하는 subscribe(onNext: …)를 사용하면 파라미터로 클로저만 넘겨줘도 내부적으로 Observer를 생성하여 Observable을 구독하여 방출하는 item을 받을 수 있는 것입니다.

어렵습니다…

참고

ReactiveX - Subscribe operator
RxSwift subscribe 흐름 생성- 1
[RxSwift] subscribe의 동작원리 이해하기
[RxSwift] Observer
RxSwift) Observable, Observer, Subscribe 흐름 이해하기

0개의 댓글