RxSwift - Observable Utility Operators

JSLee·2022년 3월 15일
0

Observable 작업을 위한 유용한 연산자 도구 상자

Delay

지정한 시간 이후에 Observable 은 Event 를 Emit 합니다.

Observable 은 delay 가 걸려있는 시간만큼 Event Emit 을 일시중지 합니다.

_ = Observable<Int>.just(1)
            .delay(.milliseconds(5000), scheduler: MainScheduler.instance)
            .debug()
            .subscribe(onNext: {
                print("just Emit === \($0)")
            }).disposed(by: disposeBag)
            
            

2022-03-15 09:48:35.187: ViewController.swift:50 (testDelay()) -> subscribed
2022-03-15 09:48:40.191: ViewController.swift:50 (testDelay()) -> Event next(1)
just Emit === 1
2022-03-15 09:48:40.191: ViewController.swift:50 (testDelay()) -> Event completed
2022-03-15 09:48:40.191: ViewController.swift:50 (testDelay()) -> isDisposed

출력문을 보면 subscribe가 완료되고 방출을 시작하기 전에 delay 로 인해서 5초 뒤에 Event 가 방출되고 completed -> dispose 순으로 생명주기가 종료됩니다.

만약 interval을 사용하여 Observable 을 생성하고 Event 를 방출하는데 interval 의 dueTime 보다 delay 의 dueTime 이 더 길다면 ??

        _  = Observable<Int>.interval(.milliseconds(3000), scheduler: MainScheduler.instance)
            .debug()
            .delay(.milliseconds(5000), scheduler: MainScheduler.asyncInstance)
            .take(6)
            .subscribe(onNext: {
                print($0)
            }).disposed(by: disposeBag)

interval 과 delay 에 dueTime 에 차이가 있습니다

2022-03-15 13:08:18.707: ViewController.swift:107 (testDelay()) -> subscribed
2022-03-15 13:08:21.710: ViewController.swift:107 (testDelay()) -> Event next(0)
2022-03-15 13:08:24.709: ViewController.swift:107 (testDelay()) -> Event next(1)
0
2022-03-15 13:08:27.708: ViewController.swift:107 (testDelay()) -> Event next(2)
1
2022-03-15 13:08:30.709: ViewController.swift:107 (testDelay()) -> Event next(3)
2
2022-03-15 13:08:33.709: ViewController.swift:107 (testDelay()) -> Event next(4)
3
2022-03-15 13:08:36.709: ViewController.swift:107 (testDelay()) -> Event next(5)
4
2022-03-15 13:08:39.709: ViewController.swift:107 (testDelay()) -> Event next(6)
5
2022-03-15 13:08:41.709: ViewController.swift:107 (testDelay()) -> isDisposed

delaySubscription

구독 시점을 지연 시키는 operator 입니다.

        _ = Observable<Int>.interval(.milliseconds(3000), scheduler: MainScheduler.instance)
            .debug()
            .take(4)
            .delaySubscription(.milliseconds(9000), scheduler: MainScheduler.instance)
            .subscribe(onNext: {
                print($0)
            }).disposed(by: disposeBag)

interval 은 방출하는 시간을 지정합니다.
take 를 통해 Emit Event 를 제한 하고.
delay 를 사용했을 경우

        let bSub = BehaviorSubject<String>(value: "A")
        bSub.onNext("B")
        bSub.delaySubscription(.seconds(1), scheduler: MainScheduler.asyncInstance)
            .subscribe(onNext: {
                print($0)
            }).disposed(by: disposeBag)
        bSub.onNext("C")
        bSub.onNext("D")
        bSub.onNext("E")
        bSub.onNext("F")
        bSub.onNext("G")

2022-03-15 13:12:48.225: ViewController.swift:90 (testDelaySubscription()) -> subscribed
2022-03-15 13:12:51.229: ViewController.swift:90 (testDelaySubscription()) -> Event next(0)
0
2022-03-15 13:12:54.228: ViewController.swift:90 (testDelaySubscription()) -> Event next(1)
1
2022-03-15 13:12:57.227: ViewController.swift:90 (testDelaySubscription()) -> Event next(2)
2
2022-03-15 13:13:00.228: ViewController.swift:90 (testDelaySubscription()) -> Event next(3)
3
2022-03-15 13:13:00.228: ViewController.swift:90 (testDelaySubscription()) -> isDisposed

