[RxSwift] Observable, Observer, Subscribe 이해하기

이정훈·2024년 3월 10일
0

ReactiveX

목록 보기
2/5
post-thumbnail

지난 포스트에 이어 이번 포스트에서는 Rx 라이브러리 매커니즘의 기본이 되는 Observable, Observer, Subscribe에 대하여 알아보려고 한다.

Rx 라이브러리의 기본적인 동작 매커니즘은 옵저버 패턴에 기반하여 ObserverObservable을 구독하는 데에서 시작한다. 그렇다면 ObserverObervable은 무엇일까?

Observable

Rx에서 Obervable관찰 가능한 대상을 의미한다.

여기서 의미하는 관찰 가능한 대상은 우리가 관심을 갖고 있는 대상으로 비동기 네트워크 통신과 같이 미래에 값이 준비되어 언제 도착할지 모르는 값을 관찰 가능한 대상이라고 할 수 있겠다.

Observable에 대한 정확한 정의는 다음과 같다.

Observable은 관찰 가능한 Stream으로 비동기 이벤트의 시퀀스를 생성할 수 있는 대상을 의미한다.

여기서 Stream이라고 한 것은 Observable이벤트의 발생에 따라 하나 또는 연속적인 항목(item)을 방출할 수 있기 때문이다.

Observer

ObserverObservable이 방출하는 하나 또는 연속적인 항목에 반응할 수 있다. 이러한 매커니즘을 ObserverObservable구독(Subscribe)한다라고 한다.

이러한 구독 패턴은 동시성 연산을 가능하게 하는데 ObserverObservable가 이벤트를 발생하여 값을 방출할 때까지 기다릴 필요 없이 값을 방출하는 그 시점을 캐치하여 알림을 받으면 되기 때문이다.

ObserverObservable이 값을 방출하는 것을 기다리지 않기 때문에 thread가 블락 되지 않는다.

이러한 병렬적 혹은 비동기적인 실행은 동시성 연산을 가능하게 했으며 또한 ObserverObservable이 이벤트가 발생하면 그 순간을 감지함과 동시에 준비된 연산(Operation)을 실행하여 결과를 리턴할 수 있다.

Subscribe

ObservableObserver가 뭔지는 대충 알겠는데 과연 어떻게 두 대상 간의 구독을 구현할 수 있을까?

Rx 라이브러리에는 subscribe라는 메서드를 통해 ObserverObservable을 구독하는 방법을 제공한다.

RxSwift에 정의 되어 있는 subscribe의 원형은 다음과 같은데

public func subscribe(
        onNext: ((Element) -> Void)? = nil,
        onError: ((Swift.Error) -> Void)? = nil,
        onCompleted: (() -> Void)? = nil,
        onDisposed: (() -> Void)? = nil
    ) -> Disposable

총 4개의 파라미터함수 인자를 전달받는 것을 알 수 있는데 각 파라미터로 전달된 함수들이 언제 호출되는지 알아보자.

  • onNext: Observable은 새로운 항목들을 배출할 때마다 이 파라미터로 전달된 메서드를 호출한다. 이 메서드는 Observable이 배출하는 항목을 함수의 파라미터로 전달 받는다.

  • onError: Observable은 원하는 데이터가 생성되지 않았거나 다른 이유로 오류가 발생할 경우 오류를 알리기 위해 이 파라미터로 전달된 메서드를 호출한다. 이 메서드가 호출되면 onNextonCompleted는 더 이상 호출되지 않는다. onError 메서드는 오류 정보를 저장하고 있는 객체를 파라미터로 전달 받는다.

  • onCompleted: 오류가 발생하지 않았다면 Observable은 마지막 onNext를 호출한 후 이 파라미터로 전달된 메서드를 호출한다.

  • onDisposed: Observable와 관련된 모든 리소스가 메모리에서 해제될 때 이 파라미터로 전달된 메서드가 호출된다.

onNext 파라미터로 전달된 메서드는 최소 0회 이상 호출될 수 있으며, onCompleted 혹은 onError로 전달된 메서드 실행을 마지막으로 ObservableStream은 종료된다.

아래의 코드는 subscribe 메서드를 통해 Observabe을 구독하고 onNextonComplete를 구현한 예제이다.

import RxSwift

let disposeBag = DisposeBag()

Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9])
    .subscribe(onNext: { number in
        print(number)    //Observable이 값을 방출할때 마다 실행
    }, onCompleted: {
        print("completed")    //Observable이 값을 모두 방출한 뒤에 호출
    })
    .disposed(by: disposeBag)

//1
//2
//3
//4
//5
//6
//7
//8
//9
//completed

