RxSwift) Operators(5)

Havi·2021년 3월 24일
0

RxSwift 기초

목록 보기
9/14

Do

구독자에게 전달되는 이벤트는 그대로 통과시키면서, subscribe 시점이 아닌 이벤트가 방출되는 시점에 부수 작업(side effect)을 끼워 넣어 처리

구현체


extension PrimitiveSequenceType where Trait == MaybeTrait {
    /**
     Attaches side-effect callbacks to a `Maybe` and forwards every observer message unchanged through the returned sequence.

     All callbacks are optional and default to `nil`; each one is passed straight through to the `do` operator of the underlying observable source.

     - seealso: [do operator on reactivex.io](http://reactivex.io/documentation/operators/do.html)

     - parameter onNext: Called for each element of the sequence.
     - parameter afterNext: Called for each element after the onNext event has been passed downstream.
     - parameter onError: Called when the sequence terminates with an error.
     - parameter afterError: Called after the errored termination of the sequence.
     - parameter onCompleted: Called when the sequence terminates gracefully.
     - parameter afterCompleted: Called after the graceful termination of the sequence.
     - parameter onSubscribe: Called before subscribing to the source sequence.
     - parameter onSubscribed: Called after subscribing to the source sequence.
     - parameter onDispose: Called after the subscription to the source has been disposed for any reason, whether the sequence terminated or the observer's subscription was disposed.
     - returns: The source sequence with the side-effecting behavior applied.
     */
    public func `do`(onNext: ((Element) throws -> Void)? = nil,
                     afterNext: ((Element) throws -> Void)? = nil,
                     onError: ((Swift.Error) throws -> Void)? = nil,
                     afterError: ((Swift.Error) throws -> Void)? = nil,
                     onCompleted: (() throws -> Void)? = nil,
                     afterCompleted: (() throws -> Void)? = nil,
                     onSubscribe: (() -> Void)? = nil,
                     onSubscribed: (() -> Void)? = nil,
                     onDispose: (() -> Void)? = nil)
        -> Maybe<Element> {
        // Apply the callbacks on the raw observable first, then re-wrap the result as a Maybe.
        let sideEffecting = primitiveSequence.source.do(
            onNext: onNext,
            afterNext: afterNext,
            onError: onError,
            afterError: afterError,
            onCompleted: onCompleted,
            afterCompleted: afterCompleted,
            onSubscribe: onSubscribe,
            onSubscribed: onSubscribed,
            onDispose: onDispose
        )
        return Maybe(raw: sideEffecting)
    }
}

마블

예제

Observable<Int>.of(1, 2, 3, 4, 5)
    .do(onNext: { value in
        // Side effect: printed for each element before the subscriber's handler prints the event.
        print(value)
    })
    .subscribe({ event in
        print(event)
    })
    .disposed(by: disposeBag)
/*
 Output:
 1
 next(1)
 2
 next(2)
 3
 next(3)
 4
 next(4)
 5
 next(5)
 completed
 */

Delay

지정한 시간(아래 예제에서는 2초)만큼 지연시킨 뒤에 이벤트를 방출함. 단, error 이벤트는 지연되지 않고 바로 전달됨

구현체