BehaviorSubject 는 구독 전 Event 까지 Emit 합니다.

하지만 delaySubscription 으로 구독 시간을 지연 시켜보면

가장 마지막으로 onNext 된 G 만 Emit 합니다.

        let sub = PublishSubject<String>()
        sub.delaySubscription(.milliseconds(1), scheduler: MainScheduler.instance)
            .do(onSubscribe: { print("Subscribe")})
            .subscribe(onNext:{
                print($0)
            }).disposed(by: disposeBag)
                sub.onNext("1")
                sub.onNext("2")
                sub.onNext("3")
        // Subscribe
        // 이벤트 방출 X

구독 이후 이벤트를 방출하는 PublishSubject 는 do로 체크해본 구독은 하지만 방출은 진행되지 않습니다.

Do

Observable 생명 주기중 이벤트에 대해 수행할 작업을 설정할수있습니다.

.do(onNext: , afterNext: , onError: , afterError: , onCompleted: , afterCompleted: , onSubscribe: , onSubscribed: , onDispose: )

이와 같은 핸들링 작업을 가능하게 해줍니다.

      _ = Observable.of("🐶","🐱","🐭","🐹","🐼")
            .do(onNext: { print("doOnNext : \($0)")},
                onError: { print("doOnError : \($0)")})
                .subscribe(onNext: {
                    print("subscribeOn : \($0)")
                }).disposed(by: disposeBag)
                
doOnNext : 🐶
subscribeOn : 🐶
doOnNext : 🐱
subscribeOn : 🐱
doOnNext : 🐭
subscribeOn : 🐭
doOnNext : 🐹
subscribeOn : 🐹
doOnNext : 🐼
subscribeOn : 🐼

또한 do의 onNext 는 subscribe onNext 보다 먼저 실행됩니다.

        _ = Observable<Any>.never()
                    .do(onSubscribe : { print("subscribe")})
                        .subscribe(onNext: { print("\($0)")},
                                   onCompleted: { print("Complted")},
                                   onDisposed: { print("Disposed")})
                        .disposed(by: disposeBag)
                        
                        //subscribe

do 는 never 와 같이 어떠한 Emit 도 없는 Operator 에 subscribe 여부를 판단할때도 사용하기 좋습니다.

                        _ = Observable.of("🐶","🐱","🐭","🐹","🐼")
                        .do(onNext: { print("do onNext : \($0)") }, afterNext: { print("afterNext : \($0)\("🤣") " ) }, onCompleted: { print("Complted")}, afterCompleted: { print("AfterCompleted")}, onSubscribe: { print("onSubscribe")}, onSubscribed: { print("onSubscribed")}, onDispose: { print("Dispose")})
                            .subscribe(onNext: {
                                print("subscribe onNext : \($0)")
                            }).disposed(by: disposeBag)
                            
onSubscribe
onSubscribed
do onNext : 🐶
subscribe onNext : 🐶
afterNext : 🐶🤣 
do onNext : 🐱
subscribe onNext : 🐱
afterNext : 🐱🤣 
do onNext : 🐭
subscribe onNext : 🐭
afterNext : 🐭🤣 
do onNext : 🐹
subscribe onNext : 🐹
afterNext : 🐹🤣 
do onNext : 🐼
subscribe onNext : 🐼
afterNext : 🐼🤣 
Complted
AfterCompleted
Dispose

또한 이처럼 do는 여러 작업들을 통해 Data를 핸들링 할수 있습니다.

