지난 시간에는 Observer에 대해서 알아보았다. Observable에 정의되고 비동기적으로 방출되는 이벤트를 Observer는 구독해서 처리를 한다.
SomeObservable
.subscribe { int in
print("Next", int)
} onError: { error in
print("Error", error)
} onCompleted: {
print("Completed")
} onDisposed: {
print("Disposable")
}
// ... Disposable까지 출력
SomeObservable
.subscribe {
print($0)
}
// ... onCompleted까지만 출력
근데 궁금한건 Next, Complete, Error 이벤트는 알겠는데 맨 뒤에 있는 onDisposed는 무엇일까? 사실 이 이벤트는 Observable이 전달하는 이벤트는 아니다. 위 코드에서 onCompleted까지만 전달하는 것을 보면 알 수 있다. 그럼 onDisposed()는 어떤 이벤트일까? onDisposed는 파라미터로 클로저를 전달하면, Observable과 관련된 모든 리소스가 제거된 후에 호출되는 이벤트다.
그렇다면 후자는 리소스가 해제되지 않은건가? Observable이 onComplete, onError 이벤트로 종료되었다면 관련 리소스는 자동으로 해제된다. 그렇다면 왜 두 번째는 출력이 되지 않은걸까? 언급했듯이 onDisposed는 Observable의 이벤트가 아니기 때문이다. 첫 번째 코드에서 Disposed가 출력된 것은 단지 저 이벤트를 전달받도록 설정해놓았기 때문이다. (리소스가 제거되는 시점에 무언가를 하고 싶다면, 위의 코드를 사용, 아니라면 두 번째 코드를 사용한다.) 아래 코드와 주석으로 자세히 알아보자.
extension ObservableType {
// subscribe는 Disposable을 반환한다. 이건 아래에서 자세하게 다뤄보자
public func subscribe(
onNext: ((E) -> Void)? = nil,
onError: ((Swift.Error) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onDisposed: (() -> Void)? = nil) -> Disposable {
let disposable: Disposable
// 위 설명처럼 onDisposed에 파라미터가 전달되어 옵셔널 바인딩으로 처리가 되면
// 파라미터로 전달된 disposed를 실행하는 AnonymousDisposable(disposeAction:)을 생성한다.
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
// 그게 아니라면 그냥 빈 Disposable(NopDisposable)이 생성된다.
// 이것으로 이 이벤트를 처리하는 곳이 Observable이 아니라는 것을 알 수 있다.
else {
disposable = Disposables.create()
}
// 코드 중략 ...
// Observable이 처리하는 이벤트는 이곳이다.
// onDisposed 처리는 위에 있고 Observable안에 없다.
let observer = AnonymousObserver<E> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
// error 이벤트가 전달되면 이곳에서 리소스 정리가 일어난다.
disposable.dispose()
case .completed:
onCompleted?()
// complete 이벤트가 전달되면 이곳에서 리소스 정리가 일어난다.
disposable.dispose()
}
}
// 이곳은 BinaryDisposable을 반환한다.
// 이 안에서는 파라미터로 전달된 Disposable을 dispose()하고 메모리에서 해제한다.
// 이곳에서는 observer를 구독해서 얻은 Disposable과
// 위의 onDisposed 이벤트로 인해 만들어진 disposable을 파라미터로 전달한다.
// Subscribe 메소드는 Disposable을 반환해서 리소스 정리와 실행 취소를 유도한다. Subscription Disposable이라고 부른다.
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
Disposables?
Disposables는 각종 Disposable을 반환하는 create메소드를 가진 구조체다. 이 create를 통해 NoDisposable, AnoymousDisposable, BinaryDisposable, CompositeDisposable 등을 구성할 수 있고 각 Disposable은 다른 역할을 수행한다.
public protocol Disposable {
func dispose()
}
public protocol Cancelable : Disposable {
var isDisposed: Bool { get }
}
프로토콜에 정의된 것처럼 Disposeable의 역할은 dispose()라는 기능을 실행하는 것이다. 주어진 상황에 따라 dispose()의 구체적인 기능은 다르지만 핵심은 리소스를 해제하고 이벤트 구독을 중지하는 것이다. 리소스 낭비와 불필요한 이벤트 수신으로 문제를 일으키지 않기 위해서다.
위에서 살펴본 것처럼 subscribe 메서드에서 Completed나 Error가 나는 경우, 자동으로 리소스가 해제된다. 이 때는 따로 리소스 정리를 안해도 되지만 문서에서는 하라고 나와있다. 그러니 해보자.
Subscribe 메소드를 보면 return이 Disposable이다. 이 메소드가 리턴하는 Disposable을 ‘Subscription Disposable’이라고도 한다. 이미 살펴본 것처럼 Subscription Disposable은 크게 리소스 해제와 실행 취소에 사용된다. 아래 스트림의 리소스를 관리해보자.
let some = SomeObservable
.subscribe { int in
print("Next", int)
} onError: { error in
print("Error", error)
} onCompleted: {
print("Completed")
} onDisposed: {
print("Disposable")
}
some.dispose()
이렇게 바로 dispose() 메소드로 리소스를 해제할 수 있지만, 더 나은 방법이 있다.
DisposeBag을 사용하는 방법인데, 공식문서에서도 dispose()를 직접 호출하는 것보다 DisposeBag을 사용하라고 되어있다. DisposeBag을 만들어서 여기에 Disposable을 담았다가 한 번에 해제하라는 뜻이다. DisposeBag을 만드는 방법은 간단하다.
var bag = DisposeBag()
Dispasable을 담을 때는 disposed(by:) 메소드를 사용하고, by에 파라미터로 Dispose Bag을 전달하면, subscribe가 리턴하는 Disposable이 DisposeBag에 추가된다. 이곳에 추가된 Disposable은 DisposeBag이 해지되는 시점에 함께 해지된다.
subscription1.disposed(by: bag)
그럼 DisposeBag은 언제 해제되나? 당연히 위의 변수가 해제되는 시점에 해제된다. 하지만 그 전에, 또는 내가 원하는 시점에 해제를 하고 싶다면 어떻게 할까? DisposeBag에 다시 dispose? 하지만 DisposeBag에는 dispose() 메소드를 제공하지 않는다. 다만, 새로운 DisposeBag을 만들면 이전에 있던 DisposeBag이 해제된다.
bag = DisposeBag()
아니면 변수를 옵셔널로 선언하고 nil을 할당할 수도 있다.
var newBag: DisposeBag? = DisposeBag()
newBag = nil
let some2 = Observable<Int>
.interval(.seconds(1), scheduler: MainScheduler.instance)
.subscribe(onNext: { elem in
print("Next", elem)
}, onError: { error in
print("Error", error)
}, onCompleted: {
print("Completed")
}, onDisposed: {
print("Disposed")
})
이 함수는 1씩 증가하는 정수를 1초에 간격으로 방출한다. 무한정 방출하기 때문에 방출을 중단시킬 방법이 필요하다. 여기에 사용되는 것이 dispose()이다.
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
some2.dispose()
}
이렇게하면 3초 뒤에 실행이 취소된다. dispose()를 호출하면 모든 리소스가 해제되기 때문에 더 이상 이벤트가 전달되지 않는다. 또 리소스가 해제되는 것이기 때문에 completed 이벤트도 전달되지 않는다. 이런 이유로 dispose 메소드를 호출하는 것은 가능한 피해야 한다. 만약 특정시점에서 멈추고 싶다면 take, until과 같은 연산자로 구현할 수 있다.
설명 감사합니다. 이해가 쉽네요.