Accumulators

이번엔 누적 연산자들에 대해서 알아보겠습니다.

누적 연산자란, 업스트림에서 값을 통과시키지 않고 어떤 일이 일어날 때까지 기다리면서 누적하는 연산자를 의미합니다. 이러한 연산자들은 값이 도착하면 버퍼에 저장하거나 도착하는 값 사이의 중간 값을 유지하는 등 어떤 방식으로든 값을 누적합니다. 수신된 값을 하나의 값으로 결합하여 업스트림이 완료되면 이를 방출하는 .reduce가 가장 뛰어난 누산기임니다.

대부분의 경우, 얘네가 기다리는 무언가 는 업스트림의 값 스트림이 끝나는 시점입니다. 이는 .finished 컴플리션을 받으면 알 수 있습니다. 그런 다음 누적 연산자는 값을 내보내고 자체적으로 .finished를 보냅니다

보통 .finished를 기다리는 누산기는 업스트림에서 유한 수열을 생성하고 완료를 수신하지 않는 한 값을 전혀 방출 못함니다. 하지만 일부 누산기는 이 프로세스를 단락시킬 수 있습니다. 즉, 업스트림에서 답을 알려주는 일부 값이 도착하고 답을 일찍 알기 때문에 누산기는 업스트림을 취소하고 해당 값을 방출합니다.

만약 업스트림에서 오류가 발생하면 누산기는 이를 다운스트림으로 전달하김나 하면 됩니다. 즉, 연산자 자체가 자체적으로 값을 내보내지 않을 수도 있습니다.

예시로, 값 3개만 방출하고 실패하는 퍼블리셔가 있습니다.

[1,2,3,-1,4,5,6].publisher
    .tryMap { (i:Int) -> Int in
        if i < 0 { throw MyError.oops }
        return i
    }

// 만약 .count를 파이프라인에 추가하면 동작할까요? ㄴㄴ 안함니다
// 해결방법으로 누산기에 도달하기전에 failure를 finished로 바꾸는 것임니다.
// catch, empty를 사용하면 됨니다.
[1,2,3,-1,4,5,6].publisher
    .tryMap { (i:Int) -> Int in
        if i < 0 { throw MyError.oops }
        return i
    }
    .catch { _ in Empty<Int,MyError>(completeImmediately: true) }
    .count()

Reduce

.reduce(Publishers.Reduce) 이 친구는 배얼의 reduce 함수라고 보시믄 댐니다. 처음 출력 값과, 함수 두 가지 매개변수를 제공합니다. 출력 값은 누적되는 값입니다 함수 자체는 누적된 출력 값과 업스트림에서 새로 도착한 값의 두 가지 매개변수를 받으며, 이 값은 누적된 값인 새 출력값을 반환하고 다음 값이 업스트림에서 도착하면 다음 함수 호출의 첫 번째 매개변수로 제공됩니다.

.finished 가 수신될 떄까지 계속됩니다. 이 시점에서 함수에 대한 마지막 호출에서 반환된 출력 값이 다운스트림으로 출력 됨니다.

[1,2,3].publisher
    .reduce(0) {
        return $0 + $1
    }

$0 은 누적값, $1은 업스트림에서 새로 도착한 값입니다. 1,3,6을 반환하고 업스트림에서 .finished가 도착하므로 연산자는 6을 다운스트림으로 보내고 .finished를 보냅니다.

Max, Min, Count

얘네들은 말그대로의 기능들을 하는 친구들이라 생략해도될거같슴니다..!

.count(Publishers.Count)는 아래와 똑같습니다.

.reduce(0) { prev, _ in $0+$1 }

업스트림에서 값을 내보내지않고 .finished를 보내면 .count는 0을 반환합니다.

Contains, AllSatisfy

.contains(Publsihers.Contains)는 매개변수로 제공한 값이 업스트림에서 도착하는 값 중 존재하는지 여부를 Bool 값으로 반환함니다.

또 만족하는 값이 도착하면 이 연산자는 즉시 답이 참이라는 것을 알고 업스트림을 취소하고 참을 반환한 다음 .finished를 보냅니다. 그러나 응답이 거짓이라는 것은 모든 값을 받고, .finished가 수신될 때까지 기다리는 것입니다.

[1,2,3].publisher
    .contains(2) // true
[1,2,3].publisher
    .contains { $0 > 1 } // true
[1,2,3].publisher
    .contains { $0 < 1 } // false, after completion

.tryContains(where:)(Publishers.TryContainsWhere)는 오류를 던질 수 있다는 점을 제외하면 위의 메소드와 같지만, 이 경우 업스트림을 취소하고 오류가 실패로 파이프라인에 전송됩니다.

.allSatisfy(Pubishers.AllSatisfy)업스트림에서 도착하는 모든 값이 특정 테스트를 통과한지 여부Bool로 나타냄니다.

얘도 거짓을 반환하면 바로 업스트림을 취소하고 .finished 후에 false를 내보냅니다. 그러나 true를 반환하면 이 연산자는 다음 값을 기다리고, .finished가 수신된 경우에만 최종적으로 true를 내보냅니다.

[1,2,3].publisher
    .allSatisfy { $0 % 2 == 0 } // false
[1,2,3].publisher
    .allSatisfy { $0 > 0 } // true, after completion

Collect

.collect(Publishers.Collect) 는 업스트림애서 받은 모든 값을 버퍼에 수집한다음 .finished를 받으면 해당값을 모두 배열로 구성된 단일 값으로 내보낸 다음 자체적으로 .finished 를 내보냅니다

