이전 Subject포스팅에서 Subject는 Observable, Observer 역할을 모두 수행할 수 있다고 했다.
이번에는 과연 어떻게 Subject가 Observer, Observable 역할을 할 수 있는지 알아보고, 어떻게 실행되는지 한 번 파헤쳐보는 시간을 가져보려고 한다.
우선 Subject중의 하나인 PublishSubject를 살펴보자.
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType
여러 프로토콜을 채택한 것을 확인할 수 있는데, 그 중에서 Observable과 ObserverType을 먼저 확인해보자.
public class Observable<Element> : ObservableType
Observable을 확인해보니 ObservableType이라는 프로토콜 또한 채택하고 있는 것을 확인할 수 있다.
그럼 또 ObservableType을 확인해보자.
public protocol ObservableType: ObservableConvertibleType {
func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}
ObservableType은 다음과 같이 subscribe() 메소드를 채택하라는 프로토콜임을 확인할 수 있다.
그렇다면 다시 PublishSubject로 돌아와서 subscribe() 메소드가 존재하는지 확인해보자!
// MARK: - PublishSubject
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.performLocked { self.synchronized_subscribe(observer) }
}
역시 존재하고 있다는 것을 확인할 수 있다.
하지만 사실 PublishSubject에 subscribe() 메소드가 존재하지 않아도 여전히 Observable을 채택할 수 있다.
그 이유는 Observable에도 subscribe() 메소드가 존재하기 때문이다.
// MARK: - Observable<Element>
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
rxAbstractMethod()
}
다음 ObserverType을 확인해보자면 다음과 같이 나온다.
public protocol ObserverType {
associatedtype Element
/// Notify observer about sequence event.
///
/// - parameter event: Event that occurred.
func on(_ event: Event<Element>)
}
/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
public func onNext(_ element: Element) {
self.on(.next(element))
}
public func onCompleted() {
self.on(.completed)
}
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
on() 메소드를 반드시 채택해야 한다는 것을 확인할 수 있다. 다시 PublishSubject를 확인해보면
public func on(_ event: Event<Element>) {
#if DEBUG
self.synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self.synchronizationTracker.unregister() }
#endif
dispatch(self.synchronized_on(event), event)
}
다음과 같이 on() 메소드가 존재하는 것을 확인할 수 있다.
이렇게 Observable과 Observer타입을 채택함을 통해서 Subject가 두개의 역할을 다 할 수 있다는 것을 직접 확인할 수 있었다.
하지만 그럼에도 Subject가 Observable과 Observer와 어떤 차이점을 가지는지 이해하기에는 쉽지 않다.
그래서 이번에는 Subject가 어떻게 구독이 진행되는지 한 번 확인해보려고 한다.
먼저 Subject에 구독을 진행하는 시나리오를 따라가보자.
let publishSubject = PublishSubject<Int>()
publishSubject.subscribe { event in
print(event.element!)
}
굉장히 간단한 구독 과정이다. 내부에서는 어떤 일이 일어나는지 확인해보자.
// MARK: - ObservableType
public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
해당 subscribe() 메소드는 PublishSubject가 채택하고 있는 ObservableType의 메소드다. 확인해보면 AnonymousObserver로 observer가 생성되는 것을 확인할 수 있다.
그런데 asObservable() 메소드는 어떤 메소드일까?
//MARK: - Observable<Element>
public func asObservable() -> Observable<Element> { self }
PublishSubject가 채택하고 있는 Observable의 메소드임을 확인할 수 있다. 결국에는 PublishSubject에 있는 subscribe() 메소드가 실행된다는 것을 알 수 있다.
// MARK: - PublishSubject
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.performLocked { self.synchronized_subscribe(observer) }
}
func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if let stoppedEvent = self.stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self.isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self.observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
subscribe() 메소드에서는 synchronized_subscribe() 메소드를 실행시키고 있는데, 여기서 가장 중요한 코드는 self.observers.insert(observer.on)이다.
저 구문을 통해서 Subject가 AnonymousObserver들을 관리하고 있다는 것을 알 수 있는데, observer.on 메소드를 매개변수로 전달하고 있다.
해당 과정을 통해서 Subject가 AnonymousObserver들을 observers라는 프로퍼티에 따로 저장을 하고 있다는 것을 확인할 수 있다.
정확히 말하면 AnonymousObserver에 등록해둔 이벤트 클로저들을 저장하고 있다는 것이다.
결론은 observers에 AnonymousObserver들을 보관하고 있다는 것을 알 수 있다.
그렇다면 데이터를 방출하는 onNext() 메소드에 대해서도 확인해보자.
publishSubject.onNext(1)
다음과 같이 1이라는 값의 데이터를 방출하면 PublishSubject가 채택하고 있는 ObserverType의 Extension에 정의되어 있는 onNext()가 호출된다.
// MARK: - ObserverType
public func onNext(_ element: Element) {
self.on(.next(element))
}
self.on(.next(element)) 가 호출되면서 PublishSubject에 정의되어 있는 on() 메소드가 호출된다.
public func on(_ event: Event<Element>) {
#if DEBUG
self.synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self.synchronizationTracker.unregister() }
#endif
dispatch(self.synchronized_on(event), event)
}
디버깅 하는 코드영역을 제외하면 dispatch() 메소드 안에서 synchronized_on(event) 메소드가 먼저 실행되는 것을 볼 수 있는데,
func synchronized_on(_ event: Event<Element>) -> Observers {
self.lock.lock(); defer { self.lock.unlock() }
switch event {
case .next:
if self.isDisposed || self.stopped {
return Observers()
}
// MARK: - self.observers 호출!!
return self.observers
case .completed, .error:
if self.stoppedEvent == nil {
self.stoppedEvent = event
self.stopped = true
let observers = self.observers
self.observers.removeAll()
return observers
}
return Observers()
}
}
해당 메소드에서 확인해보면 self.observers 을 리턴하는 것을 볼 수 있다. 아까 우리는 구독 과정에서 self.observers.insert() 메소드를 통해서 AnonymousObserver를 보관하고 있다는 것을 알고있다.
즉 이 메소드는 AnonymousObserver들을 반환하고 있다는 것을 알 수 있다.
이제 dispatch() 메소드를 확인해보자.
func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
bag._value0?(event)
if bag._onlyFastPath {
return
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = bag._dictionary {
// MARK: - AnonymousObserver 실행
for element in dictionary.values {
element(event)
}
}
}
for element in dictionary.values {
element(event)
}
해당 구문을 통해서 AnonymousObserver들의 이벤트핸들러, 즉 클로저가 실행되는 것을 확인할 수 있다.
해당 과정을 통해서 어떻게 Subject가 Observable과 Observer역할을 둘 다 할 수 있는지에 대해서 확인해볼 수 있었다.
결국 Subject의 Observer역할은 AnonymousObserver들에게 이벤트를 방출하는 역할을 가지고 있다는 것을 확인할 수 있었고, Observable의 역할은 AnonymousObserver들을 관리하는 역할을 하고 있다는 것을 확인할 수 있었다.
글로 쓰다보니 어떤 흐름으로 이어지는지 적으려니 매우 쉽지 않다는 것을 느꼈다. 그만큼 내가 해당 로직에 대해 매우 정확하게 이해하고 있지는 않다는 것을 알게되었다.
머릿속으로도 다시 구독 및 방출에 대한 흐름을 정리해보고 코드를 좀 더 뜯어보는 시간을 가져야 할 것 같다.