do 를 사용한 예시 코드

                _ = Observable.just("800x600")
                .observe(on: ConcurrentDispatchQueueScheduler(qos: .default))
                .map { $0.replacingOccurrences(of: "x", with: "/")}
                .map { "https://picsum.photos/\($0)?random" }
                .map { URL(string: $0)}
                .filter { $0 != nil }
                .map { $0! }
                .map { try Data(contentsOf: $0 ) }
                .map { UIImage(data:  $0 ) }
                .observe(on: MainScheduler.instance)
                .do(onNext: { print("do Image Size : \($0?.size) ")})
                    .subscribe(onNext: {
                        print("subscribe Image Size : \($0?.size) ")
                    }).disposed(by: disposeBag)

URL 을 통해 Image Load 를 하는 함수 이고 do 는 Image Size 를 체크할수 있습니다.

2022-03-15 13:30:46.899306+0900 RxTest[4437:146216] [boringssl] boringssl_metrics_log_metric_block_invoke(153) Failed to log metrics
2022-03-15 13:30:47.360511+0900 RxTest[4437:146214] [boringssl] boringssl_metrics_log_metric_block_invoke(153) Failed to log metrics
do Image Size : Optional((800.0, 600.0)) 
subscribe Image Size : Optional((800.0, 600.0)) 

Materialize / Dematerialize

materialize

사진에서 볼수 있듯 Materialize 는 Observable 을 분해하는 성질을 가지고 있습니다.

Dematerialize

Dematerialize 는 Emit 되는 Event 를 다시 Observable 로 만듭니다.

이둘은 몇가지의 특징이 있습니다.

  • 두개의 Operator 를 보통 같이 사용한다.
  • Observable을 분해할 수 있기 때문에 신중하게 사용해야 한다.
        let mater = Observable<Int>.create { obs in
            obs.onNext(1)
            obs.onNext(2)
            obs.onError(NSError(domain: "", code: 100))
            return Disposables.create()
            }
            .debug()
            .subscribe(onNext: {
                print($0)
            })
            .disposed(by: disposeBag)
            //print
2022-03-15 14:27:38.128: ViewController.swift:52 (testMeterialize()) -> subscribed
2022-03-15 14:27:38.129: ViewController.swift:52 (testMeterialize()) -> Event next(1)
1
2022-03-15 14:27:38.129: ViewController.swift:52 (testMeterialize()) -> Event next(2)
2
2022-03-15 14:27:38.130: ViewController.swift:52 (testMeterialize()) -> Event error(Error Domain= Code=100 "(null)")
Unhandled error happened: Error Domain= Code=100 "(null)"
2022-03-15 14:27:38.130: ViewController.swift:52 (testMeterialize()) -> isDisposed

debug 의 출력문을 자세히 보게 되면 subscribe 이후 onNext 되기 전까지의 Emit 요소는
Event next(1) 입니다. 이후 구독이 이루어지고 onNext 가 되며 Int 로 변경됩니다.

이럴경우 Emit 되는 Event 에 대한 Handling 에 어려움이 많습니다.
그래서 Observable 에서 Emit 되는 시퀀스를 Event 타입을 만들어야 합니다.

materialize 를 사용하게 되면 결과 값이 달라집니다.


        let mater = Observable<Int>.create { obs in
            obs.onNext(1)
            obs.onNext(2)
            obs.onError(NSError(domain: "", code: 100))
            return Disposables.create()
            }
            mater.materialize()
            .debug()
            .subscribe(onNext: {
                print($0)
            })
            .disposed(by: disposeBag)
//print     
2022-03-15 14:30:30.299: ViewController.swift:52 (testMeterialize()) -> subscribed
2022-03-15 14:30:30.301: ViewController.swift:52 (testMeterialize()) -> Event next(next(1))
next(1)
2022-03-15 14:30:30.301: ViewController.swift:52 (testMeterialize()) -> Event next(next(2))
next(2)
2022-03-15 14:30:30.302: ViewController.swift:52 (testMeterialize()) -> Event next(error(Error Domain= Code=100 "(null)"))
error(Error Domain= Code=100 "(null)")
2022-03-15 14:30:30.302: ViewController.swift:52 (testMeterialize()) -> Event completed
2022-03-15 14:30:30.302: ViewController.swift:52 (testMeterialize()) -> isDisposed

