subscribe 시점이 아닌 방출 시점에 이벤트 핸들링
extension PrimitiveSequenceType where Trait == MaybeTrait {
/**
Invokes an action for each event in the observable sequence, and propagates all observer messages through the result sequence.
- seealso: [do operator on reactivex.io](http://reactivex.io/documentation/operators/do.html)
- parameter onNext: Action to invoke for each element in the observable sequence.
- parameter afterNext: Action to invoke for each element after the observable has passed an onNext event along to its downstream.
- parameter onError: Action to invoke upon errored termination of the observable sequence.
- parameter afterError: Action to invoke after errored termination of the observable sequence.
- parameter onCompleted: Action to invoke upon graceful termination of the observable sequence.
- parameter afterCompleted: Action to invoke after graceful termination of the observable sequence.
- parameter onSubscribe: Action to invoke before subscribing to source observable sequence.
- parameter onSubscribed: Action to invoke after subscribing to source observable sequence.
- parameter onDispose: Action to invoke after subscription to source observable has been disposed for any reason. It can be either because sequence terminates for some reason or observer subscription being disposed.
- returns: The source sequence with the side-effecting behavior applied.
*/
public func `do`(onNext: ((Element) throws -> Void)? = nil,
afterNext: ((Element) throws -> Void)? = nil,
onError: ((Swift.Error) throws -> Void)? = nil,
afterError: ((Swift.Error) throws -> Void)? = nil,
onCompleted: (() throws -> Void)? = nil,
afterCompleted: (() throws -> Void)? = nil,
onSubscribe: (() -> Void)? = nil,
onSubscribed: (() -> Void)? = nil,
onDispose: (() -> Void)? = nil)
-> Maybe<Element> {
return Maybe(raw: self.primitiveSequence.source.do(
onNext: onNext,
afterNext: afterNext,
onError: onError,
afterError: afterError,
onCompleted: onCompleted,
afterCompleted: afterCompleted,
onSubscribe: onSubscribe,
onSubscribed: onSubscribed,
onDispose: onDispose)
)
}
}
Observable<Int>.of(1,2,3,4,5)
.do(onNext: { num in
print(num)
})
.subscribe({
print($0)
})
.disposed(by: disposeBag)
/*
1
next(1)
2
next(2)
3
next(3)
4
next(4)
5
next(5)
completed
*/
2초만큼의 delay를 준 뒤에 이벤트를 방출함
/**
Returns an observable sequence by the source observable sequence shifted forward in time by a specified delay. Error events from the source observable sequence are not delayed.
- seealso: [delay operator on reactivex.io](http://reactivex.io/documentation/operators/delay.html)
- parameter dueTime: Relative time shift of the source by.
- parameter scheduler: Scheduler to run the subscription delay timer on.
- returns: the source Observable shifted in time by the specified delay.
*/
public func delay(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
-> PrimitiveSequence<Trait, Element> {
PrimitiveSequence(raw: self.primitiveSequence.source.delay(dueTime, scheduler: scheduler))
}
Observable<Int>.of(1,2,3,4,5)
.delay(.seconds(2),
scheduler: MainScheduler.instance)
.subscribe({
print($0)
})
.disposed(by: disposeBag)
subscribeOn은 전체 체이닝의 시작점
을 결정하고, observeOn은 다음 목표가 어디일지를 알려준다.
따라서 subscribeOn은 동작할 스케쥴러를 알려주기 때문에 가급적 한 번만 사용하는 것이 좋고 observeOn은 계속 사용할 수 있다.
/**
Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
This only invokes observer callbacks on a `scheduler`. In case the subscription and/or unsubscription
actions have side-effects that require to be run on a scheduler, use `subscribeOn`.
- seealso: [observeOn operator on reactivex.io](http://reactivex.io/documentation/operators/observeon.html)
- parameter scheduler: Scheduler to notify observers on.
- returns: The source sequence whose observations happen on the specified scheduler.
*/
public func observe(on scheduler: ImmediateSchedulerType)
-> PrimitiveSequence<Trait, Element> {
PrimitiveSequence(raw: self.source.observe(on: scheduler))
}
let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueueQOS: .Default)
[1,2,3,4,5].toObservable()
.subscribeOn(MainScheduler.instance) // 1
.doOnNext {
UIApplication.sharedApplication().networkActivityIndicatorVisible = true
return $0
} // 2
.observeOn(backgroundScheduler) // 3
.flatMapLatest {
HTTPBinDefaultAPI.sharedAPI.get($0)
} // 4
.observeOn(MainScheduler.instance) // 5
.subscribe {
UIApplication.sharedApplication().networkActivityIndicatorVisible = false
print($0)
} // 6
/**
Subscribes an event handler to an observable sequence.
- parameter on: Action to invoke for each event in the observable sequence.
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func subscribe(_ on: @escaping (RxSwift.Event<Self.Element>) -> Void) -> RxSwift.Disposable
어떤 sequence든 Event<T> enum sequence
로 변환한다.
즉 이벤트를 전달할 때 어떤 이벤트인지도 같이 전달한다.
extension BlockingObservable {
/// Blocks current thread until sequence terminates.
///
/// The sequence is materialized as a result type capturing how the sequence terminated (completed or error), along with any elements up to that point.
///
/// - returns: On completion, returns the list of elements in the sequence. On error, returns the list of elements up to that point, along with the error itself.
public func materialize() -> MaterializedSequenceResult<Element> {
self.materializeResult()
}
}
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.materialize()
.take(5)
.subscribe({
print($0)
})
.disposed(by: disposeBag)
/*
next(next(0))
next(next(1))
next(next(2))
next(next(3))
next(next(4))
completed
*/
일정 시간동안 이벤트를 방출하지 않으면 타임아웃
/**
Applies a timeout policy for each element in the observable sequence. If the next element isn't received within the specified timeout duration starting from its predecessor, a TimeoutError is propagated to the observer.
- seealso: [timeout operator on reactivex.io](http://reactivex.io/documentation/operators/timeout.html)
- parameter dueTime: Maximum duration between values before a timeout occurs.
- parameter scheduler: Scheduler to run the timeout timer on.
- returns: An observable sequence with a `RxError.timeout` in case of a timeout.
*/
public func timeout(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
-> PrimitiveSequence<Trait, Element> {
PrimitiveSequence<Trait, Element>(raw: self.primitiveSequence.source.timeout(dueTime, scheduler: scheduler))
}
/**
Applies a timeout policy for each element in the observable sequence, using the specified scheduler to run timeout timers. If the next element isn't received within the specified timeout duration starting from its predecessor, the other observable sequence is used to produce future messages from that point on.
- seealso: [timeout operator on reactivex.io](http://reactivex.io/documentation/operators/timeout.html)
- parameter dueTime: Maximum duration between values before a timeout occurs.
- parameter other: Sequence to return in case of a timeout.
- parameter scheduler: Scheduler to run the timeout timer on.
- returns: The source sequence switching to the other sequence in case of a timeout.
*/
public func timeout(_ dueTime: RxTimeInterval,
other: PrimitiveSequence<Trait, Element>,
scheduler: SchedulerType) -> PrimitiveSequence<Trait, Element> {
PrimitiveSequence<Trait, Element>(raw: self.primitiveSequence.source.timeout(dueTime, other: other.source, scheduler: scheduler))
}
Observable<Int>.interval(.seconds(2), scheduler: MainScheduler.instance)
.timeout(.seconds(1), scheduler: MainScheduler.instance)
.subscribe({
print($0)
})
.disposed(by: disposeBag)
extension ObservableType {
/**
Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence's lifetime.
- seealso: [using operator on reactivex.io](http://reactivex.io/documentation/operators/using.html)
- parameter resourceFactory: Factory function to obtain a resource object.
- parameter observableFactory: Factory function to obtain an observable sequence that depends on the obtained resource.
- returns: An observable sequence whose lifetime controls the lifetime of the dependent resource object.
*/
public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, observableFactory: @escaping (Resource) throws -> Observable<Element>) -> Observable<Element> {
Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
}
}