[RxSwift] Subject - 2

강대훈·2025년 3월 19일

RxSwift

목록 보기
5/5
post-thumbnail

이전 Subject포스팅에서 Subject는 Observable, Observer 역할을 모두 수행할 수 있다고 했다.

이번에는 과연 어떻게 Subject가 Observer, Observable 역할을 할 수 있는지 알아보고, 어떻게 실행되는지 한 번 파헤쳐보는 시간을 가져보려고 한다.

Observable & Observer 채택

우선 Subject중의 하나인 PublishSubject를 살펴보자.

public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType 

여러 프로토콜을 채택한 것을 확인할 수 있는데, 그 중에서 ObservableObserverType을 먼저 확인해보자.

public class Observable<Element> : ObservableType

Observable을 확인해보니 ObservableType이라는 프로토콜 또한 채택하고 있는 것을 확인할 수 있다.

그럼 또 ObservableType을 확인해보자.

public protocol ObservableType: ObservableConvertibleType {
	func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}

ObservableType은 다음과 같이 subscribe() 메소드를 채택하라는 프로토콜임을 확인할 수 있다.

그렇다면 다시 PublishSubject로 돌아와서 subscribe() 메소드가 존재하는지 확인해보자!

// MARK: - PublishSubject
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self.lock.performLocked { self.synchronized_subscribe(observer) }
    }

역시 존재하고 있다는 것을 확인할 수 있다.

하지만 사실 PublishSubject에 subscribe() 메소드가 존재하지 않아도 여전히 Observable을 채택할 수 있다.

그 이유는 Observable에도 subscribe() 메소드가 존재하기 때문이다.

// MARK: - Observable<Element>
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        rxAbstractMethod()
    }

다음 ObserverType을 확인해보자면 다음과 같이 나온다.

public protocol ObserverType {
    associatedtype Element

    /// Notify observer about sequence event.
    ///
    /// - parameter event: Event that occurred.
    func on(_ event: Event<Element>)
}

/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    
    
    public func onCompleted() {
        self.on(.completed)
    }
    
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

on() 메소드를 반드시 채택해야 한다는 것을 확인할 수 있다. 다시 PublishSubject를 확인해보면

public func on(_ event: Event<Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        dispatch(self.synchronized_on(event), event)
    }

다음과 같이 on() 메소드가 존재하는 것을 확인할 수 있다.

이렇게 Observable과 Observer타입을 채택함을 통해서 Subject가 두개의 역할을 다 할 수 있다는 것을 직접 확인할 수 있었다.

하지만 그럼에도 Subject가 Observable과 Observer와 어떤 차이점을 가지는지 이해하기에는 쉽지 않다.

그래서 이번에는 Subject가 어떻게 구독이 진행되는지 한 번 확인해보려고 한다.

먼저 Subject에 구독을 진행하는 시나리오를 따라가보자.

Subject의 구독 시나리오

let publishSubject = PublishSubject<Int>()

publishSubject.subscribe { event in 
	print(event.element!)
}

굉장히 간단한 구독 과정이다. 내부에서는 어떤 일이 일어나는지 확인해보자.

// MARK: - ObservableType
public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
    let observer = AnonymousObserver { e in
        on(e)
    }
    return self.asObservable().subscribe(observer) 
}

해당 subscribe() 메소드는 PublishSubject가 채택하고 있는 ObservableType의 메소드다. 확인해보면 AnonymousObserver로 observer가 생성되는 것을 확인할 수 있다.

그런데 asObservable() 메소드는 어떤 메소드일까?

//MARK: - Observable<Element>
public func asObservable() -> Observable<Element> { self }

PublishSubject가 채택하고 있는 Observable의 메소드임을 확인할 수 있다. 결국에는 PublishSubject에 있는 subscribe() 메소드가 실행된다는 것을 알 수 있다.

// MARK: - PublishSubject
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    self.lock.performLocked { self.synchronized_subscribe(observer) }
}

func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    if let stoppedEvent = self.stoppedEvent {
        observer.on(stoppedEvent)
        return Disposables.create()
    }
        
    if self.isDisposed {
        observer.on(.error(RxError.disposed(object: self)))
        return Disposables.create()
    }
        
    let key = self.observers.insert(observer.on)
    return SubscriptionDisposable(owner: self, key: key)
}

