RxSwift - Mathematical & Aggregate & Connectable Observable Operators

JSLee·2022년 3월 17일
0

concat

concat 은 RxSwift 에 CombineOperator 안에 속에 있는 Operator 입니다.

concat 은 두개의 Observable 에서 방출되는 Event 를 결합하여

하나의 Observable 로 만들어주는 기능을 합니다.

        let firstObservable = Observable<Int>.of(1,2,3)
        let secondObservable = Observable<Int>.of(4,5,6)
        
        firstObservable
            .concat(secondObservable)
            .subscribe(onNext: {
            print($0)
        }).disposed(by: disposBag)
        
//print
1
2
3
4
5
6

두개의 Observable 의 값을 결합하여 방출합니다.
이 밖에 다양한 Operator를 사용하여 Event 의 값을 변형 할수도 있습니다.

        let firstObservable = Observable<Int>.of(1,2,3)
        let secondObservable = Observable<Int>.of(4,5,6)
        
        firstObservable
            .concat(secondObservable)
            .filter { $0 % 2 == 0 }
            .toArray()
            .subscribe { singleEvent in
                print(singleEvent)
            }.disposed(by: disposBag)
            
            
            //print
            success([2, 4, 6])

하지만 위 예시는 가장 기본적인 동작을 테스트 하기 위한 예시입니다.
여기서 놓칠수 있는 부분이 있어 조금더 다양한 예시를 통해 살펴보겠습니다.

        let first = Observable<Int>.create { observer in
            observer.onNext(1)
            return Disposables.create()
        }
        
        let second = Observable<Int>.of(2,3,4)

        let concat = Observable.concat([first,second])
        concat
            .debug()
            .subscribe(onNext : {
            print($0)
        }).disposed(by: disposBag)

creat 와 of 생성생 Int 타입의 Observable 2개가 있습니다.
이 두개의 Observable 을 concat을 통해 새로운 Observable 로 결합합니다.
그럼 출력되는 값은 어떻게 될까요?

저는 1 2 3 4 를 생각했습니다.

1

이유는 위 이미지 와 debug 를 통해 알수 있습니다.

2022-03-17 11:28:11.132: SetupVC.swift:54 (testConcat()) -> subscribed
2022-03-17 11:28:11.135: SetupVC.swift:54 (testConcat()) -> Event next(1)

debug 출력문 에서는 Dispose 에 대한 출력이 없습니다.

그렇다는건 구독행위 자체가 dispose 되지 않았고 그 이유는 방출에서 알수 있습니다.

concat 연산자는 첫번째 Observable 의 생명주기가 종료된후 두번째 Observable 를 방출하여 결합합니다.

그로 인해 첫번째 Observable 의 주기가 종료되지 않아 결합을 할수 없는 것입니다.

이 문제는 onCompleted Event 방출로 해결할수 있습니다.

        let first = Observable<Int>.create { observer in
            observer.onNext(1)
            observer.onCompleted()
            return Disposables.create()
        }

onCompleted 방출

2022-03-17 11:30:01.756: SetupVC.swift:54 (testConcat()) -> subscribed
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event next(1)
1
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event next(2)
2
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event next(3)
3
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event next(4)
4
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event completed
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> isDisposed

정상작동

        let subject = PublishSubject<String>()
        
        subject.onNext("FirstSubject 이벤트 요소 === 하나")
        
        let secondSubject = BehaviorSubject<String>(value: "SecondSubject 이벤트 요소 === 하나")
        
        
        secondSubject.onNext("SecondeSubject === 둘")
        
        let concatObservable = Observable.concat([subject,secondSubject])
        
        concatObservable
            .debug()
            .subscribe(onNext: {
            print($0)
        }).disposed(by: disposBag)
        subject.onNext("FirstSubject 이벤트 요소 === 셋")
        secondSubject.onNext("SecondSubject 이벤트 요소 === 셋")

subject 또한 비슷한 맥락을 가지고 있습니다.

다른 특성에 Publish / Behavior 두 개의 subject 를 concat 하였을때

2022-03-17 11:35:23.577: SetupVC.swift:75 (testConcat()) -> subscribed
2022-03-17 11:35:23.579: SetupVC.swift:75 (testConcat()) -> Event next(FirstSubject 이벤트 요소 ===)
FirstSubject 이벤트 요소 ===