from은 인자로 전달된 Array<Int> 타입의 내부 요소를 하나씩 방출하는 Observable을 반환하는 메서드이다.

그런데 위에서 설명한 것으로는 ObserverObservable을 구독한다고 하지 않았는가? 위에서 본 subscribe 메서드의 반환 타입은 Disposable 타입이고 Observer라는 단어는 눈을 씻고 찾아도 찾아 볼 수 없다.

하지만 subscribe 메서드의 구현부를 살펴보면 아래와 같이 구현되어 있다.

public func subscribe(
        onNext: ((Element) -> Void)? = nil,
        onError: ((Swift.Error) -> Void)? = nil,
        onCompleted: (() -> Void)? = nil,
        onDisposed: (() -> Void)? = nil
    ) -> Disposable {
            ...
            
            let observer = AnonymousObserver<Element> { event in
            switch event {
            case .next(let value):
                onNext?(value)
            case .error(let error):
                if let onError = onError {
                    onError(error)
                }
                else {
                    Hooks.defaultErrorHandler(callStack, error)
                }
                disposable.dispose()
            case .completed:
                onCompleted?()
                disposable.dispose()
            }
        }

 }

subscribe 구현 부분 내부에 AnonymousObserver 타입의 observer는 변수를 생성하는 것을 볼 수 있었고 observer는 클로저로 onNext, onError, onComplete 등의 함수를 호출하고 있는 것을 볼 수 있었다.

결론을 말하자면 Observable 객체에 subscribe 메서드만 호출 해주면 Observer는 알아서 생성된다라고 할 수 있다.

구독 해지 (dispose)

기본적으로 onComplete 혹은 onError가 호출되면 자동으로 구독을 해지하고 메모리에서 사라지지만, 만약 Observable항목(item)을 방출하는데 오랜 시간이 걸리거나 무한정 방출하는 경우 Observable이 더 이상 항목(item)을 배출하는 것을 원하지 않을때 dispose() 메서드를 사용하여 수동으로 구독을 해지하고 메모리에서 해지하는 것이 가능하다.

import RxSwift

let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let disposable = observable.subscribe(onNext: { item in
    print(item)
})

DispatchQueue.main.asyncAfter(deadline: .now() + 5, execute: {
    disposable.dispose()
})

위의 코드에서 observable 인스턴스는 매초 0부터 1씩 증가한 값을 방출하는 Observable이다. 다시 말해 값을 무한정 방출하는 Observable이라고 할 수 있으며, 직접 구독 해지를 하지 않는다면 프로그램이 종료될 때까지 계속 메모리에 상주하게 될 것이다.

따라서 구독한 시점 5초 뒤에 dispose()를 호출하여 수동으로 구독을 취소하고 메모리에서 해지 되도록 구현하였다.

지금은 Observable 인스턴스가 하나이지만 만약 프로젝트에서 여러 개의 Observable이 있다면 모든 Observable의 구독을 원하지 않는 시점에 dispose()를 호출하는 것이 여간 귀찮은 일이 아닐 수 없다.

이러한 문제를 해결하기 위해 아래의 코드와 같이 DisposeBag를 사용할 수 있다.

import RxSwift

struct SomeStruct {
    let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    private var disposeBag = DisposeBag()
    
    func subscribe() {
        observable.subscribe(onNext: { item in
            print(item)
        })
        .disposed(by: disposeBag)
    }
    
}

위의 코드와 같이 Observable을 구독 후 disposed(by:) 메서드를 통해 DisposeBag 타입의 인스턴스를 등록해 두면 SomeStruct 인스턴스가 메모리에서 해제될 때 Observable 구독도 함께 취소하고 메모리에서 해지할 수 있다.

Hot Observable과 Cold Observable

위에서 언급했다시피 Observable은 이벤트가 발생할때 항목(item)을 방출한다고 했는데 방출 시점은 Hot Observable이냐 Cold Observable이냐에 따라 그 시점이 다르게 동작할 수 있다.

Hot Observable이라고 부르는 Observabe은 생성되자마자 이벤트가 발생하면 바로 항목을 방출하기 때문에 만약 Observer가 중간부터 구독을 한다면 구독 이전에 방출된 값들은 전달 받지 못하는 경우가 발생할 수 있다.

반대로 Cold Observable이라고 부르는 Observable구독이라는 행위가 발생할 때까지 이벤트가 발생하여도 항목을 방출하지 않기 때문에 Observable이 방출하는 모든 항목을 전달 받을 수 있도록 보장한다.

Reference

https://reactivex.io/documentation/ko/observable.html

profile
새롭게 알게된 것을 기록하는 공간

0개의 댓글