subscribe() 메소드에서는 synchronized_subscribe() 메소드를 실행시키고 있는데, 여기서 가장 중요한 코드는 self.observers.insert(observer.on)이다.

저 구문을 통해서 Subject가 AnonymousObserver들을 관리하고 있다는 것을 알 수 있는데, observer.on 메소드를 매개변수로 전달하고 있다.

해당 과정을 통해서 Subject가 AnonymousObserver들을 observers라는 프로퍼티에 따로 저장을 하고 있다는 것을 확인할 수 있다.

정확히 말하면 AnonymousObserver에 등록해둔 이벤트 클로저들을 저장하고 있다는 것이다.

결론은 observers에 AnonymousObserver들을 보관하고 있다는 것을 알 수 있다.

Subject의 데이터 방출 시나리오

그렇다면 데이터를 방출하는 onNext() 메소드에 대해서도 확인해보자.

publishSubject.onNext(1)

다음과 같이 1이라는 값의 데이터를 방출하면 PublishSubject가 채택하고 있는 ObserverType의 Extension에 정의되어 있는 onNext()가 호출된다.

// MARK: - ObserverType
public func onNext(_ element: Element) {
    self.on(.next(element))
}

self.on(.next(element)) 가 호출되면서 PublishSubject에 정의되어 있는 on() 메소드가 호출된다.

public func on(_ event: Event<Element>) {
    #if DEBUG
        self.synchronizationTracker.register(synchronizationErrorMessage: .default)
        defer { self.synchronizationTracker.unregister() }
    #endif
    dispatch(self.synchronized_on(event), event)
}

디버깅 하는 코드영역을 제외하면 dispatch() 메소드 안에서 synchronized_on(event) 메소드가 먼저 실행되는 것을 볼 수 있는데,

func synchronized_on(_ event: Event<Element>) -> Observers {
        self.lock.lock(); defer { self.lock.unlock() }
        switch event {
        case .next:
            if self.isDisposed || self.stopped {
                return Observers()
            }
            
            // MARK: - self.observers 호출!!
            return self.observers
        case .completed, .error:
            if self.stoppedEvent == nil {
                self.stoppedEvent = event
                self.stopped = true
                let observers = self.observers
                self.observers.removeAll()
                return observers
            }

            return Observers()
        }
    }

해당 메소드에서 확인해보면 self.observers 을 리턴하는 것을 볼 수 있다. 아까 우리는 구독 과정에서 self.observers.insert() 메소드를 통해서 AnonymousObserver를 보관하고 있다는 것을 알고있다.

즉 이 메소드는 AnonymousObserver들을 반환하고 있다는 것을 알 수 있다.

이제 dispatch() 메소드를 확인해보자.

func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
    bag._value0?(event)

    if bag._onlyFastPath {
        return
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = bag._dictionary {
    	// MARK: - AnonymousObserver 실행
        for element in dictionary.values {
            element(event)
        }
    }
}
for element in dictionary.values {
    element(event)
}

해당 구문을 통해서 AnonymousObserver들의 이벤트핸들러, 즉 클로저가 실행되는 것을 확인할 수 있다.

해당 과정을 통해서 어떻게 Subject가 Observable과 Observer역할을 둘 다 할 수 있는지에 대해서 확인해볼 수 있었다.

결론?

결국 Subject의 Observer역할은 AnonymousObserver들에게 이벤트를 방출하는 역할을 가지고 있다는 것을 확인할 수 있었고, Observable의 역할은 AnonymousObserver들을 관리하는 역할을 하고 있다는 것을 확인할 수 있었다.

글로 쓰다보니 어떤 흐름으로 이어지는지 적으려니 매우 쉽지 않다는 것을 느꼈다. 그만큼 내가 해당 로직에 대해 매우 정확하게 이해하고 있지는 않다는 것을 알게되었다.

머릿속으로도 다시 구독 및 방출에 대한 흐름을 정리해보고 코드를 좀 더 뜯어보는 시간을 가져야 할 것 같다.

0개의 댓글