이와 같은 출력문을 전달해 줍니다.

여기서 PublishSubject 의 특징은 맞게 나타나지만
BehaviorSubject 의 특징은 잘 나타나지 않습니다. 구독 이전의 Event요소를 가져오지 못합니다.

조금더 알기 쉬운 예시 코드를 작성하자면

       let pSub = PublishSubject<String>()
        pSub.onNext("p")
        let bSub = BehaviorSubject<String>(value: "bA")
        pSub
            .concat(bSub)
            .toArray()
            .debug()
            .subscribe()
            .disposed(by: disposBag)
        pSub.onNext("pA")
        bSub.onNext("bB")
        pSub.onNext("pB")
        bSub.onNext("bC")
        pSub.onNext("pC")
        bSub.onNext("bD")
        pSub.onNext("pD")
        pSub.onCompleted()
        bSub.onNext("bF")
        bSub.onNext("bG")
        bSub.onNext("bH")
        bSub.onCompleted()

위와 같은 코드에서 concat이 작동되면
publishSubject 는

  • 구독 이전 Event 방출을 무시한다
    -> p 를 무시함

behaviorSubject 는

  • 구독 이전 최신값 + 구독 이후 값을 관찰한다.
    -> bA -> bH 까지
2022-03-17 12:17:46.180: SetupVC.swift:119 (testConcat()) -> subscribed
2022-03-17 12:17:46.182: SetupVC.swift:119 (testConcat()) -> Event next(["pA", "pB", "pC", "pD", "bD", "bF", "bG", "bH"])
2022-03-17 12:17:46.182: SetupVC.swift:119 (testConcat()) -> Event completed
2022-03-17 12:17:46.182: SetupVC.swift:119 (testConcat()) -> isDisposed

하지만 output 에서 보여지는 behaviorSubject는 publishSubject 의 방출이 끝나는 지점(onCompletd)의 전 의 방출된 자신의 Event 만 관찰할수 있습니다.

즉 구독의 기준이 아닌 모체로 실행되는 publishSubject의 생명주기 종료 시점에 기준하여 Event를 관찰할수 있습니다.

또한 두개의 subject 둘중 하나라도 onCompleted Event 가 실행되지 않는다면 observer 는 onNext Event를 관찰할수 없습니다.

reduce

Swift 고차함수 안에서도 속해있는 reduce와 동일합니다.

reduce 는 모든 Event에 대한 총합을 방출하게됩니다.

(scan 은 중간 과정을 모두 방출합니다.)

        let observer = Observable.range(start: 1, count: 5)
        observer.scan(0, accumulator: +)
           .subscribe { print("scan === \($0)") }
           .disposed(by: disposBag)

        observer.reduce(0, accumulator: +)
            .subscribe{ print("reduce === \($0)") }
            .disposed(by: disposBag)

클로져를 통한 다양한 연산도 가능합니다.

   Observable.from([1, 2, 3])
          .reduce(2) { $0 * $1 }
          .debug()
          .subscribe()
          .disposed(by: disposBag)

Connectable Observable Operators

connectable Operaotr 를 사용하게 되면 공유 할수 있는 (연결 가능한) Observable 로 변환 할수 있습니다.

Observable 은 unicast 로써 하나의 observer 만 연결될수 있습니다.

 func buttonLabelConfigure(){
        view.addSubview(bt)
        bt.snp.makeConstraints {
            $0.centerX.centerY.equalToSuperview()
            $0.width.equalTo(200)
            $0.height.equalTo(80)
        }
        view.addSubview(lb)
        lb.snp.makeConstraints {
            $0.top.equalTo(bt.snp.bottom).offset(20)
            $0.centerX.equalToSuperview()
            $0.width.equalTo(200)
            $0.height.equalTo(80)
            
        }
    }
    func testShare(){
    // API Requst 를 통해 Responde 받는 Observable 이라고 가정
        let requestAPI = Observable.just(100).debug("API - Request")
        
        let tapResult = bt.rx.tap
            .flatMap { requestAPI }
        
        tapResult
            .map { $0 > 3 }
            // bind(to:)는subscribe()의 별칭(Alias)으로 Subscribe()를 호출한 것과 동일
            .bind(to: bt.rx.isHidden )
            .disposed(by: disposBag)
        
        tapResult
            .map { "Count : \($0)" }
            .bind(to: lb.rx.text )
            .disposed(by: disposBag)
    }

