[Combine] Operators - 06

rbw·2023년 12월 27일
0

Combine

목록 보기
10/11

Wrappers

래퍼란 연산자나 파이프라인을 타입이 지워진 형태로 래핑하는 연산자를 의미합니다. .eraseToAnyPublisher()는 연산자를 체인으로 연결할 때 발생하는 일반적인 복잡성을 단일 유형인 AnyPublisher로 축소합니다. 파이프라인의 마지막에 주로 사용하여 작업을 간단하게 만들어줌니다.

.makeConnectable(Publishers.MakeConnectable)은 파이프라인을 ConnectablePublisher 구조체로 래핑함니다. 이렇게 하면 파이프라인을 타이머 처럼 작동하게 만들어서 사용자가 connect, autoconnect를 하기 전까지 시작되지 않슴니다. 예시코드임니다.

let pub = URLSession.shared.dataTaskPublisher(for: url)
    .receive(on: DispatchQueue.main)
    .map {$0.data}
    .replaceError(with: Data())
    .compactMap { UIImage(data:$0) }
    .eraseToAnyPublisher()
    .makeConnectable()

pub.sink {print($0)}
    .store(in:&self.storage)

pub.connect()
    .store(in: &self.storage)

.sink 자체로는 아무 일도 일어나지 않슴니다. pub.connect()를 해야만 값을 발행함니다. 또 connect 하는 객체를 저장해두어 취소할 때 얘를 취소하면 편리하게 사용이 가능합니다.

Debuggers

디버거란 파이프라인을 디버깅할 때 유용한 연산자들을 의미함니다. 주로 사용하는 것은 .print로 파이프라인에서 위 아래로 전달되는 모든 메시지를 콘솔에 보여줌니다.

Print

.print(Publishers.Print)는 업스트림으로 전달되는 구독, 요청 또는 취소 메시지나 다운스트림으로 전달되는 값 또는 완료메시지 등 어느 방향에서든 통과하는 모든 것을 콘솔에 보여줍니다. 예를들어,

Just(1)
	.print()
// OUTPUT
receive subscription: (Just)
request unlimited
receive value: (1)
receive finished

또 아래처럼 문자열을 추가하여 알아보기 쉽게도 가능함니다.

[1].publisher
    .print("before")
    .flatMap(maxPublishers: .max(1)) {
        Just($0)
        .print("inside")
    }
    .print("after")
    .sink(receiveCompletion: {print("DONE:", $0)})
        {print("VAL:",$0)}
        .store(in:&self.storage)
// OUTPUT
after: receive subscription: (FlatMap)
after: request unlimited
before: receive subscription: ([1])
before: request max: (1)
before: receive value: (1)
inside: receive subscription: (Just)
inside: request unlimited
inside: receive value: (1)
after: receive value: (1)
VAL: 1
inside: receive finished
before: request max: (1)
before: receive finished
after: receive finished
DONE: finished

HandleEvents

.handleEvents(Publishers.HandlerEvents는 기본적으로 .print의 일반화 버전임니다. .print는 오직 콘솔에 띄워주는 역할을 수행함니다.

하지만 이 친구는 사용자가 원하는 모든 작업을 수행하며, 사용자가 지정한 함수로만 구성됩니다. 그리고 동작들은 함수를 통과한 내용에 따라 분리됨니다. 5개의 함수를 사용하며, 모두 옵셔널임니다.

  • receiveSubscription: 구독을 받습니다
  • receiveOutput: 업스트림에서 값을 받슴니다
  • receiveCompletion: 완료(.failure or .finished)를 받슴니다
  • receiveCancel: 취소를 받슴니다
  • receiveRequest: 요청을 받슴니다

얘네들을 사용하여 역압력의 동작을 추적하기 위해 사용도 가능하며, 다운스트림 실패 후 취소 메시지의 흐름을 볼 수 있도록 취소메시지의 출력도 가능함니다.

이 연산자를 디버거 연산자로 분류했지만 .sink와 같은 중간 파이프라인으로 사용할 수도 있습니다.

Breakpoint

.breakpoint(Publishers.Breakpoint)는 파이프라인을 중단점에서 일시중지하게 해줌니다. 보통 파이프라인에서 브레이크를 거는 곳은 없지만 이 연산자를 사용하면 파이프라인 체인 내의 연산자 자체에서 일시 중지할 수 있슴니다

  • receiveSubscription: 구독을 박슴니다
  • receiveOutput: 값을 받슴니다.
  • receiveCompletion: Completion을 받슴니다.

위 함수들은 Bool을 반환함니다. true를 반환하면 Xcode가 일시 중지됩니다.

하지만 위 함수를 사용하면 Xcode의 디버거에서 어셈블리어가 표시되는 화면을 띄워줌니다.. 그렇지만 .print 와 같이 중지하는 식으로 사용하면 단계별로 추적할 수 있슴니다..!

Composed Operators

때로는 직접 연산자를 만들어야 할 수도 있습니다. 이 경우 기존 연산자들을 결합하여 새로운 연산자를 만드는 것이 좋슴니다. 작성자분은 이런 종류의 연산자를 Composed OP라고 한다네요

retry, delay를 결합하여 새 연산자를 만들어보겠슴니다.

결합하여 사용하는 연산자를 자주 사용한다면, 퍼블리셔를 시작점으로 하여 마음대로 사용하고 싶습니다. 따라서 파이프라인을 생성하는 함수로 만들어보겠슴니다. 이 함수는 퍼블리셔를 매개변수로 받아 해당 체인의 첫 번째 오퍼레이터의 업스트림으로 사용할 것임니다. 반환 값은 AnyPublisher로 해야겠져

func applyDelayAndRetry<Upstream:Publisher>(upstream:Upstream) 
    -> AnyPublisher<Upstream.Output, Upstream.Failure> {
        let share = Publishers.Share(upstream: upstream)
        return share
            .catch { _ in 
                share.delay(for: 3, scheduler: DispatchQueue.main) 
            }.retry(3)
            .eraseToAnyPublisher()
}

// 사용예시
let pub = URLSession.shared.dataTaskPublisher(for: url)
let head = applyDelayAndRetry(upstream: pub)

좀 더 유연하게 만들기 위해서 시간과 재시도 횟수를 인자로 정해주면 더 좋겠져 그리고 Publisher 확장에 추가하여 파이프라인 내부에서도 사용하고 싶습니다. 아래는 전체코드임니다.

extension Publisher {

    func delayAndRetry<S:Scheduler>(
        for interval:S.SchedulerTimeType.Stride,
        scheduler:S,
        count:Int
    ) -> AnyPublisher<Self.Output, Self.Failure> {
    
        func applyDelayAndRetry<Upstream:Publisher, S:Scheduler>(
                upstream:Upstream,
                for interval:S.SchedulerTimeType.Stride,
                scheduler:S,
                count:Int
            )
            -> AnyPublisher<Upstream.Output, Upstream.Failure> {
                let share = Publishers.Share(upstream: upstream)
                return share
                    .catch { _ in 
                        share.delay(for:interval, scheduler:scheduler) 
                    }.retry(count)
                    .eraseToAnyPublisher()
        }
        // 호출하는 self를 넣어주면 됨니다.
        return applyDelayAndRetry(upstream:self, 
            for:interval, scheduler:scheduler, count:count)
    }
    
}

// 사용예시
URLSession.shared.dataTaskPublisher(for: url)
    .delayAndRetry(for: 3, scheduler: DispatchQueue.main, count: 3)
    .receive(on: DispatchQueue.main)
    // ...
profile
hi there 👋

0개의 댓글