/**
     Produces a sequence whose events are those of the source, shifted forward in time by `dueTime`. Error events from the source are not delayed.

     - seealso: [delay operator on reactivex.io](http://reactivex.io/documentation/operators/delay.html)

     - parameter dueTime: Relative amount of time by which the source is shifted.
     - parameter scheduler: Scheduler on which the delay timer runs.
     - returns: The source sequence shifted in time by the specified delay.
     */
    public func delay(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
        -> PrimitiveSequence<Trait, Element> {
        // Delay the underlying observable, then wrap it back into the same trait.
        let delayed = primitiveSequence.source.delay(dueTime, scheduler: scheduler)
        return PrimitiveSequence(raw: delayed)
    }

마블

예제

Observable<Int>.of(1, 2, 3, 4, 5)
    // Shift every element two seconds into the future, timed on the main scheduler.
    .delay(.seconds(2), scheduler: MainScheduler.instance)
    .subscribe({ event in
        print(event)
    })
    .disposed(by: disposeBag)

ObserveOn

민소네님 블로그
필권님 블로그

subscribeOn은 전체 체이닝의 시작점을 결정하고, observeOn은 다음 목표가 어디일지를 알려준다.

따라서 subscribeOn은 구독이 시작될 스케줄러를 결정하므로 가급적 한 번만 사용하는 것이 좋고, observeOn은 이후 단계가 실행될 스케줄러를 바꾸고 싶을 때마다 여러 번 사용할 수 있다.

구현체

/**
     Returns a sequence that delivers the source's observer callbacks on the given scheduler.

     Only the observer callbacks are moved onto `scheduler`. If subscribing and/or unsubscribing
     has side-effects that must run on a particular scheduler, use `subscribeOn` instead.

     - seealso: [observeOn operator on reactivex.io](http://reactivex.io/documentation/operators/observeon.html)

     - parameter scheduler: Scheduler on which observers are notified.
     - returns: The source sequence whose observations happen on the specified scheduler.
     */
    public func observe(on scheduler: ImmediateSchedulerType)
        -> PrimitiveSequence<Trait, Element> {
        // Re-schedule the underlying observable, then wrap it back into the same trait.
        let rescheduled = source.observe(on: scheduler)
        return PrimitiveSequence(raw: rescheduled)
    }

마블

민소네님 예제

	// Example quoted from Minsone's blog. The numbered comments trace which scheduler each step is tied to.
	// NOTE(review): this uses older spellings (`toObservable()`, `doOnNext`, `subscribeOn`, `observeOn`,
	// `sharedApplication()`) than the `observe(on:)` API shown above — confirm against the RxSwift version in use.
	let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueueQOS: .Default)

	[1,2,3,4,5].toObservable()
		.subscribeOn(MainScheduler.instance) 	// 1: start the subscription on the main scheduler
		.doOnNext {
			UIApplication.sharedApplication().networkActivityIndicatorVisible = true
			return $0
		} 		// 2: side effect (show the activity indicator); presumably still on the main scheduler, since no observeOn precedes it
		.observeOn(backgroundScheduler) // 3: hand subsequent events over to backgroundScheduler
		.flatMapLatest {
			HTTPBinDefaultAPI.sharedAPI.get($0)
		}		// 4: issue the request downstream of step 3 (presumably on the background scheduler)
		.observeOn(MainScheduler.instance) 		// 5: hand subsequent events back to the main scheduler
		.subscribe {
			UIApplication.sharedApplication().networkActivityIndicatorVisible = false
			print($0)
		}		// 6: subscriber callback, downstream of step 5 (hide the indicator and print the event)

SubscribeOn

구현체

/**
     Subscribes an event handler to an observable sequence.

     NOTE(review): this is the declaration of `subscribe(_:)`, which attaches an event handler and returns a `Disposable`.
     It takes no scheduler argument, so it does not appear to be the implementation of the `subscribeOn` operator
     that this section is titled after — confirm and replace with the intended snippet.
     
     - parameter on: Action to invoke for each event in the observable sequence.
     - returns: Subscription object used to unsubscribe from the observable sequence.
     */
    public func subscribe(_ on: @escaping (RxSwift.Event<Self.Element>) -> Void) -> RxSwift.Disposable

마블

필권님 예제

Materialize, Dematerialize

참고 블로그

어떤 sequence든 Event<T> enum sequence로 변환한다.

즉 이벤트를 전달할 때 어떤 이벤트인지도 같이 전달한다.

구현체

extension BlockingObservable {
    /// Blocks the current thread until the sequence terminates.
    ///
    /// The outcome is captured as a result value describing how the sequence ended (completed or error),
    /// together with the elements received up to that point.
    ///
    /// Note: this is the `BlockingObservable` variant and returns a `MaterializedSequenceResult`;
    /// it is distinct from a `materialize()` that turns a sequence into a sequence of `Event` values.
    ///
    /// - returns: On completion, the list of elements in the sequence. On error, the list of elements up to that point, along with the error itself.
    public func materialize() -> MaterializedSequenceResult<Element> {
        return materializeResult()
    }
}

