[Combine] Operators - 04

rbw·2023년 12월 20일
0

Combine

목록 보기
8/11

Splitters

이 친구는 바로 전에 본 합치는 joiners 친구와 반대되는 개념임니다. 얜 값 스트림을 둘 이상으로 분할하여 여러 파이프라인으로 동시에 전송하는 연산자임니다. 예를 들면 요런 느낌이라네요

                        upstream pipeline
                            splitter
        /                       |                      \
downstream pipeline    downstream pipeline    downstream pipeline

이 아이디어는 스플리터 아래의 다운스트림 파이프라인이 스플리터 위의 업스트림 파이프라인에서 방출되는 동일한 값을 동시에 수신한다는 것임니다.

이 오퍼레이터가 필요한 이유는 무엇일까요 ?? 대부분의 퍼블리셔(오퍼레이터 포함)는 구조체입니다. 따라서 한 퍼블리셔에 둘 이상의 구독자를 구독하면 핻아 퍼블리셔의 사본이 여러개 생성되어 두 개의 독립된 스트림이 만들어집니다. 아래의 예시를 보겠슴니다.

let pub = [1,2,3,4].map {
    Just($0)
        .delay(for: 1, scheduler: DispatchQueue.main)
    }
    .publisher
    .flatMap(maxPublishers:.max(1)) {
        return $0
    }
pub.sink {print($0)}
    .store(in:&self.storage)
delay(2) {
    pub.sink {print($0)}
        .store(in:&self.storage)
}
// OUTPUT
딜레이 전 sink => 1 2 3 4 4
딜레이 후 sink => x x 1 2 3

보시다시피 완전히 다른 두 개의 퍼블리셔가 있는 것처럼 두 번째 구독자를 위해 숫자 스트림이 또 시작되었습니다. 하지만 이것이 우리가 원하는 방식이 아니고, 두 번째 구독자에게도 첫 번째 구독자와 동일 값을 동시에 제공하려면 어떻게 해야 할까요 ?? 이것이 바로 스플리터가 해결하는 문제입니다.

Subject는 이미 일종의 스플리터임니다. 예시를 보겠슴니다.

let pub = [1,2,3,4].map {
    Just($0)
        .delay(for: 1, scheduler: DispatchQueue.main)
    }
    .publisher
    .flatMap(maxPublishers:.max(1)) {
        return $0
    }

// pub에서 나오는 값을 subject가 send 함
pub.sink {self.mySubject.send($0)}
    .store(in:&self.storage)

// 그러면 subject를 구독한 친구들이 받겠져
self.mySubject.sink {print($0)}
    .store(in:&self.storage)
delay(2) {
    self.mySubject.sink {print($0)}
        .store(in:&self.storage)
}
// OUTPUT
1 2 3 4
x x 3 4

위처럼 동일한 값을 전달받고 있음을 볼 수 있슴니다. Subject는 계속 값을 방출하고 있고, 구독자들은 해당 값을 동일하게 받고있슴니다.

Share

.share() (Publishers.Share)는 클래스 객체에서 업스트림을 효과적으로 래핑함니다. Publishers.Share는 구조체가 아니라 클래스입니다. 일반적으로 파이프라인을 여러 구독자가 동시에 다른 시간에 구독하도록 하는 것이 목표인 경우 이 작업만 수행하면 됨니다. 파이프라인에 참조 시맨틱스가 적용되어서 구독자들이 구독하는 것은 하나의 동일 파이프라인에 대한 서로 다른 참조임니다.

예시코드임니다.

let t = Timer.publish(every: 1, on: .main, in: .common).autoconnect()
    .scan(0) {i,_ in i+1}
    .share()

t.sink {print("ONE", $0)}
    .store(in: &self.storage)
delay(3) {
    t.sink {print("TWO", $0)}
        .store(in: &self.storage)
}

// OUTPUT
ONE 1 ONE 2 ONE 3 ONE 4 TWO 4 ONE 5 TWO 5 ...

두 번째 구독자도 나중에 합류하였지만 첫 번째 구독자와 동일한 값을 수신하는 것을 알 수 있습니다.

보통 Share는 프로퍼티로 많이 사용한다고 함니다.

let myTimer = Timer.publish(every: 1, on: .main, in: .common).autoconnect()
    .scan(0) {i,_ in i+1}
    .share()
    .eraseToAnyPublisher()
var timerStarted = false
override func viewDidAppear(_ animated: Bool) {
    super.viewDidAppear(animated)
    if !timerStarted {
        timerStarted = true
        self.myTimer.sink {_ in}
            .store(in:&self.storage)
    }
}