이런식으로 Int 형 값이 Event 타입의 next 에 맵핑되어 출력 됩니다.

이제 switch 문을 통한 .next , .error , . completed 에 대응할수 있습니다.

        let mater = Observable<Int>.create { obs in
            obs.onNext(1)
            obs.onNext(2)
            obs.onError(NSError(domain: "", code: 100))
            return Disposables.create()
            }
            mater.materialize()
            .compactMap { event -> Event<Int>? in
                switch event {
                case .next :
                    return event
                case .completed :
                    return nil
                case .error(_):
                    return .next(3)
                }
            }
            .debug()
            .subscribe(onNext: {
                print($0)
            })
            .disposed(by: disposeBag)
            
            //print
2022-03-15 14:38:17.405: ViewController.swift:52 (testMeterialize()) -> subscribed
2022-03-15 14:38:17.406: ViewController.swift:52 (testMeterialize()) -> Event next(next(1))
next(1)
2022-03-15 14:38:17.406: ViewController.swift:52 (testMeterialize()) -> Event next(next(2))
next(2)
2022-03-15 14:38:17.407: ViewController.swift:52 (testMeterialize()) -> Event next(next(3))
next(3)
2022-03-15 14:38:17.407: ViewController.swift:52 (testMeterialize()) -> Event completed
2022-03-15 14:38:17.407: ViewController.swift:52 (testMeterialize()) -> isDisposed

이렇게 error 가 나와도 next 로 흘려보낼수 있고 방출되는 Event Type에 따라서 핸들링이 자유로워 집니다.

하지만 next로 맵핑되어있는 값을 사용할순 없습니다.

그래서 사용하는것이 .dematerialize Operator 입니다.

디메터럴라이즈 operator를 사용하면 맵핑되어 있는 시퀀스를 다시 이전 타입으로 맵핑을 벗겨 줍니다.

        let mater = Observable<Int>.create { obs in
            obs.onNext(1)
            obs.onNext(2)
            obs.onError(NSError(domain: "", code: 100))
            return Disposables.create()
            }
            mater.materialize()
            .compactMap { event -> Event<Int>? in
                switch event {
                case .next :
                    return event
                case .completed :
                    return nil
                case .error(_):
                    return .next(3)
                }
            }
            .dematerialize()
            .debug()
            .subscribe(onNext: {
                print($0)
            })
            .disposed(by: disposeBag)
            
            //print
2022-03-15 14:39:22.959: ViewController.swift:53 (testMeterialize()) -> subscribed
2022-03-15 14:39:22.961: ViewController.swift:53 (testMeterialize()) -> Event next(1)
1
2022-03-15 14:39:22.961: ViewController.swift:53 (testMeterialize()) -> Event next(2)
2
2022-03-15 14:39:22.961: ViewController.swift:53 (testMeterialize()) -> Event next(3)
3
2022-03-15 14:39:22.961: ViewController.swift:53 (testMeterialize()) -> Event completed
2022-03-15 14:39:22.961: ViewController.swift:53 (testMeterialize()) -> isDisposed

Timeout

timeout 은 지정된 시간동안 Event 가 Emit 되지 않으면 error 를 방출합니다.

        _ = Observable<Int>.interval(.milliseconds(500), scheduler: MainScheduler.instance)
            .filter { $0 < 3 }
            .timeout(.milliseconds(1000), scheduler: MainScheduler.instance)
            .do(onNext : { item in
                print("onNext = \(item)")
            },onError : { _ in
                print("onError = error")
            },onCompleted : {
                print("Completed")
            })
                .debug()
                .subscribe(onNext: { event in
                    print("subscribe == \(event)")
                }).disposed(by: disposeBag)

