Rxswift는 비동기 프로그래밍을 관찰 가능한 순차적 형태와 함수 형태의 연산자를 통해 처리하게끔 도와줍니다.
ReactiveX에서 observer는 observable 을 구독한다. observer는 observable이 방출하는 모든 아이템(들)에 대해 반응한다.

create : Observable 생성하기
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.Element>)
-> RxSwift.Disposable) -> RxSwift.Observable<Self.Element>
subscribe
observable.subscribe(onNext: { (element) in //
print(element)
})
답은 subscribe 에 있습니다.
Subscribe operate upon the emissions and notifications from an Observable
The Subscribe operator is the glue that connects an observer to an Observable. In order for an observer to see the items being emitted by an Observable, or to receive error or completed notifications from the Observable, it must first subscribe to that Observable with this operator.
Subscribe는 Observable로부터 발생한 notification과 emission에 따라 동작한다.
Subscribe는 Observable에 Observer를 연결하는 접착제다. Observer가 Observable로 부터 방출된 아이템이나 에러, 완료 알림을 받기 위해서는 반드시 처음에 이 연산자를 Observable에 사용해 구독해야만 가능하다.

But, 보통 사용하는 subscribe
_ = downloadJson(MEMBER_LIST_URL)
.subscribe(onNext: {print($0)}, onError: {err in print(err)}, onCompleted: {print("com")})
⁉️ Observable.swift 파일에서는 `subscribe` 메서드 파라미터로 Observer를 전달했는데, 위에서 사용하는 메서드에서는 Observer가 어디있지?
ObservableType은 Observable 클래스가 따르는 프로토콜onNext, onError, onCompleted, onDisposed 를 파라미터로 받습니다. (클로저 형태)Disposable입니다. Disposable 을 반환하는 이유는 Observable 이 관찰가능한 형태이기 때문에 생성해서 구독하게 되면 구독을 해제할 수 있어야 합니다. (즉, 더이상 Observable 에 대해서 이벤트를 받고 싶지 않을 때 사용)public func subscribe(
onNext: ((Element) -> Void)? = nil,
onError: ((Swift.Error) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onDisposed: (() -> Void)? = nil
) -> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
// ************* 중요 코드 1 *************
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
// ************* 중요 코드 2 *************
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}

subscribe(onNext:...) 메서드 내부에서 AnonymousObserver를 자체적으로 생성해서 구독합니다.
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
생성자
eventHandler 프로퍼티에 파라미터로 전달된 클로저( switch 문을 통해 event 처리 구현 내용 포함) 저장
onCore 메서드
eventHandler 실행
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
self.eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
self.eventHandler(event)
}
}
self.asObservable().subscribe(observer) // 여기서 self는 Observable
ObservableType을 Observable(정확히 말하면 AnonymousObservable)로 변환해주는 메서드
extension ObservableType {
/// Default implementation of converting `ObservableType` to `Observable`.
public func asObservable() -> Observable<Element> {
// temporary workaround
//return Observable.create(subscribe: self.subscribe)
Observable.create { o in self.subscribe(o) }
}
}
observer를 받아서 그에 대한 subscription을 반환한다는 것
public class Observable<Element> : ObservableType {
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}
self.asObservable().subscribe(observer)
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
observable.subscribe 할 때마다 만들어지는 observer에 대한 Disposable 타입의 subscription이다.
_ = downloadJson(MEMBER_LIST_URL)
.subscribe(onNext: {print($0)}, onError: {err in print(err)}, onCompleted: {print("com")})
우리가 사용하는 subscribe(onNext: …)를 사용하면 파라미터로 클로저만 넘겨줘도 내부적으로 Observer를 생성하여 Observable을 구독하여 방출하는 item을 받을 수 있는 것입니다.
어렵습니다…
ReactiveX - Subscribe operator
RxSwift subscribe 흐름 생성- 1
[RxSwift] subscribe의 동작원리 이해하기
[RxSwift] Observer
RxSwift) Observable, Observer, Subscribe 흐름 이해하기