[RxSwift] Observable의 변환 연산자

이정훈·2024년 3월 24일
0

ReactiveX

목록 보기
4/5
post-thumbnail

🔄 Observable의 항목 변환하기

시작에 앞서 Observable의 정의를 간단히 살펴보면..

Observable은 관찰 가능한 Stream으로 비동기 이벤트의 시퀀스를 생성할 수 있는 대상을 의미한다.

Observable비동기 이벤트의 시퀀스를 생성하면서 항목(item)을 방출하게 된다. 이때 Observable이 방출하는 항목을 다른 형태로 반환 받고 싶은 상황이 생길 수 있는데, Rx 라이브러리에는 Observable이 항목을 방출함과 동시에 Observable이 방출한 항목을 변환할 수 있는 여러가지 연산자(Operators)들을 제공하고 있다. 이번 포스트에서는 Rx 라이브러리에서 제공하는 Observable변환 연산자(transforming Operators)에 대해 알아보도록 하자.

map() 메서드

RxSwiftmap() 메서드는 일반적으로 Swift에서 사용하던 고차 함수map() 메서드와 그 기능이 유사하다.

RxSwiftmap() 메서드는 Observable이 방출하는 각각의 항목에 대해서 map() 메서드로 전달된 함수(클로저)를 적용 후 다시 하나의 Observable로 반환하는 메서드이다.

가령, 아래와 같이 1부터 10까지의 정수형 값을 방출하는 Observable이 있다고 해보자.

import RxSwift

let intArrayObservable = Observable<Int>.from(Array(1...10))

intArrayObservable
    .subscribe(
        onNext: {
            print($0)
        }, 
        onCompleted: {
            print("completed")
        }
    )
    
//1
//2
//3
//4
//5
//6
//7
//8
//9
//10
//completed

그런데 만약 Observable이 방출한 항목에 x10을 한 값을 받아보고 싶다면 map() 메서드를 이용하여 기능을 구현할 수 있다.

import RxSwift

let intArrayObservable = Observable<Int>.from(Array(1...10))

intArrayObservable
    .map {
        $0 * 10
    }
    .subscribe(
        onNext: {
            print($0)
        }, 
        onCompleted: {
            print("completed")
        }
    )

//10
//20
//30
//40
//50
//60
//70
//80
//90
//100
//completed

flatMap() 메서드

RxSwift에서 제공하는 flatMap() 메서드의 경우 Swift에서 제공하는 flatMap() 메서드와 조금의 차이가 있는데, 먼저 RxSwift에서 제공하는 flatMap()은 메서드로 전달되는 함수(클로저)의 반환 타입이 Observable 타입을 반환하게 되는데 메서드의 정의는 아래와 같다.

//flatMap 메서드 정의
public func flatMap<Source: ObservableConvertibleType>(_ selector: @escaping (Element) throws -> Source)
        -> Observable<Source.Element> {
    return FlatMap(source: self.asObservable(), selector: selector)
}

flatMap() 메서드로 전달된 함수(클로저)의 적용 결과 여러 개의 Observable이 생성 되고, 여기서 flatMap() 메서드의 핵심은 여러 개의 Observable이 방출하는 항목들을 하나의 Observable이 방출하는 시퀀스(Sequence)로 만들어 반환하는 것이다. 다시 말해, 여러 개의 Observable을 하나의 Observableflat하게 만드는 것이다.

아래의 코드와 함께 살펴보면..

import RxSwift

//flatMap() method
let intArrayObservable = Observable<Int>.from(Array(1...3))

intArrayObservable
    .flatMap { item in
        return Observable<Int>.from(Array(repeating: item, count: item))
    }
    .subscribe(
        onNext: {
            print($0)
        },
        onCompleted: {
            print("completed")
        }
    )

//1
//2
//2
//3
//3
//3
//completed

intArrayObservable은 1부터 3까지의 Int 타입의 항목을 방출하는 Observable이다. intArrayObservableflatMap() 메서드를 적용하게 되면 1부터 3까지의 각각 새로운 Observable이 3개 생성되고 이 3개의 Observable이 방출하는 항목을 하나의 Observable로 반환한다.

