startWith
Observable.from(1...5)
.startWith(0)
.debug()
.subscribe()
concat
- 두 개 이상의 sequence를 직렬화 한다.
- 하나의
Observable
이 완료될 때까지 event를 전달하고 완료 되면 그 다음 Observable
의 이벤트를 연이어 전달한다.
- 다른
Observable
이 완료되면 직렬화된 Observable
은 .completed
된다.
let disposeBag = DisposeBag()
let subject1 = PublishSubject<Int>()
let subject2 = ReplaySubject<Int>.create(bufferSize: 1)
_ = Observable.concat(subject1, subject2)
.debug()
.subscribe()
.disposed(by: disposeBag)
subject1.on(.next(1))
subject1.on(.next(2))
subject1.on(.next(3))
subject2.on(.next(10))
subject1.on(.next(4))
subject2.on(.next(20))
subject1.on(.completed)
concatMap
Observable
에서 방출한 element를 새로운 Observable
를 만들며, 만들어진 Observable
의 element를 방출한다.
concat
+ flatMap
let sequences = [
"key🟢": Observable.of(1, 2, 3).debug("🟢"),
"key🔴": Observable.of(10, 20, 30).debug("🔴")
]
_ = Observable.of("key🟢", "key🔴")
.concatMap { sequences[$0] ?? .empty() }
.debug("concatMap")
.subscribe()
Merge
merge
- 같은 타입의 Event를 방출하는
Observable
을 합성한다.
let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable.merge(
subject1.debug("🟢"),
subject2.debug("🔴")
)
.debug("merge")
.subscribe()
.disposed(by: disposeBag)
subject1.on(.next("1"))
subject2.on(.next("A"))
subject1.on(.next("2"))
subject1.on(.next("3"))
subject2.on(.next("B"))
subject1.on(.completed)
subject2.on(.next("After completed"))
Combine
combineLatest
- Inner sequence의 최종값을 받아 방출하는 sequence를 만든다.
let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable.combineLatest(
subject1.debug("🟢"),
subject2.debug("🔴")
) { (element1, element2) -> String in
"🟢\(element1) 🔴\(element2)"
}
.debug("combineLatest")
.subscribe()
.disposed(by: disposeBag)
subject1.on(.next("1"))
subject2.on(.next("A"))
subject1.on(.next("2"))
subject1.on(.next("3"))
subject2.on(.next("B"))
subject1.on(.completed)
subject2.on(.next("After completed"))
zip
let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable.zip(
subject1.debug("🟢"),
subject2.debug("🔴")
) { (element1, element2) -> String in
"🟢\(element1) 🔴\(element2)"
}
.debug("combineLatest")
.subscribe()
.disposed(by: disposeBag)
subject1.on(.next("1"))
subject2.on(.next("A"))
subject1.on(.next("2"))
subject1.on(.next("3"))
subject2.on(.next("B"))
subject1.on(.completed)
subject2.on(.next("After completed"))
Trigger
withLatestFrom
- input
Observable
의 event가 방출된 이후에 기존 Observable
의 새로운 element가 방출될 경우, 기존 Observable
element와 input Observable
의 element를 결합하여 방출한다.
let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
subject1.withLatestFrom(subject2) {
$0 + " " + $1
}
.debug("withLatestFrom")
.subscribe()
.disposed(by: disposeBag)
subject1.on(<.next("1"))
subject1.on(.next("2"))
subject2.on(.next("A"))
subject1.on(.next("3"))
subject1.on(.next("4"))
subject2.on(.next("B"))
subject2.on(.next("C"))
subject1.on(.next("5"))
subject1.on(.completed)
subject2.on(.next("After completed"))
amb
- 가장 먼저 Event가 발생한 sequence를 따르도록 한다.
let scheduler = MainScheduler.instance
let sequence1 = Observable<Int>.interval(
.milliseconds(10),
scheduler: scheduler
).take(5)
.map { "🟢 \($0)" }
let sequence2 = Observable<Int>.interval(
.milliseconds(5),
scheduler: scheduler
).take(5)
.map { "🔴 \($0)" }
let sequence3 = Observable<Int>.interval(
.milliseconds(15),
scheduler: scheduler
).take(5)
.map { "🟡 \($0)" }
Observable.amb([sequence1, sequence2, sequence3])
.debug("amb")
.subscribe()
switchLatest
- 가장 최신의 Observable이 방출하는 Event를 구독자에게 전달한다.
let disposeBag = DisposeBag()
struct Student {
let name: String
let scoreSubject: BehaviorSubject<Int>
}
let student🟢 = Student(
name: "🟢",
scoreSubject: BehaviorSubject<Int>(value: 1)
)
let student🔴 = Student(
name: "🔴",
scoreSubject: BehaviorSubject<Int>(value: 10)
)
let studentSubject = PublishSubject<Observable<Int>>()
studentSubject.switchLatest()
.debug("switchLatest")
.subscribe()
.disposed(by: disposeBag)
studentSubject.on(.next(student🟢.scoreSubject))
student🟢.scoreSubject.on(.next(2))
studentSubject.on(.next(student🔴.scoreSubject))
student🟢.scoreSubject.on(.next(3))
student🔴.scoreSubject.on(.next(20))
student🟢.scoreSubject.on(.next(4))
student🔴.scoreSubject.on(.next(30))
Element
reduce
Observable
이 완료되었을 경우, 가공되어진 값을 방출한다.
Observable.from(1...10)
.reduce(0, accumulator: +)
.debug("reduce")
.subscribe()
scan
Observable.from(1...10)
.scan(0, accumulator: +)
.debug("scan")
.subscribe()