2022-03-15 15:21:41.633: ViewController.swift:76 (testTimeout()) -> subscribed
onNext = 0
2022-03-15 15:21:42.136: ViewController.swift:76 (testTimeout()) -> Event next(0)
subscribe == 0
onNext = 1
2022-03-15 15:21:42.635: ViewController.swift:76 (testTimeout()) -> Event next(1)
subscribe == 1
onNext = 2
2022-03-15 15:21:43.136: ViewController.swift:76 (testTimeout()) -> Event next(2)
subscribe == 2
onError = error
2022-03-15 15:21:44.136: ViewController.swift:76 (testTimeout()) -> Event error(Sequence timeout.)
Unhandled error happened: Sequence timeout.
2022-03-15 15:21:44.137: ViewController.swift:76 (testTimeout()) -> isDisposed

interval 로 생성되는 Observable 은 0.5 초 마다 Event 를 Emit 합니다.
Filter 를 통해 방출되는 Event 에 제한이 걸리게 되고
timeout 은 4초동안 Event의 방출이 없으면 error 를 Emit 하게 됩니다.

하지만 굳이 에러 핸들링만 할필요는 없습니다
other 를 통해 원하는 Event를 얻을수도 있습니다.

                _ = Observable<Int>.interval(.milliseconds(500), scheduler: MainScheduler.instance)
                .filter { $0 < 3 }
                .timeout(.milliseconds(4000), other: Observable.just(10), scheduler: MainScheduler.instance)
                .do(onNext : {
                    print("onNext == \($0)")
                },onError : { _ in
                    print("error")
                },onCompleted : nil )
                    .debug()
                    .subscribe(onNext: {
                    print($0)
                }).disposed(by: disposeBag)

2022-03-15 15:30:39.466: ViewController.swift:89 (testTimeout()) -> subscribed
onNext == 0
2022-03-15 15:30:39.968: ViewController.swift:89 (testTimeout()) -> Event next(0)
0
onNext == 1
2022-03-15 15:30:40.468: ViewController.swift:89 (testTimeout()) -> Event next(1)
1
onNext == 2
2022-03-15 15:30:40.968: ViewController.swift:89 (testTimeout()) -> Event next(2)
2
onNext == 10
2022-03-15 15:30:44.968: ViewController.swift:89 (testTimeout()) -> Event next(10)
10
2022-03-15 15:30:44.968: ViewController.swift:89 (testTimeout()) -> Event completed
2022-03-15 15:30:44.968: ViewController.swift:89 (testTimeout()) -> isDisposed

error 가 나오면 other 로 보낸 Event 가 전달됩니다.

error 에 대응할수 있습니다.

Using

Observable 과 동일한 disposable을 생성할수 있습니다.

필요한 인자 값으로는

  • resourceFactory : disposable 을 얻기 위한 함수

  • observableFactory : disposable 과 연결된 Observable 을 반환하는 함수

    func testUsing(){
        class DisposableResource: Disposable {
            var values: [Int] = []
            var isDisposed: Bool = false

            func dispose() {
                self.values = []
                self.isDisposed = true
                print("!!!DisposableResource is Disposed!!!")
            }
            init(with values: [Int]) {
                self.values = values
            }
        }
        let observable = Observable<Int>.using({ () -> DisposableResource in
            return DisposableResource.init(with: [1, 2, 3, 4])
        }, observableFactory: { (resource) -> Observable<Int> in
            if resource.isDisposed {
                return Observable<Int>.from([])
            } else {
                return Observable<Int>.from(resource.values)
            }
        })

        observable
            .debug()
            .subscribe()
            .disposed(by: disposeBag)


    }

2022-03-15 16:00:07.931: ViewController.swift:88 (testUsing()) -> subscribed
2022-03-15 16:00:07.933: ViewController.swift:88 (testUsing()) -> Event next(1)
2022-03-15 16:00:07.933: ViewController.swift:88 (testUsing()) -> Event next(2)
2022-03-15 16:00:07.933: ViewController.swift:88 (testUsing()) -> Event next(3)
2022-03-15 16:00:07.933: ViewController.swift:88 (testUsing()) -> Event next(4)
2022-03-15 16:00:07.933: ViewController.swift:88 (testUsing()) -> Event completed
2022-03-15 16:00:07.933: ViewController.swift:88 (testUsing()) -> isDisposed
!!!DisposableResource is Disposed!!!

profile
iOS/Android/FE/BE

0개의 댓글