마블

예제

Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    // Wrap each emission in an Event, so the subscriber sees events of events.
    .materialize()
    .take(5)
    .subscribe({ event in
        print(event)
    })
    .disposed(by: disposeBag)
/*
 Output:
 next(next(0))
 next(next(1))
 next(next(2))
 next(next(3))
 next(next(4))
 completed
 */

TimeOut

일정 시간 동안 이벤트를 방출하지 않으면 타임아웃 에러(`RxError.timeout`)를 전달하거나, `other` 시퀀스를 넘긴 경우에는 그 시퀀스로 전환함

구현체

/**
     Enforces a timeout policy on each element of the sequence. When the next element does not arrive within `dueTime` of its predecessor, a TimeoutError is propagated to the observer.

     - seealso: [timeout operator on reactivex.io](http://reactivex.io/documentation/operators/timeout.html)

     - parameter dueTime: Longest allowed duration between values before a timeout occurs.
     - parameter scheduler: Scheduler on which the timeout timer runs.
     - returns: An observable sequence with a `RxError.timeout` in case of a timeout.
     */
    public func timeout(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
        -> PrimitiveSequence<Trait, Element> {
        // Apply the timeout on the underlying observable, then wrap it back into the same trait.
        let guarded = primitiveSequence.source.timeout(dueTime, scheduler: scheduler)
        return PrimitiveSequence<Trait, Element>(raw: guarded)
    }
    
    /**
     Enforces a timeout policy on each element of the sequence, running the timeout timers on the given scheduler. When the next element does not arrive within `dueTime` of its predecessor, the `other` sequence is used to produce all further messages from that point on.

     - seealso: [timeout operator on reactivex.io](http://reactivex.io/documentation/operators/timeout.html)

     - parameter dueTime: Longest allowed duration between values before a timeout occurs.
     - parameter other: Sequence to continue with in case of a timeout.
     - parameter scheduler: Scheduler on which the timeout timer runs.
     - returns: The source sequence switching to the other sequence in case of a timeout.
     */
    public func timeout(_ dueTime: RxTimeInterval,
                        other: PrimitiveSequence<Trait, Element>,
                        scheduler: SchedulerType) -> PrimitiveSequence<Trait, Element> {
        // Both the source and the fallback are unwrapped to raw observables before applying the operator.
        let fallback = other.source
        let guarded = primitiveSequence.source.timeout(dueTime, other: fallback, scheduler: scheduler)
        return PrimitiveSequence<Trait, Element>(raw: guarded)
    }

마블

예제

// The source ticks every 2 seconds while the allowed gap between elements is 1 second.
Observable<Int>.interval(.seconds(2), scheduler: MainScheduler.instance)
    .timeout(.seconds(1), scheduler: MainScheduler.instance)
    .subscribe({ event in
        print(event)
    })
    .disposed(by: disposeBag)

Using

구현체

extension ObservableType {
    /**
     Builds an observable sequence that depends on a resource object whose lifetime is tied to the lifetime of the resulting sequence.

     - seealso: [using operator on reactivex.io](http://reactivex.io/documentation/operators/using.html)

     - parameter resourceFactory: Factory function that obtains the resource object.
     - parameter observableFactory: Factory function that obtains an observable sequence depending on the obtained resource.
     - returns: An observable sequence whose lifetime controls the lifetime of the dependent resource object.
     */
    public static func using<Resource: Disposable>(
        _ resourceFactory: @escaping () throws -> Resource,
        observableFactory: @escaping (Resource) throws -> Observable<Element>
    ) -> Observable<Element> {
        return Using(
            resourceFactory: resourceFactory,
            observableFactory: observableFactory
        )
    }
}

마블

예제

profile
iOS Developer

0개의 댓글