이제 저희는 클래스 내부에서 공용으로 사용이 가능합미다. 위 타이머는 첫 번째 구독자가 나타날 때까지는 카운트를 시작하지 않지만 파이프라인을 원하는 순간에 작동하도록 할 수 있씁니다. 위의 viewDidAppear에서 동작하는 것처럼 뷰가 화면에 나타나면 타이머를 시작하는 것도 가능합니다.

만약 여러 개의 다운스트림 파이프라인이 구독되어 있고, 그 중 한 파이프라인에서 문제가 생기면 어떻게 될까요? 취소 메시지가 파이프라인에 퍼지게 됩니다. 하지만 .share를 만나면 퍼지는게 중지됨니다. 따라서 문제가있는 파이프라인은 종료되지만 퍼블리셔 자체는 계속 발행하고 다른 구독자들도 값을 수신함니다.

반면에 마지막 남은 구독자가 실패하면 이 취소가 타이머 퍼블리셔까지 전달되어 전체 파이프라인이 종료됩니다.

share는 항상 업스트림에 무제한으로 요청한다고 함니다.

Multicast

.multicast는 하나의 매개변수, 즉 Subject 또는 Subject를 생성하는 함수를 받아들입니다. 이 연산자가 생성하는 연산자 객체(Publishers.Mulitcast)는 클래스이며, ConnectablePublisher 프로토콜을 채택하여 타이머와 마찬가지로 connect, autoconnect 메서드를 가지고 있슴니다.

내부적으로 이 오퍼레이터는 Subject를 유지, 관리하고 업스트림에서 값을 받으면 Subjectsend를 호출하여 해당 값을 전달합니다. 따라서 Subject는 이 연산자가 업스트림에서 받은 값을 다운스트림으로 전달할 뿐입니다. 이미 Subject가 스플리터 역할을 하는걸 저희는 알지만, 이 연산자는 참조의미론을 가지는 클래스가 되고, 내부적으로 Subject를 유지하면서 자체적으로 스플리터가 된다고 볼 수 있슴니다.

실제로 .share가 내부적으로 작동하는 방식은 다음과 같습니다. 내부적으로 share 객체는 Multicast 객체임니다. .share.multicast의 편리한 래퍼일 뿐임니다. 그럼 왜 .multicast를 사용해야 할까요?

.share 내부의 .multicast 뒤에는 .autoconnect가 오기 때문에 구독자가 있는 즉시 파이프라인이 시작되기 때문임니다. 만약 connect를 직접 호출하여 파이프라인을 수동으로 설정하는 기능을 원한다면 .multicast를 사용해야함니다.

예시 코드임니다.

let myMulticastingTimer = Timer.publish(every: 1, on: .main, in: .common)
    .autoconnect()
    .scan(0) {i,_ in i+1}
    .multicast(subject: PassthroughSubject())


self.myMulticastingTimer
    .sink(receiveCompletion: { print($0)}, receiveValue: { print($0)})
    .store(in:&self.storage)

// 아래의 connect가 없으면 명시적으로 연결을 지시 받을 떄까지 파이프라인이 시작되지 않슴니다.
// 따라서 구독자가 나타나도 암것도 안함니다.
// 중요한점은 얘도 유지하기 위해서 cancellable에 저장해줘야함니다.
self.myMulticastingTimer.connect()
    .store(in:&self.storage)

얘도 마찬가지로 각 구독 파이프라인은 취소해도 업스트림 파이프라인에 영향이 없슴니다. .multicast 아래에서 취소메시지가 위로 전달되지 않슴니다.

업스트림 파이프라인을 취소하고 타이머를 중지하는 유일한 방법은 처음 connect를 호출하여 받은 취소가능객체를 저장해서 얘를 특정조건에 cancel() 하믄 댐니다..!

Timers

타이머란 업스트림에서 도착하는 값의 타이밍을 다루는 연산자를 의미합니다.

타이머 연산자 중 가장 뛰언난 것은 .delay로 값이 업스트림에서 도착하는 시간과 다운스트림으로 전달되는 시간 사이에 일시정지를 삽입함니다.

Delay

.delay(Publishers.Delay)는 업스트림에서 값을 수신하는 시간과 다운스트림으로 전달되는 시간 사이에 일시 정지를 삽입합니다. 매개변수는 다음과 같슴니다.

  • for: 지연의 길이임니다. .seconds(1), .milliseconds(100)등과 같은 표현식을 작성할 수 있습니다.
  • tolerance: 허용오차임니다. for와 같고, 옵셔널임니다
  • scheduler: 지연을 측정할 큐 또는 런루프 임니다. DispatchQueue, OperationQueue, RunLoop를 입력가능하며 일반적으로 사용하는 값은 DispatchQueue.main입니다.
  • options: 옵셔널임니다.

Debounce

