Joiners
란 여러 파이프라인의 값 스트림을 하나의 스트림으로 결합하는 연산자를 의미합니다. 각 파이프라인은 값을 생성하는데, 이제 이러한 값을한데 모아 하나의 파이프라인을 형성하고자 합니다.
가장 좋은 방법은 .merge
라고 하네요. 얘는 간단함니다. 업스트림 파이프라인에서 값을 보내면 이 파이프라인으로 값을 전달하면 됨니다. 얘는 .flatMap
과 비슷함니다
[1,2].publisher
.flatMap { _ in
Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.scan(0) {i,_ in i+1}
}
// output
1, 1 (1초 기다리고) 2, 2, (1초 stop) 3,3...
위 코드는 타이머가 2개가 생성되기 때문에 이와 같은 아웃풋이 나옴을 알 수 있슴니다. 둘 다 동일 파이프라인에서 값을 생성합니다. 이게 merge
임니다.
.merge(with:)
이 친구는 퍼블리셔를 매개변수로 사용함니다. 두 퍼블리셔 모두 동일 출력 및 실패 유형을 가져야 합니다. 이제 사실상 두 개의 퍼블리셔가 있슴니다. 이러한 업스트림 퍼블리셔 중 하나가 값을 생성하면 얘는 해당 값을 아래로 전달함니다.
이 작업은 두 업스트림 퍼블리셔가 모두 .finished
를 보낼 때까지 계속됨니다. 하나가 .failure
를 보내면 다른 퍼블리셔를 취소하고 밑에 실패를 보냄니다.
실제로 이 친구는 두 가지 형태가 있슴니다. 두 퍼블리셔가 모두 같은 유형인 경우, 이는 Publishers.MergeMany
임니다. 서로 다른 유형인 경우, 얘는 Publishers.Merge
임니다. 보통 이 차이를 의식하는 경우는 많이 없습니다. 둘 다 .eraseToAnyPublsiher
로 유형삭제를 할 수 있슴니다.
편의를 위해 3개~8개 퍼블리셔를 연결하기 위한 연산자가 제공됨니다. 구문은 모두 동일하지만 실제 객체는 Publishers.Merge3, Publishers.Merge8
등 서로 다름니다.
아래는 예시로 5개를 병합한 코드임니다. dropFirst, reduce
를 사용해서 하나 뺀거에 계속 붙여가며 만들었슴니다.
func makeTimer() -> AnyPublisher<Int,Never> {
Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.scan(Int.random(in:1...10)) {i,_ in i+1}
.eraseToAnyPublisher()
}
let arr = (1...5).map {_ in makeTimer()}
let merged = arr.dropFirst().reduce(into: arr[0].eraseToAnyPublisher()) {
$0 = $0.merge(with: $1).eraseToAnyPublisher()
}
.zip(Publishrs.Zip)
은 퍼블리셔를 매개변수로 사용함니다. 둘 다 동일 유형을 가져야하지만 출력 유형은 다를 수 있습니다. 두 퍼블리셔 중 하나가 값을 생성하면 얘는 해당 값을 버퍼(LIFO 스택)에 넣슴니다. 두 퍼블리셔 모두가 값을 생성하면, 즉 두 버퍼에 모두 값이 있을 때마다 이 연산자는 두 버퍼의 시작부분에서 가장 오래된 값을 꺼내서 튜플로 결합 후 튜플을 내보냄니다.
업스트림 퍼블리셔 중 하나가 .finished
를 보내고 해당 퍼블리셔의 버퍼가 비어 있으면 얘는 다른 퍼블리셔를 취소하고 .finished
를 아래로 보냄니다. 즉 어느 퍼블리셔의 마지막 값이 아래로 전송되면 파이프라인이 종료되며, 게시된 튜플의 최대 개수는 더 짧은 업스트림 퍼블리셔의 스트림 개수임니다.
업스트림 퍼블리셔 중 하나가 실패를 전송해도 다른 퍼블리셔 취소하고 아래로 완료를 전송함니다.
[1,2,3].publisher
.zip(
["a","b"].publisher
.flatMap(maxPublishers:.max(1)) {
Just($0).delay(for: 1, scheduler: DispatchQueue.main)
}
)
위 예시에는 두 퍼블리셔가 존재하고 .flatMap
을 사용하기에 1초를 기다렸다가 a를 방출하고 다시 1초를 기다렸다가 b를 방출합니다. 타이밍은 아래와 같습니다.
1st 2nd
===============
1
2
3
(finished)
[one second]
"a"
[one second]
"b"
(finished)
결과적으로 반환되는 것은 아래와 같슴니다.
(1, "a")
(2, "b")
두 퍼블리셔가 모두 게시할 때까지는 아무것도 게시할 수 업슴니다. 위에서 3은 짝을 이루지 못하므로 버려지고 연산자가 완료됩니다.
퍼블리셔와 함수라는 두 개의 매개변수를 받는 또 다른 형태의 .zip
이 있슴니다. 얘는 튜플을 생성할 준비가 되면 해당 튜플을 형성하여 다운스트림으로 보내는 대신 두 값을 매개변수로 함수에 전달하여 다른 값으로 변환할 수 있도록 허용합니다. 마치 .zip
뒤에 .map
을 붙이는것과 같슴니다. 바로 후행클로저 구문으로 사용이 가능함니다.
튜플을 다운스트림에 전달하고 싶지 않을 수도 있기에 이 방식은 꽤 유용합니다.
[1,2,3].publisher
.zip(
[100,200].publisher
.flatMap(maxPublishers:.max(1)) {
Just($0).delay(for: 1, scheduler: DispatchQueue.main)
}
) { $0 + $1 }
zip
도 merge
와 마찬가지로 여러개의 퍼블리셔를 압축할 수 있습니다만 쉽게는 불가합니다. 이는 퍼블리셔가 서로 다른 출력 유형을 가질 수 있기 때문임니다. 튜플에 새 값을 추가할 수는 없습니다. 따라서 여러번의 .zip
연산자를 수동으로 입력해줘야합니다.
[1,2,3].publisher
.zip([10,11,12].publisher, [20,21,22].publisher, [30,31,32].publisher)
.zip([40,41,42].publisher)
// output
((1, 10, 20, 30), 40)
((2, 11, 21, 31), 41)
하지만 위 출력 형식은 저희 예상과는 다릅니다. zip
에는 5개를 한꺼번에 묶어주는 친구는 업슴니다 ㅜㅡㅜ
튜플을 5개로 만들고 싶다면 위에서 마지막에 변형을 해주면 댐니다
[1,2,3].publisher
.zip([10,11,12].publisher, [20,21,22].publisher, [30,31,32].publisher)
.zip([40,41,42].publisher) { ($0.0, $0.1, $0.2, $0.3, $1) }
출력 유형이 동일한 경우에 이렇게 사용이 가능합니다. 배열로 또 이루어져 있어서 다른 방법도 있습니다.
var pubs = [
[1,2,3].publisher,
[10,11,12].publisher,
[20,21,22].publisher,
[30,31,32].publisher,
[40,41,42].publisher
]
// 4개의 배열을 reduce를 사용해서 zip을 원소마다 적용하고
// 반환 값으로는 배열을 리턴한다면 계속 append 하는 효과와 동일함니다.
pubs.dropFirst().reduce(into: AnyPublisher(pubs[0].map{[$0]})) {
res, pub in
res = res.zip(pub) {
i1, i2 -> [Int] in
return i1 + [i2]
}.eraseToAnyPublisher()
}
// output
[1, 10, 20, 30, 40]
[2, 11, 21, 31, 41]
.combineLatest (Publishers.CombineLatest)
얘는 zip
을 거꾸로 뒤집은 것과 같슴니다.
이전의 zip 예제를 그대로 들고오겠슴니다.
// zip
(1,a)
(2,b)
[1,2,3].publisher
.combineLatest(
["a","b"].publisher
.flatMap(maxPublishers:.max(1)) {
Just($0).delay(for: 1, scheduler: DispatchQueue.main)
}
)
// combineLatest output
(3, a)
(3, b)
이 친구가 기억하는 것은 각 퍼블리셔에서 가장 최근에 도착한 값 뿐입니다. 따라서 3은 진작에 발행되었고, 다른 퍼블리셔에서 방출되는 값이랑 계속 같이 방출된다고 보면 됨니다. 또한 3을 게시하고 뒤에 후속 값이 도착하지 않기 때문에 3을 계속 기억하고 있습니다.
두 퍼블리셔가 모두 .finished
를 보내면 자체적으로 .finished
를 보냅니다. 하나의 퍼블리셔에서 실패를 보내면 나머지 게시자도 취소하고 실패를 아래로 전달합니다.
zip
과 마찬가지로 값의 변형도 가능합니다.
이 연산자가 RxSwift에는 있지만 Combine에는 없다고 하네요. 이 연산자는 두 퍼블리셔중 첫 번째 퍼블리셔가 게시할 때만 발행하는 차이가 있습니다. (combineLatest 는 두 퍼블리셔 중 하나가 게시할 때 게시) 얘를 한 번 만들어보겠씀니다.
첫 퍼블리셔가 게시할 때 해당 값을 타임스탬프가 포함된 튜플로 감싸면 됩니다. 그런 다음 두 번째 퍼블리셔와 .combineLatest
를 수행 후, 첫 퍼블리셔가 새 값을 게시했음을 나타내는 타임스탬프가 변경되지 않는 한 .removeDuplicates
를 사용하여 출력을 차단합니다.
pub1.map {value in (value:value, date:Date()) }
.combineLatest(pub2)
.removeDuplicates { $0.0.date == $1.0.date }
.map { ($0.value, $1) }
Rx의 WithLatestFrom을 살펴보니까 무적건 첫 번째 퍼블리셔가 방출을 하고 그때 두 번째 퍼블리셔도 방출한 값이 있어야만 다운스트림에 전송하던데 위 코드는 맨 처음의 값은 무조건 방출 성공하는거같아서 조..금 다른듯함
좀 더 정확하게 하려면 두 번째 퍼블리셔의 방출 값에서도 Date()를 받아서 타임스탬프로 비교하면 얼추 될거가틈
.append (Publishers.Concatensate)
는 적용되는 퍼블리셔의 값을 도착하는대로 생성한 다음, 매개변수의 퍼블리셔 값이 도착하는 대로 생성합니다.
이러한 값이 순차적으로 도착하도록 하는 정책을 적용하기 위해 이 연산자는 첫 번째 퍼블리셔의 .finished
를 수신할 때까지 두 번째 퍼블리셔를 구독하지 않슴니다. (반대로 첫 번째 퍼블리셔로부터 .finished
를 받지 못하면 첫 퍼블리셔의 값을 계속 게시하고 두 번째 퍼블리셔를 구독하지 않슴니다)
두 번째 퍼블리셔가 .finished
를 보내면 이 연산자도 완료를 보냄니다. 하나의 게시자에서 실패를 보내면 얘는 다른 게시자도 취소하고 실패를 전달함니다.
잘 모르겠어서 코드를 보겠슴니다..!
[1,2,3].publisher.flatMap(maxPublishers: .max(1)) {
Just($0).delay(for: 1, scheduler: DispatchQueue.main)
}.append(Just(100))
// OUTPUT
(1초 stop) 1 (1초 stop) 2 (1초 stop) 3 100
얘는 첫 퍼블리셔의 게시가 끝나면 뒤에 붙는식으로 동작함니다. 지금은 Just(100)
으로 하나만 넣었는데 여러개의 퍼블리셔를 넣으면 그만큼 뒤에 추가됨니다.
.prepend (Publishers.Concatenate)
도 위에랑 완전 동일하며 실제로 내부적으로 동일한 연산자이지만 퍼블리셔의 의미가 반전되어 있다는 점이 다릅니다.
만약 위 코드에 append
자리에 prepend
로 바꾼다면 100이 먼저 출력된 이후에 1, 2, 3 이 차례대로 출력될 것입니다.
이러한 첫 번째 퍼블리셔가 완료될 때까지 두 번째 퍼블리셔가 시작되지 않는 동작 덕분에 append, prepend
를 사용하면 비동기적인 코드의 직렬화를 이뤄낼수 있음을 알 수 있습니다. .flatMap
보다 간단합니다 ! 하지만 조금의 대가가 따릅니다. .flatMap
의 경우 첫 비동기 연산에서 생성된 각 값을 두 번째 비동기 연산으로 흘려보낼 수 있지만, .append
를 사용하면 모든 값이 파이프라인의 맨 아래에 독립적으로 흘러나오게 됩니다. 여기서 직렬화는 전체 스트림임니다.
출력스트림을 직렬화하려는 퍼블리셔가 여러 개 있다면 append
를 체인으로 연결하여 구성할 수 있습니다.
.drop(untilOutputFrom:) (Publishers.DropUntilOutput)
은 정확히 조인러는 아니지만 두 퍼블리셔의 입력을 포함하므로 파티셔너인 .drop
보다는 합치는 친구에 더 가깝슴니다.
얘가 하는 일은 다른 퍼블리셔가 값을 내보낼 때까지 업스트림의 모든 값이 억제되도록 하는 것임니다. 해당 값은 무시되고 다른 퍼블리셔에 대한 구독이 삭제되며 업스트림의 값은 이제 파이프라인을 따라 흐르도록 허용됨니다.
기본적으로 이것은 일종의 스위치(or 게이트)로서, 보조 퍼블리셔가 스위치를 눌러 값이 흐르도록 허용할 수 있는 권한으르 가짐니다. 그런 의미에서 두 퍼블리셔가 모두 게시하기전에 전달하지 않는 .zip, combineLatest
과 유사함니다.
두 번째 퍼블리셔의 출력 유형은 중요하지 않슴니다. (무시된다고함니다) 얘는 신호의 역할이기 떄문임니다.
두 번째 퍼블리셔가 값을 내보내지 않고 완료를 내보내면 해당 신호가 파이프라인을 따라 전파되고 첫 번째 퍼블리셔에서 다운스트림으로 내려온 값 없이 전체 파이프라인이 종료됨니다. 따라서 두 번째 퍼블리셔는 전체 파이프라인이 진행되기 전에 전체 파이프라인을 취소할 수 있는 추가 권한이 있슴니다. 값을 보내야만 첫 번째 퍼블리셔에서 게시된 값을 방출함니다.
let upstream = PassthroughSubject<Int,Never>()
let second = PassthroughSubject<String,Never>()
cancellable = upstream
.drop(untilOutputFrom: second)
.sink { print("\($0)", terminator: " ") }
upstream.send(1)
upstream.send(2)
second.send("A") // 이 때부터 받는 값이 출력댐
upstream.send(3)
upstream.send(4)
// OUTPUT
3 4
.prefix(untilOutputFrom:) (Publishers.PrefixUntilOutput)
얘도 위랑 비슷함니다. 이 친구가 하는 일은 두 번째 퍼블리셔가 값을 내보낼 때까지 모든 값을 다 통과시킵니다.
두번째 퍼블리셔애서 값이 도착하면 업스트림 퍼블리셔가 취소되는 간단한 방식으로 동작합니다. 그러나 값이아니면 계속 게시합니다
위와 상호 보완연산자임니다.