버튼 tap 대한 Observable(시퀀스) 에 2번에 bind ( 구독 ) 가 있다고 가정하겠습니다.

이럴경우 API 콜에 대한 Observable 은 bind(2번) 를 통해 2개의 시퀀스가 생성 방출됩니다.

2022-03-17 13:42:20.669: API - Request -> subscribed
2022-03-17 13:42:20.671: API - Request -> Event next(100)
2022-03-17 13:42:20.671: API - Request -> Event completed
2022-03-17 13:42:20.671: API - Request -> isDisposed
2022-03-17 13:42:20.672: API - Request -> subscribed
2022-03-17 13:42:20.672: API - Request -> Event next(100)
2022-03-17 13:42:20.673: API - Request -> Event completed
2022-03-17 13:42:20.673: API - Request -> isDisposed

출력문에서 볼수 있듯 응답으로 들어오는 onNext 이벤트가 2번이 들어왔습니다.

이러한 문제점을 해결할수 있는 Connect Observable Operator 로는

share() operator 가 있습니다.

share

연산자를 사용하면 Subscribe()할때마다 새로운 Observable 시퀀스가 생성되지 않고, 하나의 시퀀스에서 방출되는 Event 요소를 공유할수 있습니다

unicast -> multicast ??

    let tapResult = bt.rx.tap
            .flatMap { requestAPI }
            .share()
2022-03-17 13:59:34.499: API - Request -> subscribed
2022-03-17 13:59:34.501: API - Request -> Event next(100)
2022-03-17 13:59:34.502: API - Request -> Event completed
2022-03-17 13:59:34.502: API - Request -> isDisposed

하지만 그렇다고 Subject 의 기능을 완벽히 수행한다고 볼수는 없습니다.

이것을 확인할수 있는 코드를 먼저 보시면

        let observable = Observable<Int>.create { observer in
            observer.onNext(Int.random(in: 0..<10))
            return Disposables.create()
        }
        
        let observer1 = observable
            .subscribe { print("observer 1 === \($0)") }
        observer1.disposed(by: disposBag)
        
        let observer2 = observable
            .subscribe {print("observer 2 === \($0)") }
        observer2.disposed(by: disposBag)
        //print
        observer 1 === next(5)
        observer 2 === next(2)

share 를 사용하지 않는 경우의 Observable 입니다.
Observable 은 앞써 설명드린대로 구독이 시작되면 생성되어 방출되며
구독의 지점에 따라 각기 다른 시퀀스가 생성됩니다.

하지만 share 를 사용하게 될경우

 let observable = Observable<Int>.create { observer in
            observer.onNext(Int.random(in: 0..<10))
            return Disposables.create()
        }.share()
observer 1 === next(6)

이러한 출력문 나오게 됩니다.

여기서 Subject 와의 차이점을 찾을수 있습니다.
Subject 는 observer 간에 Event를 공유 합니다.

하지만 share 를 사용하는 Observable 같은 경우에는

첫번째 Subscribe 가 이루어지는 시점에만 (ex_구독 카운트가 0 에서 1로 변했을때 ) subscribe 를 생성합니다. 이후 생성되는 구독 행위에 대해선 무시되고 첫 호출한 subscribe 를 공유하게 됩니다.

replay(default : 0 ) , scope(default : .whileConnected )

share 에는 파라미터를 넣을수 있습니다.share(replay: , scope :)
여기서 replay 는 새롭게 subscribe 되는 observer 에게 방출되는 Event 요소에 대해 설정할수 있습니다.

        let observable = Observable<Int>.create { observer in
            observer.onNext(Int.random(in: 0..<10))
            observer.onNext(Int.random(in: 11..<20))
            observer.onNext(Int.random(in: 21..<30))
            observer.onNext(Int.random(in: 31..<40))
            return Disposables.create()
        }.share(replay: 2 , scope:  .whileConnected )
        
        let observer1 = observable
            .subscribe { print("observer 1 === \($0)") }
        observer1.disposed(by: disposBag)
        
        let observer2 = observable
            .subscribe {print("observer 2 === \($0)") }
        observer2.disposed(by: disposBag)
        
      let observer3 = observable
            .subscribe { print("observer 3 === \($0)") }
        observer3.disposed(by: disposBag)

       let observer4 = observable
           .subscribe { print("observer 4 === \($0)")}
            observer4.disposed(by: disposBag)
