요즘 Combine을 조금씩 공부해보고 있다. 커스텀 연산자도 만들어보면서..
근데 이 과정에서 신기한 것이 있었다. 바로 subscribe(on:)
연산자.
아래는 나와 회사 팀원들이 같이 고민해보고 찾아본 결과를 토대로 작성한 글이다.
나는 Combine의 subscribe(on:)
이 스트림에 대한 기본 쓰레드 지정용으로 쓰는 줄 알았다.
때문에 이 operator를 쓰면 - 일반적인 publisher에 쓰든 subject같은 값 주입이 가능한 publisher에 쓰든 항상 sink 클로저는 해당 쓰레드에서 돌아가게 되는 줄 알았다.(receive(on:)
을 안썼다면)
그런데 그게 아니었다.
subscribe(on:)
참고를 위해 RxSwift subscribeOn
마블 그림을 가져와보았다.
specify the Scheduler on which an Observable will operate
➡️ ReactiveXsubscribeOn
설명 중.. 링크
위의 그림을 보면 알 수 있듯 RxSwift subscribeOn
은 Observable 동작이 시작될 쓰레드를 지정하는데에 쓰인다. (선언 위치는 상관이 없다.)
이 그림은 Combine의 subscribe(on:)
에도 비슷하게 적용된다.
그런데 이 형태가 subject
종류의 스트림에도 동일하게 적용될 수 있을까?
//
// ContentView.swift
// CombineTest
//
// Created by JINHONG AN on 2023/03/26.
//
import SwiftUI
import Combine
struct ContentView: View {
private let numberGenerator = NumberGenerator()
var body: some View {
VStack {
Button {
numberGenerator.generateRandomNumber()
} label: {
Text("랜덤 숫자 생성하기")
}
}
.padding()
}
}
final class NumberGenerator {
private let numberSubject = PassthroughSubject<Int, Never>()
private var cancellables = Set<AnyCancellable>()
init() {
numberSubject
.subscribe(on: DispatchQueue.global())
.sink { number in
print("🎉 \(number)숫자 출력되었습니다!! - 쓰레드 정보: \(Thread.current)")
}
.store(in: &cancellables)
}
func generateRandomNumber() {
numberSubject.send(Int.random(in: 0...100))
}
}
struct ContentView_Previews: PreviewProvider {
static var previews: some View {
ContentView()
}
}
위와 같이 코드를 짰고 화면의 버튼을 누르면 숫자 및 동작 쓰레드가 콘솔 로그에 찍히도록 만들었다.
그리고 나는 항상 백그라운드 쓰레드에서 결과값이 찍힐 것이라고 예상했다.
왜냐하면
1. generateRandomNumber
함수 호출이 View의 body에서 이루어지기 때문에 MainActor에서 호출됨
2. send가 MainActor에서 이루어짐
3. 하지만 subscribe(on:)
에서 DispatchQueue.global()
를 걸어줬기 때문에 값 방출은 백그라운드 쓰레드에서 동작함(위에서 본 RxSwift 그림처럼)
4. sink 내부에서 쓰레드 출력은 백그라운드 쓰레드로 찍힘
위와같이 생각했기 때문이다.
하지만 결과는 달랐다.
아니 모두 main thread로 찍힌다고?!!?? 😯
이러한 결과값은 나를 혼돈에 빠뜨렸다. (물론 팀원들도 멘붕에 빠졌었다.)
왜 이런 결과가 나온 걸까?
정답을 찾아나가기 전에 우리는 subscribe 과정이 어떻게 일어나는지 알아볼 필요가 있다.
아래는 subscribe가 일어나는 과정을 내 나름대로(😅) 그려본 것이다.
직접 그린 기존 그림(참고용)subscribe 과정을 직접 구현하는 경우 구현 형태는 충분히 달라질 수 있다. 따라서 아래의 그림이 항상 맞는 것은 아닐 수 있다.
subscribe
한다. (publisher의 subscribe(_:)
를 호출하거나 sink
를 하거나)receive(subscription:)
호출) request(_:)
를 호출하여 subscription에게 원하는 element개수를 전달한다.receive(_:)
를 통해 재전달한다. (이 때 하나 전달했으니 demand는 1 차감한다)참조 관계는 일반적으로 아래와 같이 구성된다.
직접 그린 기존 그림(참고용)이렇게 subscribe의 과정을 조금 자세히 살펴보았다.
이제 다시 아래의 결과가 왜 나왔는지 찾아가보도록 하자.
subscribe(on:)
operator 공식문서정답은 의외로 항상 가장 가까운 곳에 있다. subscribe(on:)
- Apple 공식문서를 봐보자.
subscribe, cancel, request 동작을 실행시킬 스케쥴러를 지정한다.
downstream 메시지에 영향을 미치는
receive(on:options:)
와는 반대로subscribe(on:options:)
는 upstream 메시지의 실행 context를 변경한다.
subscribe, cancel, request 동작을 실행시킬 스케쥴러를 지정한다고 나와있다.
즉, subscribe(on:)
으로 지정하게 되는 것은 publisher의 subscribe(_:)
, subscription의 request(_:)
와 cancel()
을 실행시킬 스케쥴러뿐이다.
//
// ContentView.swift
// CombineTest
//
// Created by JINHONG AN on 2023/03/26.
//
import SwiftUI
import Combine
struct ContentView: View {
private let numberGenerator = NumberGenerator()
var body: some View {
VStack(spacing: 50) {
Button {
numberGenerator.generateRandomNumber()
} label: {
Text("랜덤 숫자 생성하기")
}
Button {
numberGenerator.generateCompletion()
} label: {
Text("completion 시키기")
}
}
.padding()
}
}
final class NumberGenerator {
private let numberSubject = PassthroughSubject<Int, Never>()
private let numberSubscriber: AnySubscriber<Int, Never>
init() {
numberSubscriber = .init(receiveSubscription: { subscription in
print("subscription을 받았습니다. - 💻쓰레드 정보: \(Thread.current)")
subscription.request(.unlimited)
}, receiveValue: { number in
print("\(number)숫자 받았습니다!! - 💻쓰레드 정보: \(Thread.current)")
return .none
}, receiveCompletion: { completion in
print("\(completion) 받았습니다!! - 💻쓰레드 정보: \(Thread.current)")
})
numberSubject
.handleEvents(receiveSubscription: { _ in
print("receive subscription - 💻쓰레드 정보: \(Thread.current)")
}, receiveOutput: { _ in
print("receive output - 💻쓰레드 정보: \(Thread.current)")
}, receiveCompletion: { _ in
print("receive completion - 💻쓰레드 정보: \(Thread.current)")
}, receiveCancel: {
print("receive cancel - 💻쓰레드 정보: \(Thread.current)")
}, receiveRequest: { _ in
print("receive request - 💻쓰레드 정보: \(Thread.current)")
})
.subscribe(on: DispatchQueue.global())
.subscribe(numberSubscriber)
}
func generateRandomNumber() {
print("\(#function) 호출되었습니다. - 💻쓰레드 정보: \(Thread.current)")
numberSubject.send(Int.random(in: 0...100))
}
func generateCompletion() {
print("\(#function) 호출되었습니다. - 💻쓰레드 정보: \(Thread.current)")
numberSubject.send(completion: .finished)
}
}
struct ContentView_Previews: PreviewProvider {
static var previews: some View {
ContentView()
}
}
이번에는 위와 같이 AnySubscriber를 만들어서 Subscribe하는 과정을 볼 수 있도록 하였다.
그리고 handleEvents
operator를 사용하여 구독 과정에서는 어떠한 일들이 발생하는지 알 수 있도록 하였다.
handleEvents
는 뭐랄까 중간 감시자같은 느낌으로 동작한다고 보면 쉬울 것 같다.
뭐가 어떻게 오고가는지 보는 출입국 심사 - 보안검사대 같은 느낌?
앱 실행 ->
랜덤 숫자 생성
2번 탭 ->completion 시키기
탭 한 결과는 아래와 같았다.
위의 결과 스크린샷에서도 볼 수 있듯이 subscription이 subscriber에게 전달되는 일련의 과정과 subscriber가 해당 subscripton을 이용하여 request demand를 하는 과정은 백그라운드 쓰레드에서 일어남을 알 수 있다.
이외에 - 값과 completion을 보내는 일련의 과정들은 main thread에서 일어남을 볼 수 있다.
값과 comletion 처리과정이 main thread에서 일어난 이유는 '로직의 시작이 View의 body를 담당하는
MainActor
에서 시작되었기 때문'이라고 볼 수 있다. ➡️ send가 main thread에서 일어남
cancel()
을 외부에서 직접 호출 할 시 호출 쓰레드가 영향을 주므로 ➡️ 자동으로 cancel()이 호출되도록 해야한다. 따라서 테스트를 별도로 만들어 진행하였다.
//
// ContentView.swift
// CombineTest
//
// Created by JINHONG AN on 2023/03/26.
//
import SwiftUI
import Combine
struct ContentView: View {
private let numberGenerator = NumberGenerator()
var body: some View {
VStack(spacing: 50) {
Button {
numberGenerator.cancelAllStream()
} label: {
Text("cancel 시키기")
}
}
.padding()
}
}
final class NumberGenerator {
private var cancellables = Set<AnyCancellable>()
init() {
Just("Cancel Test Stream")
.delay(for: 100, scheduler: DispatchQueue.main)
.handleEvents(receiveCancel: {
print("별도로 만든 스트림 Cancel 됨. 쓰레드 정보: \(Thread.current)")
})
.subscribe(on: DispatchQueue.global())
.sink { _ in }
.store(in: &cancellables)
}
func cancelAllStream() {
cancellables.removeAll()
}
}
struct ContentView_Previews: PreviewProvider {
static var previews: some View {
ContentView()
}
}
위와 같이 코드를 짰으며 버튼을 누르면 모든 스트림이 자동으로 cancel되도록 하였다.
(값이 즉시 방출되서 끝나는 것을 막기 위해 delay
를 사용하였다.)
cancel
이 백그라운드 쓰레드에서 일어났음을 알 수 있다. 😆
나는 조금 더 상세하게 테스트를 진행해보고자 했다. 그래서 아래와 같이 Publisher, Subscription, Subscriber를 별도로 구현해 보았다.
// MARK: - Publisher
struct NumberPublisher: Publisher {
typealias Output = Int
typealias Failure = Never
func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
Swift.print("1️⃣ Publisher의 receive 메서드 동작. 💻 쓰레드 정보: \(Thread.current)")
let subscription = NumberSubscription(downstream: subscriber)
subscriber.receive(subscription: subscription)
}
}
// MARK: - Subscription
final class NumberSubscription<Downstream: Subscriber>: Subscription where Downstream.Input == Int, Downstream.Failure == Never {
private var downstream: Downstream?
private var cancellables = Set<AnyCancellable>()
init(downstream: Downstream) {
print("2️⃣ Subscription 생성. 💻 쓰레드 정보: \(Thread.current)")
self.downstream = downstream
startNumberTimer()
}
private func startNumberTimer() {
Timer.publish(every: 5, on: .main, in: .common)
.autoconnect()
.sink { [weak self] _ in
print("5️⃣ Subscriber에게 값을 전달하고 새로운 demand를 받는 부분 동작. 💻 쓰레드 정보: \(Thread.current)")
let newDemand = self?.downstream?.receive(Int.random(in: 1...100))
self?.downstream?.receive(completion: .finished)
}
.store(in: &cancellables)
}
func request(_ demand: Subscribers.Demand) {
print("4️⃣ Subscriber에게 demand 요청 받음. 💻 쓰레드 정보: \(Thread.current)")
}
func cancel() {
print("8️⃣ subscription cancel됨!! 💻 쓰레드 정보: \(Thread.current)")
self.downstream = nil
cancellables.removeAll()
}
}
// MARK: - Subscriber
struct NumberSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
let combineIdentifier = CombineIdentifier()
func receive(subscription: Subscription) {
print("3️⃣ Subscription 받음. 💻 쓰레드 정보: \(Thread.current)")
subscription.request(.unlimited)
}
func receive(_ input: Input) -> Subscribers.Demand {
print("6️⃣ Subscription에게서 값을 전달받음. 새로운 demand 전달 가능. 💻 쓰레드 정보: \(Thread.current)")
return .none
}
func receive(completion: Subscribers.Completion<Failure>) {
print("7️⃣ subscriber가 completion을 받음!! 💻 쓰레드 정보: \(Thread.current)")
}
}
//
// ContentView.swift
// CombineTest
//
// Created by JINHONG AN on 2023/03/26.
//
import SwiftUI
import Combine
// MARK: - View
struct ContentView: View {
private let numberGenerator = NumberGenerator()
var body: some View {
VStack {
Text("Coden의 블로그")
}
.padding()
}
}
// MARK: - Generator
final class NumberGenerator {
private let numberPublisher = NumberPublisher()
private var numberSubscriber = NumberSubscriber()
init() {
numberPublisher
.subscribe(on: DispatchQueue.global())
.subscribe(numberSubscriber)
}
}
struct ContentView_Previews: PreviewProvider {
static var previews: some View {
ContentView()
}
}
각 과정들이 상세히 나오도록 하였다. (뷰는 별 내용이 없다)
스크린샷을 보면 알 수 있듯이 (빨간 네모)
receive
메서드가 동작하면서 Subscription이 Subscriber에게 전달되는 일련의 동작위의 3가지는 subscribe(on:)
의 영향을 받음을 알 수 있다.
5초 뒤에 일어나는 나머지 부분들 (파랑 및 초록 네모)
위의 과정들은 메인쓰레드에서 동작함을 알 수 있다.
메인쓰레드에서 동작 한 것은 당연히 Timer를 main
RunLoop
에서 돌렸기 때문이다.
이렇게 보면 위에서 본 마블 그림이 잘못된 것처럼 보인다.
"send
하는 곳의 thread를 타는거지 subscribe(on:)
은 아무 의미가 없잖아?!" 하고 생각할 수도 있다.
하지만 그건 아니다. subscribe(on:)
이 값 방출 쓰레드에 영향을 미치는 경우가 존재한다.
[1, 2, 3, 4, 5]
.publisher
.subscribe(on: DispatchQueue.global())
.sink { number in
print("\(number)받음. Thread 정보: \(Thread.current)")
}
.store(in: &cancellables)
위의 코드를 테스트 해보면 아래와 같이 나온다.
아래의 코드도 비슷한 결과를 보여준다.
Just("Coden의 블로그")
.subscribe(on: DispatchQueue.global())
.sink { content in
print(content + " in \(Thread.current)")
}
.store(in: &cancellables)
즉, 위의 마블 그림 중
subscribe(on:)
에 의한 부분은Subject
류에 적용되지 않는다고 봐야 무방하다.
CurrentValueSubject
신기한 것은 CurrentValueSubject
이다.
private let sampleSubject = CurrentValueSubject<Int, Never>(1)
private var cancellables = Set<AnyCancellable>()
sampleSubject
.subscribe(on: DispatchQueue.global())
.sink { number in
print("\(number) 받음. Thread: \(Thread.current)")
}
.store(in: &cancellables)
위와 같이 코드를 짰고 버튼을 통해 메인쓰레드에서 sampleSubject
에 랜덤값이 보내지도록 구현하였다.
결과는 위와 같았다.
subscribe(on:)
의 영향을 받아 백그라운드 쓰레드에서 일어났으며send
가 이루어진 thread의 영향을 받아 메인쓰레드에서 이루어졌다. 추정하건데 이렇게 (초기)값 방출이
subscribe(on:)
의 영향을 받는 이유는 Subscriber가 Subscription을 받아request(Subscribers.Demand)
를 호출한 뒤 ➡️ 동일 쓰레드 하에서Subscription
이 바로 값을 Subscriber에게 전달하기 때문일 것이다.
delay(for:scheduler:)
를 쓰거나 receive(on:)
을 쓰는 경우에는 즉시 값방출이 이루어지는 위와 같은 경우라도 operator안에 넣어준 인자 쓰레드의 영향을 받는다!
receive(on:)
operator는 어디에 영향을 미칠까?receive(on:)
은 subscribe(on:)
이 영향을 미치지 못한 부분들에 적용된다고 보면 된다.
간단하게 위의 상세테스트 부분에서 뷰 부분만 아래와 같이 바꿔보고 테스트해보았다.
//
// ContentView.swift
// CombineTest
//
// Created by JINHONG AN on 2023/03/26.
//
import SwiftUI
import Combine
// MARK: - View
struct ContentView: View {
private let numberGenerator = NumberGenerator()
var body: some View {
VStack {
Text("Coden의 블로그")
}
.padding()
}
}
// MARK: - Generator
final class NumberGenerator {
private let numberPublisher = NumberPublisher()
private var numberSubscriber = NumberSubscriber()
init() {
numberPublisher
.receive(on: DispatchQueue.global()) // 🔥 다른 부분
.subscribe(numberSubscriber)
}
}
struct ContentView_Previews: PreviewProvider {
static var previews: some View {
ContentView()
}
}
subscribe(on:)
은 publisher, subscription과 같이 upstream 부분에 영향을 미친다.receive(on:)
은 subscriber의 값/completion 처리와 같이 downstream 부분에 영향을 미친다.subscribe(on:) vs receive(on:)
- trycombineCombine's subscribe(on:options:) operator
- stackoverflowCombine
- Apple Documentationsubscribe(on:)
- Apple Documentationreceive(on:)
- Apple Documentation공식문서를 잘 읽어봐야 함을 다시 한번 느끼는 사건이었다.
같이 고민하고 찾아보고 고생해준 우리 팀원들 짱!👍
구독합니다. subscribe(on:)