네트워크 요청의 결과 값의 배열로 만드는 예제입니다.

[
    "https://www.apeth.com/pep/manny.jpg",
    "https://www.apethh.com/pep/moe.jpg",
    "https://www.apeth.com/pep/jack.jpg",
]
.map(URL.init(string:)).compactMap{$0}
    .map{ URLSession.shared.dataTaskPublisher(for:$0) }
    .publisher
    .flatMap(maxPublishers:.max(1)) {
        $0.replaceError(with: (data: Data(), response: URLResponse()))
    }
    .collect()

위 코드의 결과는 URL 배열 순서가 보장되는 (data: Data, response: URLResponse) 의 배열입니다. 만약 .maxPublishers:max(1)이 없다면 순서는 보장할 수 없습니다. 비동기작업을 직렬화해야 순서를 보장할 수 있습니다.

.collect(_:) (Publishers.CollectByCount)는 단순히 .finishded를 기다리는 것이 아니라, 지금까지 업스트림에서 수신한 값의 배열을 일정 수 이상의 값이 도착하면 전송하고 수신 값의 버퍼를 지웁니다. 따라서 업스트림에서 받은 값 스트림을 고정된 크기의 청크로 나눕니다.

[1,2,3].publisher
    .collect(2) // [1,2] then [3], then finished~

.collect(_:options:) (Publishers.CollectionByTime) 얘는 버퍼를 고정된 시간 간격으로 내보냅니다.

  • byTime: 연관 값은 시간을 계산하는 방법을 나타내느 스케줄러와, 간격입니다.
  • byTimeOrCount: 위와 비슷하지만 카운트를 나타내는 추가 매개변수가 있으며, 시간이 경과하거나 카운트를 만족하거나 하는 시점에 연산자가 실행됩니다.
Timer.publish(every: 0.3, on: .main, in: .common).autoconnect()
    .scan(0) {i,_ in i + 1}
    .collect(.byTime(DispatchQueue.main, 1))

// output
[1,2,3]
[4,5,6]
[7,8,9,10]
if -> .collect(.byTimeOrCount(DispatchQueue.main, 1, 3)) // count를 추가하면 최대 3개씩 내려온다

options:는 생략 가능합니다.

Partitioners

이번엔 파티셔너에 대해 알아보겠씀니다. 얘는 업스트림 값의 일부를 분리하여 해당 값을 출력하는 연산자를 의미합니다. 누산기와 마찬가지로 일부 파티셔너는 .finished를 받아야만 값을 내보낼 수 있습니다. .last 연산자의 경우가 예시가 될 수 있슴니다.

First and Last

.first(Publishers.Frist) 는 업스트림에서 받은 첫 번째 값을 내보냅니다 내보내기 직전에 퍼블리셔를 취소하고, 내보낸 후에 .finished를 보냄니다

.first(where:) (Publsihers.FirstWhere) 얘는 함수가 일부 업스트림 값에 대해 참을 반환하는 즉시 연산자는 업스트림 퍼블리셔를 취소하고 값을 내보낸 후에 .finished를 보냄니다

.last도 거의 동일하니가 생략하겠슴니당

Output

.output(at:) (Publishers.Output) 얘는 인덱스를 나타내는 Int값을 받습니다. 해당 인덱스가 지정한 값이 도착하면 해당 값을 내보내고 업스트림 퍼블리셔를 취소한 다음 .finished를 보냄니다. 만약 인덱스에 도달하기 전에 업스트림에서 .finished를 방출하는 경우 얘는 값을 전송하지 않고 다운스트림에 .finished만 보냅니다.

.output(in:) (Publishers.Output) 얘는 매개변수가 범위라는 점을 제외하면 위와 비슷함니다. 범위의 숫자로 인덱싱된 각 값을 출력하고, 범위로 인덱싱된 마지막 값을 출력 후 업스트림 퍼블리셔를 취소하고 .finished를 보냄니다.

let numbers = [1, 1, 2, 2, 2, 3, 4, 5, 6]
numbers.publisher
    .output(in: (3...5))
    .sink { print("\($0)", terminator: " ") }
// output
" 2  2  3 "

Prefix and Drop

.prefix(_:) 얘는 실제로 .output(in:)의 한 형태라고 생각하면 됩니다. 받은 숫자를 0부터 시작하는 범위로 바꿔서 output(in:0..<n) 하는검니다.

.prefix(while:) (Publishers.PrefixWhile) 얘는 업스트림에서 값을 수신 후 Bool 을 반환함니다. 이 Bool 값이 true면 값을 아래로 전달함니다. 거짓히면, 업스트림을 취소하고 .finished를 보냄니다.

.dropFirst() (Publsihers.Drop) 얘는 .prefix(1) 의 역으로 처음 값을 삼킨 다음 나머지 받는 모든 값을 그냥 아래로 다 전달함니다.

.drop(while:) (Publishers.DropWhile)은 업스트림에서 값을 받고 참 거짓을 판별하는데 참을 반환하면 값을 삼킵니다. 거짓을 반환하면 받은 값을 아래로 전달하고 그 이후부터는 받는 값을 전부 다 아래로 보냄니다.

만약에 .suffix(_:)를 만들고 싶다면 아래처럼 하면 됨니다.

// 마지막 2개의 값만 받고 싶다고 할 때
[1,2,3,4].publsiher
    .collect()
    .flatMap { $0.suffix(2).publisher }
profile
hi there 👋

0개의 댓글

Powered by GraphCDN, the GraphQL CDN