한 가지 더 주목할 점은 기본적으로 Observable비동기 이벤트와 관련된 시퀀스(Sequence)를 생성하기 때문에 아래와 같이 각 Observable이 방출하는 항목이 많아지면 방출 순서가 동기 방식과 차이를 보일 수 있다.

import RxSwift

let intArrayObservable = Observable<Int>.from(Array(1...5))

intArrayObservable
    .flatMap { item in
        return Observable<Int>.from(Array(repeating: item, count: item))
    }
    .subscribe(
        onNext: {
            print($0)
        },
        onCompleted: {
            print("completed")
        }
    )

//1
//2
//2
//3
//3
//4
//3
//4
//5
//4
//5
//4
//5
//5
//5
//completed

buffer()

buffer() 메서드는 Obeservable이 방출하는 항목을 Buffer에 보관 후 한 번에 번들로 방출할 수 있도록 하는 메서드이다.

public func buffer(timeSpan: RxTimeInterval,
				   count: Int,
                   scheduler: SchedulerType) -> Observable<[Element]> {
    BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
}

RxSwift 라이브러리에는 buffer() 메서드가 위와 같이 정의 되어 있는데 메서드의 각 파라미터가 의미하는 바를 알아보자.

  • timeSpan: Buffer에 항목을 수집하는 시간을 설정.
  • count: Buffer에 항목을 수집할 수 있는 최대 크기를 지정.
  • scheduler: buffer() 메서드가 수행될 스레드를 지정.

사용 방법은 아래의 코드와 같다.

import RxSwift

let intArrayObservable = Observable<Int>.from(Array(1...6))

intArrayObservable
    .buffer(timeSpan: .seconds(1),
            count: 3,
            scheduler: MainScheduler.instance)
    .take(2)
    .subscribe(
        onNext: {
            print($0)
        },
        onCompleted: {
            print("completed")
        }
    )
    
//[1, 2, 3]
//[4, 5, 6]
//completed

Buffer에 수집할 시간은 1초로, Buffer에 수집할 항목의 최대 수는 3개로, 해당 메서드가 실행할 스레드는 Main 스레드에서 수행하도록 하였다. 또한 take() 메서드를 통해 방출된 항목을 2개만 가져오도록 하였다.

buffer() 메서드를 사용하면서 주의할 점은 만약 Buffer에 항목을 수집 중 에러가 발생하여 subscribe()onError 함수가 호출될 경우, Buffer에 항목이 수집 되었더라도 해당 Buffer는 방출되지 않고 곧바로 onError 함수가 호출된다.

scan() 메서드

scan() 메서드는 Observable이 방출한 항목을 scan() 메서드에 전달된 함수를 적용한 결과를 방출한다. 또한 이 방출된 항목은 다음 두 번째 항목이 방출될 때 함수의 첫 번째 파라미터의 전달인자로 사용된다.

사용법이 어디선가 많이 본 방식 아닌가?

RxSwiftscan() 메서드는 Swift에서 제공하는 고차함수인 reduce()와 같이 누적 값을 필요로 하는 곳에 사용될 수 있다.

public func scan<Seed>(_ seed: Seed,
					   accumulator: @escaping (Seed, Element) -> Seed) -> Infallible<Seed> {
    Infallible(asObservable().scan(seed, accumulator: accumulator))
}

위의 코드는 RxSwift 라이브러리에 정의 되어 있는 scan() 메서드이고, 첫번째 파라미터로 초기값seed를 전달 받고 있으며, scan() 메서드로 전달 받는 함수(클로저)는 두 개의 파라미터를 가지고 있어 첫 번재 파라미터로는 seed값 혹은 이전 연산의 결과가 두 번째 파라미터로는 Observable이 방출하는 다음 항목의 값이 전달된다.

아래의 예제는 Observable이 1부터 6까지의 항목을 방출할 때 방출한 값의 누적 값을 출력하는 예제이다.

import RxSwift

let intArrayObservable = Observable<Int>.from(Array(1...6))

intArrayObservable
    .scan(0) { (prev, next) in
        return prev + next
    }
    .subscribe(
        onNext: {
            print($0)
        },
        onCompleted: {
            print("completed")
        }
    )
    
//1
//3
//6
//10
//15
//21
//completed

reference

https://reactivex.io/

profile
새롭게 알게된 것을 기록하는 공간

0개의 댓글