[iOS 10주차] RxSwift 2부: Operator w/ 마블 다이어그램

DoyleHWorks·2024년 12월 26일
0

Ref.

https://reactivex.io/documentation/operators.html
https://rxmarbles.com

마블 다이어그램

https://www.youtube.com/watch?v=fZn2sVUq3Vk
  • 화살표는 시간의 흐름을 의미. 구슬의 간격은 시간의 흐름과 비례해서 표현할 수 있음.
  • 연산자 박스: rx operator.
  • 연산자 박스 기준 상단 화살표: input 스트림.
  • 연산자 박스 기준 하단 화살표: output 스트림.
  • 마블 (구슬): 방출되는 값
  • X 표시: 에러.
  • | 표시: 스트림 종료.

Operator

ReactiveX operator의 여러 마블 다이어그램: https://rxmarbles.com

RxSwift의 Operator는 Observable의 데이터를 변환하거나 결합하고 필터링하는 기능을 제공한다. 이를 통해 복잡한 데이터 흐름을 더 간단하고 직관적으로 처리할 수 있다.

map

map은 각 이벤트를 변환하여 새로운 Observable을 생성한다.

let observable = Observable.of(1, 2, 3)
observable
    .map { $0 * 2 } // 각 값을 2배로 변환
    .subscribe(onNext: { print($0) })

출력:

2  
4  
6  

zip

zip은 여러 Observable의 데이터를 순서대로 결합하여 새 Observable을 생성한다.

let observable1 = Observable.of(1, 2, 3)
let observable2 = Observable.of("A", "B", "C")

Observable.zip(observable1, observable2) { "\($0)\($1)" }
    .subscribe(onNext: { print($0) })

출력:

1A  
2B  
3C  

merge

merge는 여러 Observable의 데이터를 시간 순서대로 합친다.

let observableA = Observable<Int>.interval(.seconds(2), scheduler: SerialDispatchQueueScheduler(qos: .default))
    .take(3)
    .map { "A -> \($0)번째 방출"}

let observableB = Observable<Int>.interval(.seconds(5), scheduler: SerialDispatchQueueScheduler(qos: .default))
    .take(3)
    .map { "B -> \($0)번째 방출"}
    
Observable
    .merge(observableA, observableB)
    .subscribe(onNext: { print("onNext: \($0)") })

출력 (시간 순):

onNext: A -> 0번째 방출
onNext: A -> 1번째 방출
onNext: B -> 0번째 방출
onNext: A -> 2번째 방출
onNext: B -> 1번째 방출
onNext: B -> 2번째 방출

mergeMap

참고: flatMapmergeMap의 다른 이름이다(출처).

map과 유사하게 스트림에서 방출되는 값을 변형한다.
하지만 mergeMap의 경우, 스트림에서 방출된 값을 mergeMap 안에 정의된 스트림으로 또 다시 흐르게 한다.
스트림의 연산자로서 스트림을 넣었을 때, 두 개의 스트림이 한 개의 스트림으로 펼쳐진 결과가 나오는 것이다.

let observableA = Observable<Int>.interval(.seconds(1), scheduler: SerialDispatchQueueScheduler(qos: .default))
    .take(5)

let fruitDictionary = [
    0: "사과",
    1: "바나나",
    2: "오렌지",
    3: "멜론"
]

func getFruitObservable(_ number: Int) -> Observable<String> {
    Observable.create { observer in
        guard let fruit = fruitDictionary[number] else {
            observer.onError(MyError())
            return Disposables.create()
        }
        observer.onNext(fruit)
        return Disposables.create()
    }
}

observableA
    .flatMap { data in
        return getFruitObservable(data)
    }
    .subscribe(
        onNext: { print("onNext: \($0)") },
        onError: { print("onError: \($0)") },
        onCompleted: { print("onCompleted") },
        onDisposed: { print("onDisposed") }
    )

출력:

onNext: 사과
onNext: 바나나
onNext: 오렌지
onNext: 멜론
onError: MyError()
onDisposed

concat

concat은 여러 Observable을 순차적으로 연결하여 처리한다. 이전 Observable이 완료된 후에 다음 Observable이 시작된다.

let observable1 = Observable.of("A", "B")
let observable2 = Observable.of("1", "2")

Observable.concat([observable1, observable2])
    .subscribe(onNext: { print($0) })

출력:

A  
B  
1  
2  

combineLatest

combineLatest는 여러 Observable의 가장 최근 값을 결합하여 새 Observable을 생성한다.

let observable1 = BehaviorSubject(value: "A")
let observable2 = BehaviorSubject(value: "1")

Observable.combineLatest(observable1, observable2) { "\($0)\($1)" }
    .subscribe(onNext: { print($0) })

observable1.onNext("B")
observable2.onNext("2")

출력:

A1  
B1  
B2  

withLatestFrom

withLatestFrom는 트리거 Observable이 방출될 때 다른 Observable의 최신 값을 결합한다.

// 옵저버블1: 초기값 1을 가진 트리거 역할
let observable1 = BehaviorSubject(value: 1)

// 옵저버블2: 최신 값을 제공
let observable2 = PublishSubject<String>()

observable1
    .withLatestFrom(observable2) { (x, y) in "\(x)\(y)" } // observable1과 observable2 결합
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

// 옵저버블2에서 최신 값 방출
observable2.onNext("A")  // 최신 값: A
observable1.onNext(2)    // 출력: 2A
observable2.onNext("B")  // 최신 값: B
observable2.onNext("C")  // 최신 값: C
observable2.onNext("D")  // 최신 값: D
observable1.onNext(3)    // 출력: 3D
observable1.onNext(4)    // 출력: 4D
observable1.onNext(5)    // 출력: 5D

출력:

2A
3D
4D
5D

share

https://rxjs.dev/api/operators/share

share는 하나의 Observable을 여러 구독자가 공유하도록 만든다. 구독 간 중복 작업을 줄이고 효율성을 높인다.

let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .share()

observable.subscribe(onNext: { print("Observer 1:", $0) })
observable.subscribe(onNext: { print("Observer 2:", $0) })

출력 (동일 데이터 공유):

Observer 1: 0  
Observer 2: 0  
Observer 1: 1  
Observer 2: 1  
...

profile
Reciprocity lies in knowing enough

0개의 댓글