Observable의 sequence는 구독을 할 때 시작된다. 그리고 매번 새로운 sequence를 생성해서 구독자에게 이벤트를 전달한다. 문제는 하나의 Observable에서 여러 구독이 일어날 때인데, 구독할 때마다 새로운 sequence가 생성되기 때문에 불필요하게 리소스 낭비가 일어난다.
예를 들어, URLSession의 data 연산자로 네트워트 작업을 하고 이 이벤트를 여러 구독자가 받는다고 생각해보자. 100명의 구독자들이 100번의 request를 보내면 어떻게 될까? 매번 새로운 요청을 하고 그만큼 네트워크 리소스 낭비가 일어나고 그에 따른 다른 리소스 낭비(구동 속도 저하, 배터리 낭비 등)도 일어난다. 그렇기 때문에 새로운 sequence를 생성할게 아니라면, 구독을 공유하는 것이 필수적이다.
RxSwift에서는 구독 공유를 지원하는 다양한 연산자가 있다. 하나하나 살펴보자!
구독 공유의 가장 기본이 되는 연산자이다. 사실 직접 사용되기 보다 다른 연산자의 모체로서 사용된다. multicast를 위한 subject를 생성해야 하고 또 connect() 연산자를 사용해 sequence를 시작해야 하기 때문에 상대적으로 복잡한 구조를 가지고 있다. 따라서 많이 사용되지는 않는다.
원본 Observable에서 이벤트를 방출하면 파라미터로 받은 subject로 들어가 모든 구독자에게 공유(unicast -> multicast) 된다. 리턴값이 ConnectableObservable인데, 이 Observable은 구독시 새로운 sequence를 시작하지 않고 connect()를 호출할 때 시작한다.
public func multicast<Subject: SubjectType>(_ subject: Subject)
-> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: { subject })
}
아래와 같이 사용한다.
let bag = DisposeBag()
let subject = PublishSubject<Int>()
let source = Observable<Int>
.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(5)
.multicast(subject)
source
.subscribe { print($0) }
.disposed(by: bag)
source
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print($0) }
.disposed(by: bag)
source.connect()
multicast 연산자를 호출하고 자체적인 PublishSubject를 넣어줘서 간단하게 구현한 것이다.
// 이 부분을 지우자. 자동으로 생성해주기 때문
// let subject = PublishSubject<Int>()
let source = Observable<Int>
.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(5)
.publish()
// 하지만 이 부분은 필요하다
source.connect()
위의 예제에서 두 번째 구독자에게 이전의 이벤트도 함께 전달하려면 어떻게 해야 할까? PublishSubject는 별도의 buffer가 없기 때문에 불가능하다. But! ReplaySubject를 쓰면 간단하게 해결된다. 하지만 replay 연산자를 쓰면 더 간단하게 구현이 가능하다. Publish 연산자와 마찬가지로 replay 연산자는 내부에 replaySubject를 가지고 있다. 파라미터로 버퍼사이즈만 정해주면 된다.
replayAll() 연산자도 있다.
하지만 메모리 사용량이 급격히 증가할 수 있기 때문에 조심해야 한다.
// 이 부분을 지우자. 자동으로 생성해주기 때문
// let subject = ReplaySubject<Int>.create(buffersize: 5)
let source = Observable<Int>
.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(5)
.replay(5)
// 이 부분은 여전히 필요하다
source.connect()
refCount은 ConnectableObservableType의 extension에 구현되어 있다. 즉 일반 Observable에 사용이 불가하다. 이 연산자는 RefCount라는 Observable을 리턴하는데, RefCount Observable은 내부에 ConnectableObservable을 유지하면서 새로운 구독자가 추가되는 시점에 자동으로 connect 메소드를 호출한다. 그리고 구독자가 구독을 중지하고 다른 구독자가 없다면 sequence를 중지한다. 그러다가 다시 구독하면 connect를 호출한다. 이 때는 새로운 sequence를 시작한다.
let bag = DisposeBag()
let source = Observable<Int>
.interval(.seconds(1), scheduler: MainScheduler.instance)
.debug()
.publish()
.refCount()
let observer1 = source
.subscribe { print("🔵", $0) }
// 내부에서 connect를 자동으로 호출하기 때문에 필요가 없다.
// source.connect()
// 이 시점에 다른 구독자가 없기 때문에 disposed
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
observer1.dispose()
}
// 다시 시작하기 때문에 0부터 출력
DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
let observer2 = source.subscribe { print("🔴", $0) }
// 이 때도 구독자가 취소되어 없기 때문에 dispose
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
observer2.dispose()
}
}
refCount 연산자를 사용하면, ConnectableObservable을 직접 관리할 필요가 없다. connect도 직접 호출하고 필요한 시점에 dispose도 자동으로 관리한다.
간단하게 공유하는 Observable을 만들 수 있는 연산자다. 처음 구독을 하는 경우에만 sequence가 생성되고 그 이후로는 구독을 공유해서 debug print가 출력되지 않는다.
URLSession.shared.rx.data(request : request)
.map { "\($0)" }
.debug()
.share()
간단해보이지만 사실 위의 모든 기술이 망라되어 있는 연산자다. 자세히 살펴보자.
public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
-> Observable<Element> {
// scope 파라미터로 subject의 수명을 결정하는데, 두 가지 옵션이 있다.
// whileConnected는 새로운 connect가 시작되면 새로운 subject가 생성되고 disconnect되면 사라진다. 따라서 connection끼리의 공유가 안된다.
// 반대로 forever의 경우 모든 connection이 공유된다.
switch scope {
case .forever:
// replay 파라미터를 0으로 전달하면 PublishSubject, 0보다 크면 ReplaySubject를 사용한다.
// multicast 연산자를 사용해 구독을 공유하고
// 그리고 refCount 연산자로 connect와 disconnect를 관리한다.
switch replay {
case 0: return self.multicast(PublishSubject()).refCount()
default: return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
}
case .whileConnected:
switch replay {
case 0: return ShareWhileConnected(source: self.asObservable())
case 1: return ShareReplay1WhileConnected(source: self.asObservable())
default: return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
}
}
}
share연산자의 파라미터에 따라 내부의 multicast, Replay, Publish, refCount가 상황에 맞게 일어난다. 0부터 1초간 차례로 하나씩 이벤트를 방출하는 Observable이 있다고 했을 때,
// 버퍼 저장없이 이어서 방출되다가 구독자가 없으면 dispose, 다시 생기면 새로운 sequence를 시작한다.
// 0 1 2 3 3, 0 1 2
.share(replay: 0, scope: .whileConnected)
// subject가 공유되고 있을시 2버퍼가 전달되지만 새로운 sequence가 생성되는 타이밍에는 버퍼가 전달되지 않는다.
// 0 1 2 1 2 3 3, 0 1 2
.share(replay: 2, scope: .whileConnected)
// 모든 구독자가 하나의 subject를 공유해서 버퍼 전달이 가능하지만, 이 역시 새로운 sequence를 시작하기 때문에 다시 0부터 방출한다.
// 0 1 2 1 2 3 3, 2 3 0 1 2
.share(replay: 2, scope: .forever)
이렇게 파라미터에 따라 달라지기 때문에 share 연산자도 유념해서 사용할 필요가 있다.