지난 포스트에 이어 이번 포스트에서는 Rx 라이브러리 매커니즘의 기본이 되는 Observable, Observer, Subscribe에 대하여 알아보려고 한다.
Rx 라이브러리의 기본적인 동작 매커니즘은 옵저버 패턴에 기반하여 Observer가 Observable을 구독하는 데에서 시작한다. 그렇다면 Observer와 Obervable은 무엇일까?
Rx에서 Obervable은 관찰 가능한 대상을 의미한다.
여기서 의미하는 관찰 가능한 대상은 우리가 관심을 갖고 있는 대상으로 비동기 네트워크 통신과 같이 미래에 값이 준비되어 언제 도착할지 모르는 값을 관찰 가능한 대상이라고 할 수 있겠다.
Observable에 대한 정확한 정의는 다음과 같다.
Observable은 관찰 가능한 Stream으로 비동기 이벤트의 시퀀스를 생성할 수 있는 대상을 의미한다.
여기서 Stream이라고 한 것은 Observable은 이벤트의 발생에 따라 하나 또는 연속적인 항목(item)을 방출할 수 있기 때문이다.
Observer는 Observable이 방출하는 하나 또는 연속적인 항목에 반응할 수 있다. 이러한 매커니즘을 Observer가 Observable을 구독(Subscribe)한다라고 한다.
이러한 구독 패턴은 동시성 연산을 가능하게 하는데 Observer는 Observable가 이벤트를 발생하여 값을 방출할 때까지 기다릴 필요 없이 값을 방출하는 그 시점을 캐치하여 알림을 받으면 되기 때문이다.
Observer은 Observable이 값을 방출하는 것을 기다리지 않기 때문에 thread가 블락 되지 않는다.
이러한 병렬적 혹은 비동기적인 실행은 동시성 연산을 가능하게 했으며 또한 Observer는 Observable이 이벤트가 발생하면 그 순간을 감지함과 동시에 준비된 연산(Operation)을 실행하여 결과를 리턴할 수 있다.
Observable과 Observer가 뭔지는 대충 알겠는데 과연 어떻게 두 대상 간의 구독을 구현할 수 있을까?
Rx 라이브러리에는 subscribe
라는 메서드를 통해 Observer가 Observable을 구독하는 방법을 제공한다.
RxSwift에 정의 되어 있는 subscribe
의 원형은 다음과 같은데
public func subscribe(
onNext: ((Element) -> Void)? = nil,
onError: ((Swift.Error) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onDisposed: (() -> Void)? = nil
) -> Disposable
총 4개의 파라미터로 함수 인자를 전달받는 것을 알 수 있는데 각 파라미터로 전달된 함수들이 언제 호출되는지 알아보자.
onNext
: Observable은 새로운 항목들을 배출할 때마다 이 파라미터로 전달된 메서드를 호출한다. 이 메서드는 Observable이 배출하는 항목을 함수의 파라미터로 전달 받는다.
onError
: Observable은 원하는 데이터가 생성되지 않았거나 다른 이유로 오류가 발생할 경우 오류를 알리기 위해 이 파라미터로 전달된 메서드를 호출한다. 이 메서드가 호출되면 onNext
나 onCompleted
는 더 이상 호출되지 않는다. onError
메서드는 오류 정보를 저장하고 있는 객체를 파라미터로 전달 받는다.
onCompleted
: 오류가 발생하지 않았다면 Observable은 마지막 onNext
를 호출한 후 이 파라미터로 전달된 메서드를 호출한다.
onDisposed
: Observable와 관련된 모든 리소스가 메모리에서 해제될 때 이 파라미터로 전달된 메서드가 호출된다.
onNext
파라미터로 전달된 메서드는 최소 0회 이상 호출될 수 있으며, onCompleted
혹은 onError
로 전달된 메서드 실행을 마지막으로 Observable의 Stream은 종료된다.
아래의 코드는 subscribe
메서드를 통해 Observabe을 구독하고 onNext
와 onComplete
를 구현한 예제이다.
import RxSwift
let disposeBag = DisposeBag()
Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9])
.subscribe(onNext: { number in
print(number) //Observable이 값을 방출할때 마다 실행
}, onCompleted: {
print("completed") //Observable이 값을 모두 방출한 뒤에 호출
})
.disposed(by: disposeBag)
//1
//2
//3
//4
//5
//6
//7
//8
//9
//completed
from
은 인자로 전달된 Array<Int>
타입의 내부 요소를 하나씩 방출하는 Observable을 반환하는 메서드이다.
그런데 위에서 설명한 것으로는 Observer가 Observable을 구독한다고 하지 않았는가? 위에서 본 subscribe
메서드의 반환 타입은 Disposable
타입이고 Observer라는 단어는 눈을 씻고 찾아도 찾아 볼 수 없다.
하지만 subscribe
메서드의 구현부를 살펴보면 아래와 같이 구현되어 있다.
public func subscribe(
onNext: ((Element) -> Void)? = nil,
onError: ((Swift.Error) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onDisposed: (() -> Void)? = nil
) -> Disposable {
...
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
}
subscribe
구현 부분 내부에 AnonymousObserver
타입의 observer
는 변수를 생성하는 것을 볼 수 있었고 observer
는 클로저로 onNext
, onError
, onComplete
등의 함수를 호출하고 있는 것을 볼 수 있었다.
결론을 말하자면 Observable 객체에 subscribe
메서드만 호출 해주면 Observer는 알아서 생성된다라고 할 수 있다.
기본적으로 onComplete
혹은 onError
가 호출되면 자동으로 구독을 해지하고 메모리에서 사라지지만, 만약 Observable이 항목(item)을 방출하는데 오랜 시간이 걸리거나 무한정 방출하는 경우 Observable이 더 이상 항목(item)을 배출하는 것을 원하지 않을때 dispose()
메서드를 사용하여 수동으로 구독을 해지하고 메모리에서 해지하는 것이 가능하다.
import RxSwift
let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let disposable = observable.subscribe(onNext: { item in
print(item)
})
DispatchQueue.main.asyncAfter(deadline: .now() + 5, execute: {
disposable.dispose()
})
위의 코드에서 observable
인스턴스는 매초 0부터 1씩 증가한 값을 방출하는 Observable이다. 다시 말해 값을 무한정 방출하는 Observable이라고 할 수 있으며, 직접 구독 해지를 하지 않는다면 프로그램이 종료될 때까지 계속 메모리에 상주하게 될 것이다.
따라서 구독한 시점 5초 뒤에 dispose()
를 호출하여 수동으로 구독을 취소하고 메모리에서 해지 되도록 구현하였다.
지금은 Observable 인스턴스가 하나이지만 만약 프로젝트에서 여러 개의 Observable이 있다면 모든 Observable의 구독을 원하지 않는 시점에 dispose()
를 호출하는 것이 여간 귀찮은 일이 아닐 수 없다.
이러한 문제를 해결하기 위해 아래의 코드와 같이 DisposeBag
를 사용할 수 있다.
import RxSwift
struct SomeStruct {
let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
private var disposeBag = DisposeBag()
func subscribe() {
observable.subscribe(onNext: { item in
print(item)
})
.disposed(by: disposeBag)
}
}
위의 코드와 같이 Observable을 구독 후 disposed(by:)
메서드를 통해 DisposeBag
타입의 인스턴스를 등록해 두면 SomeStruct
인스턴스가 메모리에서 해제될 때 Observable 구독도 함께 취소하고 메모리에서 해지할 수 있다.
위에서 언급했다시피 Observable은 이벤트가 발생할때 항목(item)을 방출한다고 했는데 방출 시점은 Hot Observable이냐 Cold Observable이냐에 따라 그 시점이 다르게 동작할 수 있다.
Hot Observable이라고 부르는 Observabe은 생성되자마자 이벤트가 발생하면 바로 항목을 방출하기 때문에 만약 Observer가 중간부터 구독을 한다면 구독 이전에 방출된 값들은 전달 받지 못하는 경우가 발생할 수 있다.
반대로 Cold Observable이라고 부르는 Observable은 구독이라는 행위가 발생할 때까지 이벤트가 발생하여도 항목을 방출하지 않기 때문에 Observable이 방출하는 모든 항목을 전달 받을 수 있도록 보장한다.