observer 1 === next(9)
observer 1 === next(14)
observer 1 === next(22)
observer 1 === next(34)
observer 2 === next(22)
observer 2 === next(34)
observer 3 === next(22)
observer 3 === next(34)
observer 4 === next(22)
observer 4 === next(34)

replay 에서 설정되는 값은 buffer size 라고 생각할수 있습니다.

observer 1 에서 subscribe 0 -> 1 로 변하고
observer 2 부터는 새롭게 subscribe 된 observer 입니다.
그렇기에 observer2 부터는 share 를 통한 observer1 subscribe 공유 이기 때문에
replay 에 설정된 값에 맞는 방출을 하게 됩니다.

또한 Observable 에서 completed 이벤트가 방출되면
모든 share 는 해제 됩니다.

 let observable = Observable<Int>.create { observer in
            observer.onNext(Int.random(in: 0..<10))
            observer.onNext(Int.random(in: 11..<20))
            observer.onNext(Int.random(in: 21..<30))
            observer.onNext(Int.random(in: 31..<40))
            observer.onCompleted()
            return Disposables.create()
        }.share(replay: 2 , scope:  .whileConnected )
2022-03-17 15:11:09.723: SetupVC.swift:95 (testShareTwo()) -> subscribed
2022-03-17 15:11:09.725: SetupVC.swift:95 (testShareTwo()) -> Event next(3)
observer 1 === next(3)
2022-03-17 15:11:09.725: SetupVC.swift:95 (testShareTwo()) -> Event next(13)
observer 1 === next(13)
2022-03-17 15:11:09.726: SetupVC.swift:95 (testShareTwo()) -> Event next(21)
observer 1 === next(21)
2022-03-17 15:11:09.726: SetupVC.swift:95 (testShareTwo()) -> Event next(37)
observer 1 === next(37)
2022-03-17 15:11:09.726: SetupVC.swift:95 (testShareTwo()) -> Event completed
observer 1 === completed
2022-03-17 15:11:09.726: SetupVC.swift:95 (testShareTwo()) -> isDisposed
2022-03-17 15:11:09.726: SetupVC.swift:100 (testShareTwo()) -> subscribed
2022-03-17 15:11:09.726: SetupVC.swift:100 (testShareTwo()) -> Event next(8)
observer 2 === next(8)
2022-03-17 15:11:09.726: SetupVC.swift:100 (testShareTwo()) -> Event next(18)
observer 2 === next(18)
2022-03-17 15:11:09.727: SetupVC.swift:100 (testShareTwo()) -> Event next(28)
observer 2 === next(28)
2022-03-17 15:11:09.727: SetupVC.swift:100 (testShareTwo()) -> Event next(38)
observer 2 === next(38)
2022-03-17 15:11:09.727: SetupVC.swift:100 (testShareTwo()) -> Event completed
observer 2 === completed
2022-03-17 15:11:09.728: SetupVC.swift:100 (testShareTwo()) -> isDisposed
2022-03-17 15:11:09.728: SetupVC.swift:105 (testShareTwo()) -> subscribed
2022-03-17 15:11:09.728: SetupVC.swift:105 (testShareTwo()) -> Event next(6)
observer 3 === next(6)
2022-03-17 15:11:09.728: SetupVC.swift:105 (testShareTwo()) -> Event next(15)
observer 3 === next(15)
2022-03-17 15:11:09.729: SetupVC.swift:105 (testShareTwo()) -> Event next(22)
observer 3 === next(22)
2022-03-17 15:11:09.729: SetupVC.swift:105 (testShareTwo()) -> Event next(35)
observer 3 === next(35)
2022-03-17 15:11:09.729: SetupVC.swift:105 (testShareTwo()) -> Event completed
observer 3 === completed
2022-03-17 15:11:09.729: SetupVC.swift:105 (testShareTwo()) -> isDisposed
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> subscribed
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event next(0)
observer 4 === next(0)
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event next(13)
observer 4 === next(13)
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event next(21)
observer 4 === next(21)
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event next(33)
observer 4 === next(33)
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event completed
observer 4 === completed
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> isDisposed