.debounce(Publishers.Debounce)는 값이 너무 빨리 도착하는 경우 다운스트림으로 흐르지 않도록 시간간격을 주어서 방지합니다. 업스트림에서 값이 도착하면 오퍼레이터는 이를 버퍼링하고 내부 타이머를 시작함니다. 만약 업스트림에서 새로운 값이 도착하지 않고 타이머가 종료되면 이 오퍼레이터는 값을 아래로 전달합니다. 타이머가 종료되기 전에 새 값이 도착하면 버퍼링된 값을 버리고 새 값을 버퍼에 저장하고 타이머를 다시 시작합니다.

따라서 다운스트림으로 전달되는 값은 너무 빨리 다른 값이 뒤따르지 않은 가장 최근의 값입니다. 다른 값이 너무 빨리 도착하는지 여부를 알기 전에 타이머의 지속 시간을 기다려야하므로 스트림에 일시중지가 삽입됨니다. 이는 매우 짧게 설정합니다.

예를 들어, UITextField에 입력할 때 응답을 서버에 보내고 싶다고 하면, 사용자가 매우 빠른 속도로 입력하는 경우 서버 요청 횟수가 비정상적으로 늘어날 수 있습니다. 이 때 .debounce 연산자를 사용해서 사용자가 입력하고 일시 중지해야 네트워크에 요청을 할 수 있게 만들 수 있습니다.

매개변수는 .delay와 같습니다. (tolerance 오차 매개변수는 없슴니다)

Timeout

.timeout(Publishers.Timeout).debounce와 반대되는 개념으로, 시간간격을 지정하면 오퍼레이터는 값을 매번 아래로 전달합니다. 만약 간격내에 값이 도착하지 않고 .finished가 수신되지 않으면 파이프라인을 종료합니다.

매개변수는 다음과 같습니다.

  • _: 업스트림에서 값이 도착한 후 다른 값이 들어오기 전까지 경과하지 않아야 하는 시간 간격입니다.
  • customError: 에러를 생성하는 함수. 옵셔널이며 생략하면 오퍼레이터는 .finished를 전송하여 타임아웃에 응답함니다.

오류를 생성하려면 업스트림 파이프라인과 동일한 유형이어야 합니다. 업스트림 파이프라인의 실패 유형이 Never인 경우 오류 함수를 제공할 수 없으므로 이를 해결하려면 .setFailureType을 사용해야합니다.

.setFailureType(to: Error.self)
.timeout(0.1, scheduler:DispatchQueue.main) { MyError.oops }

Throttle

.throttle (Publishers.Throttle).buffer의 단순한 버전과 같슴니다. 또한 .debounce와도 밀접한 관련이 있습니다. 오퍼레이터는 타이머를 실행하고 수신 값의 버퍼를 유지합니다. 시간 간격이 끝나면 오퍼레이터는 버퍼에서 값을 선택하고 아래로 전달 후 버퍼를 비웁니다. 따라서 시간 간격당 두 개 이상의 값이 도착하는 것을 방지하는 방식입니다.

처음 값은 바로 전달합니당

  • for: 시간 간격입니다.
  • latest: 전달할 값이 버퍼에서 선택되는 방식을 결정합니다. 참이면 가장 최근의 값, 거짓이면 가장 오래된 값입니다.
let pub = Timer.publish(every: 1, on: .main, in: .common).autoconnect()
    .scan(0) {i,_ in i+1}
    .throttle(for: 2, scheduler: DispatchQueue.main, latest: true)

// OUTPUT
1 3 5 7 9 ...

MeasureInterval

.measureInterval (Publsihers.MeasureInterval) 은 현재 수신된 값과 이전에 수신된 값 사이에 경과된 시간을 보고합니다. 보고되는 첫 번째 시간 간격은 구독과 첫 번째 값의 수신 사이 시간입니다.

  • using: 시간을 측정할 대기열 또는 런루프입니다.

시간 간격은 Stride 구조체로 보고되며, 실제 시간은 스케줄러에 따라 크기가 달라지는 구조체의 크기 속성임니다. DispatchQueue의 경우 나노초를 보고합니다. 만약 시간간격을 초 단위로 얻으려면 다음과 같이 하면 됩니다.

.measureInterval(using: DispatchQueue.main)
.map { Double($0.magnitude) }

다른 오퍼레이션 큐나 런루프는 초단위로 보고하므로 .map이 필요하지 않습니다.

Buffer는 저번에 작성했으므로 생략하겠슴니당
다운스트림의 backpressure에 대한 대응을 하는 친구라고 알아두시면 좋을듯 함니다. 값이 삼켜지는걸 막아주는역할도함니다.

profile
hi there 👋

0개의 댓글