startWith

  • 초기 값을 sequence의 앞에 붙인다.
Observable.from(1...5)
.startWith(0)
.debug()
.subscribe()

concat

  • 두 개 이상의 sequence를 직렬화 한다.
    • 하나의 Observable이 완료될 때까지 event를 전달하고 완료 되면 그 다음 Observable의 이벤트를 연이어 전달한다.
    • 다른 Observable이 완료되면 직렬화된 Observable.completed된다.
let disposeBag = DisposeBag()
let subject1 = PublishSubject<Int>()
let subject2 = ReplaySubject<Int>.create(bufferSize: 1)

_ = Observable.concat(subject1, subject2)
    .debug()
    .subscribe()
    .disposed(by: disposeBag)

subject1.on(.next(1))
subject1.on(.next(2))
subject1.on(.next(3))
subject2.on(.next(10))
subject1.on(.next(4))
subject2.on(.next(20))
subject1.on(.completed)

concatMap

  • Observable에서 방출한 element를 새로운 Observable를 만들며, 만들어진 Observable의 element를 방출한다.
  • concat + flatMap
let sequences = [
    "key🟢": Observable.of(1, 2, 3).debug("🟢"),
    "key🔴": Observable.of(10, 20, 30).debug("🔴")
]

_ = Observable.of("key🟢", "key🔴")
    .concatMap { sequences[$0] ?? .empty() }
    .debug("concatMap")
    .subscribe()

Merge

merge

  • 같은 타입의 Event를 방출하는 Observable 을 합성한다.
let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

Observable.merge(
    subject1.debug("🟢"),
    subject2.debug("🔴")
)
    .debug("merge")
    .subscribe()
    .disposed(by: disposeBag)

subject1.on(.next("1"))
subject2.on(.next("A"))
subject1.on(.next("2"))
subject1.on(.next("3"))
subject2.on(.next("B"))
subject1.on(.completed)
subject2.on(.next("After completed"))

Combine

combineLatest

  • Inner sequence의 최종값을 받아 방출하는 sequence를 만든다.
let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

Observable.combineLatest(
    subject1.debug("🟢"),
    subject2.debug("🔴")
) { (element1, element2) -> String in
    "🟢\(element1)  🔴\(element2)"
}
.debug("combineLatest")
.subscribe()
.disposed(by: disposeBag)

subject1.on(.next("1"))
subject2.on(.next("A"))
subject1.on(.next("2"))
subject1.on(.next("3"))
subject2.on(.next("B"))
subject1.on(.completed)
subject2.on(.next("After completed"))

zip

let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

Observable.zip(
    subject1.debug("🟢"),
    subject2.debug("🔴")
) { (element1, element2) -> String in
    "🟢\(element1)  🔴\(element2)"
}
.debug("combineLatest")
.subscribe()
.disposed(by: disposeBag)

subject1.on(.next("1"))
subject2.on(.next("A"))
subject1.on(.next("2"))
subject1.on(.next("3"))
subject2.on(.next("B"))
subject1.on(.completed)
subject2.on(.next("After completed"))

Trigger

withLatestFrom

  • input Observable의 event가 방출된 이후에 기존 Observable의 새로운 element가 방출될 경우, 기존 Observable element와 input Observable의 element를 결합하여 방출한다.
let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

subject1.withLatestFrom(subject2) {
    $0 + " " + $1
}
.debug("withLatestFrom")
.subscribe()
.disposed(by: disposeBag)

subject1.on(<.next("1"))
subject1.on(.next("2"))
subject2.on(.next("A"))
subject1.on(.next("3"))
subject1.on(.next("4"))
subject2.on(.next("B"))
subject2.on(.next("C"))
subject1.on(.next("5"))
subject1.on(.completed)
subject2.on(.next("After completed"))

amb

  • 가장 먼저 Event가 발생한 sequence를 따르도록 한다.
let scheduler = MainScheduler.instance
let sequence1 = Observable<Int>.interval(
    .milliseconds(10),
    scheduler: scheduler
).take(5)
.map { "🟢 \($0)" }
let sequence2 = Observable<Int>.interval(
    .milliseconds(5),
    scheduler: scheduler
).take(5)
.map { "🔴 \($0)" }
let sequence3 = Observable<Int>.interval(
    .milliseconds(15),
    scheduler: scheduler
).take(5)
.map { "🟡 \($0)" }

Observable.amb([sequence1, sequence2, sequence3])
    .debug("amb")
    .subscribe()

switchLatest

  • 가장 최신의 Observable이 방출하는 Event를 구독자에게 전달한다.
let disposeBag = DisposeBag()
struct Student {
    let name: String
    let scoreSubject: BehaviorSubject<Int>
}

let student🟢 = Student(
    name: "🟢",
    scoreSubject: BehaviorSubject<Int>(value: 1)
)
let student🔴 = Student(
    name: "🔴",
    scoreSubject: BehaviorSubject<Int>(value: 10)
)
let studentSubject = PublishSubject<Observable<Int>>()

studentSubject.switchLatest()
    .debug("switchLatest")
    .subscribe()
    .disposed(by: disposeBag)

studentSubject.on(.next(student🟢.scoreSubject))
student🟢.scoreSubject.on(.next(2))
studentSubject.on(.next(student🔴.scoreSubject))
student🟢.scoreSubject.on(.next(3))
student🔴.scoreSubject.on(.next(20))
student🟢.scoreSubject.on(.next(4))
student🔴.scoreSubject.on(.next(30))

Element

reduce

  • Observable이 완료되었을 경우, 가공되어진 값을 방출한다.
Observable.from(1...10)
.reduce(0, accumulator: +)
.debug("reduce")
.subscribe()

scan

  • 가공되어진 값을 지속적으로 방출한다.
Observable.from(1...10)
.scan(0, accumulator: +)
.debug("scan")
.subscribe()
profile
🧑🏻‍💻 iOS Developer @TOSS

0개의 댓글