scope 에는 2개의 설정값이 있습니다

  • .whileConnected(observer 가 한 개 이상 있으면 replay가 유지되지만, dispose되어 0개가 되면 replay 버퍼 초기화)

  • .forever (observer 가 없어도 버퍼 유지)

whileConnected는 dispose 와 disposeBag 을 통해서 첫번째 observer 를 선택할수 있고
새로운 observer 들에게도 영향을 줄수 있습니다.

        let observer1 = observable
         
            .subscribe { print("observer 1 === \($0)") }
        observer1.dispose()//첫번째 observer 지만 dispose 됨
        
        let observer2 = observable
            .subscribe {print("observer 2 === \($0)") }
        observer2.disposed(by: disposBag) // disposeBage 
        
        let observer3 = observable
           
            .subscribe { print("observer 3 === \($0)") }
        observer3.dispose()

        let observer4 = observable
           
            .subscribe { print("observer 4 === \($0)")}
            observer4.dispose()
observer 1 === next(5)
observer 1 === next(11)
observer 1 === next(23)
observer 1 === next(39)

// observer 2 에 영향을 받음
observer 2 === next(2)
observer 2 === next(18)
observer 2 === next(28)
observer 2 === next(38)

observer 3 === next(28)
observer 3 === next(38)

observer 4 === next(28)
observer 4 === next(38)

하지만 여기서 forever 를 사용하게 되면

share(replay: 2 , scope:  .forever )
observer 1 === next(5)
observer 1 === next(19)
observer 1 === next(22) // a
observer 1 === next(31) // b

observer 2 === next(22) // a
observer 2 === next(31) // b
observer 2 === next(2)
observer 2 === next(14)
observer 2 === next(22)
observer 2 === next(38)

observer 3 === next(22)
observer 3 === next(38)

observer 4 === next(22)
observer 4 === next(38)

observer1 에 3~4 번째 event 를 observer 2 의 1~2 번째의 추가 되어
observer2 는 Observable 의 4번에 방출이 아닌 6번의 방출 Event 를 관찰하고 있습니다.

이유는 forever 스트림의 내부캐시가 지워지지 않아서 입니다.
그래서 observer1 에 3~4번째 Event 를 받아오는 것입니다(bufferSize : 2)

  • publish() : 이 연산자는 보통의 Observable을 ConnectableObservable로 변환해 줍니다.
        let observable = Observable<Int>.create { observer  in
            observer.onNext(Int.random(in: 10..<30))
            return Disposables.create()
        }.publish()
        
        let observer = observable
            .debug()
            .subscribe(onNext : { print($0)})
        observer.disposed(by: disposBag)
        
        let observer2 = observable
            .debug()
            .subscribe(onNext : { print($0)})
        observer2.disposed(by: disposBag)
        
        observable.connect().disposed(by: disposBag)
        
        //print
2022-03-17 15:57:43.384: SetupVC.swift:54 (testPublish()) -> subscribed
2022-03-17 15:57:43.385: SetupVC.swift:59 (testPublish()) -> subscribed
2022-03-17 15:57:43.387: SetupVC.swift:54 (testPublish()) -> Event next(21)
21
2022-03-17 15:57:43.387: SetupVC.swift:59 (testPublish()) -> Event next(21)
21
  • ConnectableObservable : ConnectableObservable은 Subscriber가 있어도 connect()를 호출하기 전까지는 아이템을 방출하지 않습니다. connect()를 호출하고 나서야 아이템을 방출하기 시작합니다.
  • refcount() : refcount() 는 ConnectableObservable에 Connect와 Disconnect를 자동으로 담당하고, ConnectableObservable을 보통의 Observable처럼 사용할 수 있게 해줍니다. 다시말해 Subscription count를 계속 세고 있다가 Subscription의 개수가 0 -> 1 개가 되는 시점에 connect()를 수행하고 Subscription이 0이 되면 disconnect()를 수행합니다.
profile
iOS/Android/FE/BE

0개의 댓글