Combine - subscribe(on:) VS. receive(on:)

Coden·2023년 3월 26일
7

Combine

목록 보기
2/2
post-thumbnail

서두

요즘 Combine을 조금씩 공부해보고 있다. 커스텀 연산자도 만들어보면서..
근데 이 과정에서 신기한 것이 있었다. 바로 subscribe(on:) 연산자.

아래는 나와 회사 팀원들이 같이 고민해보고 찾아본 결과를 토대로 작성한 글이다.


요약

나는 Combine의 subscribe(on:)이 스트림에 대한 기본 쓰레드 지정용으로 쓰는 줄 알았다.
때문에 이 operator를 쓰면 - 일반적인 publisher에 쓰든 subject같은 값 주입이 가능한 publisher에 쓰든 항상 sink 클로저는 해당 쓰레드에서 돌아가게 되는 줄 알았다.(receive(on:)을 안썼다면)

그런데 그게 아니었다.


본론 - subscribe(on:)

참고를 위해 RxSwift subscribeOn 마블 그림을 가져와보았다.


specify the Scheduler on which an Observable will operate
➡️ ReactiveX subscribeOn 설명 중.. 링크

위의 그림을 보면 알 수 있듯 RxSwift subscribeOn은 Observable 동작이 시작될 쓰레드를 지정하는데에 쓰인다. (선언 위치는 상관이 없다.)
이 그림은 Combine의 subscribe(on:)에도 비슷하게 적용된다.

그런데 이 형태가 subject 종류의 스트림에도 동일하게 적용될 수 있을까?


테스트 코드 1

//
//  ContentView.swift
//  CombineTest
//
//  Created by JINHONG AN on 2023/03/26.
//

import SwiftUI
import Combine

struct ContentView: View {
    private let numberGenerator = NumberGenerator()
    
    var body: some View {
        VStack {
            Button {
                numberGenerator.generateRandomNumber()
            } label: {
                Text("랜덤 숫자 생성하기")
            }

        }
        .padding()
    }
}

final class NumberGenerator {
    private let numberSubject = PassthroughSubject<Int, Never>()
    private var cancellables = Set<AnyCancellable>()
    
    init() {
        numberSubject
            .subscribe(on: DispatchQueue.global())
            .sink { number in
                print("🎉 \(number)숫자 출력되었습니다!! - 쓰레드 정보: \(Thread.current)")
            }
            .store(in: &cancellables)
    }
    
    func generateRandomNumber() {
        numberSubject.send(Int.random(in: 0...100))
    }
}


struct ContentView_Previews: PreviewProvider {
    static var previews: some View {
        ContentView()
    }
}

위와 같이 코드를 짰고 화면의 버튼을 누르면 숫자 및 동작 쓰레드가 콘솔 로그에 찍히도록 만들었다.
그리고 나는 항상 백그라운드 쓰레드에서 결과값이 찍힐 것이라고 예상했다.

왜냐하면
1. generateRandomNumber함수 호출이 View의 body에서 이루어지기 때문에 MainActor에서 호출됨
2. send가 MainActor에서 이루어짐
3. 하지만 subscribe(on:)에서 DispatchQueue.global()를 걸어줬기 때문에 값 방출은 백그라운드 쓰레드에서 동작함(위에서 본 RxSwift 그림처럼)
4. sink 내부에서 쓰레드 출력은 백그라운드 쓰레드로 찍힘

위와같이 생각했기 때문이다.

하지만 결과는 달랐다.


결과 1

아니 모두 main thread로 찍힌다고?!!?? 😯

이러한 결과값은 나를 혼돈에 빠뜨렸다. (물론 팀원들도 멘붕에 빠졌었다.)
왜 이런 결과가 나온 걸까?


분석

정답을 찾아나가기 전에 우리는 subscribe 과정이 어떻게 일어나는지 알아볼 필요가 있다.
아래는 subscribe가 일어나는 과정을 내 나름대로(😅) 그려본 것이다.

subscribe 과정을 직접 구현하는 경우 구현 형태는 충분히 달라질 수 있다. 따라서 아래의 그림이 항상 맞는 것은 아닐 수 있다.

직접 그린 기존 그림(참고용)
  1. 이벤트 대상으로부터 publisher를 생성한다.
  2. 해당 publisher를 subscribe한다. (publisher의 subscribe(_:)를 호출하거나 sink를 하거나)
  3. publisher는 subscription을 만들고
  4. 이를 subscriber에게 전달한다. (subscriber의 receive(subscription:) 호출)
  5. subscriber는 받은 subscription의 request(_:)를 호출하여 subscription에게 원하는 element개수를 전달한다.
  6. 이후 적절한 때에 이벤트 주체는 이벤트/값을 발생시키고 이를 subscription에게 전달한다.
  7. subscription은 해당 이벤트/값을 subscriber의 receive(_:)를 통해 재전달한다. (이 때 하나 전달했으니 demand는 1 차감한다)
  8. 값을 받은 subscriber는 반환값으로 demand를 재전달한다. ('추가적으로 ~개 더 줘' 하는 셈)
  9. (그림에는 없지만) subscription은 받은 demand값을 적용한다. (추후 해당 demand 수 만큼 이벤트/값을 subscriber에게 더 전달한다.)

참조 관계는 일반적으로 아래와 같이 구성된다.

직접 그린 기존 그림(참고용)

이렇게 subscribe의 과정을 조금 자세히 살펴보았다.
이제 다시 아래의 결과가 왜 나왔는지 찾아가보도록 하자.



subscribe(on:) operator 공식문서

정답은 의외로 항상 가장 가까운 곳에 있다. subscribe(on:) - Apple 공식문서를 봐보자.


subscribe, cancel, request 동작을 실행시킬 스케쥴러를 지정한다.

downstream 메시지에 영향을 미치는 receive(on:options:)와는 반대로 subscribe(on:options:)upstream 메시지의 실행 context를 변경한다.


subscribe, cancel, request 동작을 실행시킬 스케쥴러를 지정한다고 나와있다.
즉, subscribe(on:)으로 지정하게 되는 것은 publisher의 subscribe(_:), subscription의 request(_:)cancel()을 실행시킬 스케쥴러뿐이다.


테스트 2

//
//  ContentView.swift
//  CombineTest
//
//  Created by JINHONG AN on 2023/03/26.
//

import SwiftUI
import Combine

struct ContentView: View {
    private let numberGenerator = NumberGenerator()
    
    var body: some View {
        VStack(spacing: 50) {
            Button {
                numberGenerator.generateRandomNumber()
            } label: {
                Text("랜덤 숫자 생성하기")
            }
            
            Button {
                numberGenerator.generateCompletion()
            } label: {
                Text("completion 시키기")
            }
        }
        .padding()
    }
}

final class NumberGenerator {
    private let numberSubject = PassthroughSubject<Int, Never>()
    private let numberSubscriber: AnySubscriber<Int, Never>
    
    init() {
        numberSubscriber = .init(receiveSubscription: { subscription in
            print("subscription을 받았습니다. - 💻쓰레드 정보: \(Thread.current)")
            
            subscription.request(.unlimited)
        }, receiveValue: { number in
            print("\(number)숫자 받았습니다!! - 💻쓰레드 정보: \(Thread.current)")
            
            return .none
        }, receiveCompletion: { completion in
            print("\(completion) 받았습니다!! - 💻쓰레드 정보: \(Thread.current)")
        })
        
        numberSubject
            .handleEvents(receiveSubscription: { _ in
                print("receive subscription - 💻쓰레드 정보: \(Thread.current)")
            }, receiveOutput: { _ in
                print("receive output - 💻쓰레드 정보: \(Thread.current)")
            }, receiveCompletion: { _ in
                print("receive completion - 💻쓰레드 정보: \(Thread.current)")
            }, receiveCancel: {
                print("receive cancel - 💻쓰레드 정보: \(Thread.current)")
            }, receiveRequest: { _ in
                print("receive request - 💻쓰레드 정보: \(Thread.current)")
            })
            .subscribe(on: DispatchQueue.global())
            .subscribe(numberSubscriber)
    }
    
    func generateRandomNumber() {
        print("\(#function) 호출되었습니다. - 💻쓰레드 정보: \(Thread.current)")
        numberSubject.send(Int.random(in: 0...100))
    }
    
    func generateCompletion() {
        print("\(#function) 호출되었습니다. - 💻쓰레드 정보: \(Thread.current)")
        numberSubject.send(completion: .finished)
    }
}


struct ContentView_Previews: PreviewProvider {
    static var previews: some View {
        ContentView()
    }
}

이번에는 위와 같이 AnySubscriber를 만들어서 Subscribe하는 과정을 볼 수 있도록 하였다.
그리고 handleEvents operator를 사용하여 구독 과정에서는 어떠한 일들이 발생하는지 알 수 있도록 하였다.

handleEvents는 뭐랄까 중간 감시자같은 느낌으로 동작한다고 보면 쉬울 것 같다.
뭐가 어떻게 오고가는지 보는 출입국 심사 - 보안검사대 같은 느낌?


결과 2

앱 실행 -> 랜덤 숫자 생성 2번 탭 -> completion 시키기 탭 한 결과는 아래와 같았다.

위의 결과 스크린샷에서도 볼 수 있듯이 subscription이 subscriber에게 전달되는 일련의 과정과 subscriber가 해당 subscripton을 이용하여 request demand를 하는 과정은 백그라운드 쓰레드에서 일어남을 알 수 있다.

이외에 - completion을 보내는 일련의 과정들은 main thread에서 일어남을 볼 수 있다.

값과 comletion 처리과정이 main thread에서 일어난 이유는 '로직의 시작이 View의 body를 담당하는MainActor에서 시작되었기 때문'이라고 볼 수 있다. ➡️ send가 main thread에서 일어남


테스트 3 (cancel 테스트)

cancel()을 외부에서 직접 호출 할 시 호출 쓰레드가 영향을 주므로 ➡️ 자동으로 cancel()이 호출되도록 해야한다. 따라서 테스트를 별도로 만들어 진행하였다.

//
//  ContentView.swift
//  CombineTest
//
//  Created by JINHONG AN on 2023/03/26.
//

import SwiftUI
import Combine

struct ContentView: View {
     private let numberGenerator = NumberGenerator()
     
     var body: some View {
         VStack(spacing: 50) {
             Button {
                 numberGenerator.cancelAllStream()
             } label: {
                 Text("cancel 시키기")
             }

         }
         .padding()
     }
 }

 final class NumberGenerator {
     private var cancellables = Set<AnyCancellable>()
     
     init() {
         Just("Cancel Test Stream")
             .delay(for: 100, scheduler: DispatchQueue.main)
             .handleEvents(receiveCancel: {
                 print("별도로 만든 스트림 Cancel 됨. 쓰레드 정보: \(Thread.current)")
             })
             .subscribe(on: DispatchQueue.global())
             .sink { _ in }
             .store(in: &cancellables)
     }
     
     func cancelAllStream() {
         cancellables.removeAll()
     }
 }


 struct ContentView_Previews: PreviewProvider {
     static var previews: some View {
         ContentView()
     }
 }

위와 같이 코드를 짰으며 버튼을 누르면 모든 스트림이 자동으로 cancel되도록 하였다.
(값이 즉시 방출되서 끝나는 것을 막기 위해 delay를 사용하였다.)


결과 3 (cancel 테스트)

cancel이 백그라운드 쓰레드에서 일어났음을 알 수 있다. 😆


테스트 4 (상세 테스트)

나는 조금 더 상세하게 테스트를 진행해보고자 했다. 그래서 아래와 같이 Publisher, Subscription, Subscriber를 별도로 구현해 보았다.


구현부

// MARK: - Publisher
struct NumberPublisher: Publisher {
    typealias Output = Int
    typealias Failure = Never
    
    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        Swift.print("1️⃣ Publisher의 receive 메서드 동작. 💻 쓰레드 정보: \(Thread.current)")
        let subscription = NumberSubscription(downstream: subscriber)
        subscriber.receive(subscription: subscription)
    }
}

// MARK: - Subscription
final class NumberSubscription<Downstream: Subscriber>: Subscription where Downstream.Input == Int, Downstream.Failure == Never {
    private var downstream: Downstream?
    private var cancellables = Set<AnyCancellable>()
    
    init(downstream: Downstream) {
        print("2️⃣ Subscription 생성. 💻 쓰레드 정보: \(Thread.current)")
        
        self.downstream = downstream
        startNumberTimer()
    }
    
    private func startNumberTimer() {
        Timer.publish(every: 5, on: .main, in: .common)
            .autoconnect()
            .sink { [weak self] _ in
                print("5️⃣ Subscriber에게 값을 전달하고 새로운 demand를 받는 부분 동작. 💻 쓰레드 정보: \(Thread.current)")
                let newDemand = self?.downstream?.receive(Int.random(in: 1...100))
                
                self?.downstream?.receive(completion: .finished)
            }
            .store(in: &cancellables)
    }
    
    func request(_ demand: Subscribers.Demand) {
        print("4️⃣ Subscriber에게 demand 요청 받음. 💻 쓰레드 정보: \(Thread.current)")
    }
    
    func cancel() {
        print("8️⃣ subscription cancel됨!! 💻 쓰레드 정보: \(Thread.current)")
        self.downstream = nil
        cancellables.removeAll()
    }
}

// MARK: - Subscriber
struct NumberSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    
    let combineIdentifier = CombineIdentifier()
    
    func receive(subscription: Subscription) {
        print("3️⃣ Subscription 받음. 💻 쓰레드 정보: \(Thread.current)")
        subscription.request(.unlimited)
    }
    
    func receive(_ input: Input) -> Subscribers.Demand {
        print("6️⃣ Subscription에게서 값을 전달받음. 새로운 demand 전달 가능. 💻 쓰레드 정보: \(Thread.current)")
        return .none
    }
    
    func receive(completion: Subscribers.Completion<Failure>) {
        print("7️⃣ subscriber가 completion을 받음!! 💻 쓰레드 정보: \(Thread.current)")
    }
}

사용부

//
//  ContentView.swift
//  CombineTest
//
//  Created by JINHONG AN on 2023/03/26.
//

import SwiftUI
import Combine


// MARK: - View
struct ContentView: View {
    private let numberGenerator = NumberGenerator()
    
    var body: some View {
        VStack {
            Text("Coden의 블로그")
        }
        .padding()
    }
}

// MARK: - Generator
final class NumberGenerator {
    private let numberPublisher = NumberPublisher()
    private var numberSubscriber = NumberSubscriber()
    
    init() {
        numberPublisher
            .subscribe(on: DispatchQueue.global())
            .subscribe(numberSubscriber)
    }
}

struct ContentView_Previews: PreviewProvider {
    static var previews: some View {
        ContentView()
    }
}

각 과정들이 상세히 나오도록 하였다. (뷰는 별 내용이 없다)


결과 4 (상세 테스트)


스크린샷을 보면 알 수 있듯이 (빨간 네모)

  • Publisher의 receive메서드가 동작하면서 Subscription이 Subscriber에게 전달되는 일련의 동작
  • Subscriber가 받은 Subscription을 이용하여 request demand를 하는 동작(초기)
  • Subscription이 받은 demand를 처리하는 동작(초기)

위의 3가지는 subscribe(on:)의 영향을 받음을 알 수 있다.


5초 뒤에 일어나는 나머지 부분들 (파랑초록 네모)

  • subscription이 subscriber에게 값을 전달하고 (동시에 새로운 demand를 받고)
  • subscription이 subscriber에게 completion을 전달하는 동작

위의 과정들은 메인쓰레드에서 동작함을 알 수 있다.

메인쓰레드에서 동작 한 것은 당연히 Timer를 main RunLoop에서 돌렸기 때문이다.




본론 2 - 마블 그림?

이렇게 보면 위에서 본 마블 그림이 잘못된 것처럼 보인다.
"send하는 곳의 thread를 타는거지 subscribe(on:)은 아무 의미가 없잖아?!" 하고 생각할 수도 있다.

하지만 그건 아니다. subscribe(on:)이 값 방출 쓰레드에 영향을 미치는 경우가 존재한다.


[1, 2, 3, 4, 5]
    .publisher
    .subscribe(on: DispatchQueue.global())
    .sink { number in
        print("\(number)받음. Thread 정보: \(Thread.current)")
    }
    .store(in: &cancellables)

위의 코드를 테스트 해보면 아래와 같이 나온다.


아래의 코드도 비슷한 결과를 보여준다.

Just("Coden의 블로그")
    .subscribe(on: DispatchQueue.global())
	.sink { content in
		print(content + " in \(Thread.current)")
	}
	.store(in: &cancellables)

즉, 위의 마블 그림 중 subscribe(on:)에 의한 부분은 Subject류에 적용되지 않는다고 봐야 무방하다.


CurrentValueSubject

신기한 것은 CurrentValueSubject이다.

private let sampleSubject = CurrentValueSubject<Int, Never>(1)
private var cancellables = Set<AnyCancellable>()

sampleSubject
	.subscribe(on: DispatchQueue.global())
	.sink { number in
		print("\(number) 받음. Thread: \(Thread.current)")
	}
	.store(in: &cancellables)

위와 같이 코드를 짰고 버튼을 통해 메인쓰레드에서 sampleSubject에 랜덤값이 보내지도록 구현하였다.


결과는 위와 같았다.

  • 초기값 방출은 subscribe(on:)의 영향을 받아 백그라운드 쓰레드에서 일어났으며
  • 이후의 방출은 send가 이루어진 thread의 영향을 받아 메인쓰레드에서 이루어졌다.

추정하건데 이렇게 (초기)값 방출이 subscribe(on:)의 영향을 받는 이유는 Subscriber가 Subscription을 받아 request(Subscribers.Demand)를 호출한 뒤 ➡️ 동일 쓰레드 하에서 Subscription이 바로 값을 Subscriber에게 전달하기 때문일 것이다.


주의 할 부분

delay(for:scheduler:)를 쓰거나 receive(on:)을 쓰는 경우에는 즉시 값방출이 이루어지는 위와 같은 경우라도 operator안에 넣어준 인자 쓰레드의 영향을 받는다!




본론 3 - receive(on:) operator는 어디에 영향을 미칠까?

receive(on:)subscribe(on:)이 영향을 미치지 못한 부분들에 적용된다고 보면 된다.

간단하게 위의 상세테스트 부분에서 뷰 부분만 아래와 같이 바꿔보고 테스트해보았다.

테스트 코드

//
//  ContentView.swift
//  CombineTest
//
//  Created by JINHONG AN on 2023/03/26.
//

import SwiftUI
import Combine

// MARK: - View
struct ContentView: View {
    private let numberGenerator = NumberGenerator()
    
    var body: some View {
        VStack {
            Text("Coden의 블로그")
        }
        .padding()
    }
}

// MARK: - Generator
final class NumberGenerator {
    private let numberPublisher = NumberPublisher()
    private var numberSubscriber = NumberSubscriber()
    
    init() {
        numberPublisher
            .receive(on: DispatchQueue.global()) // 🔥 다른 부분
            .subscribe(numberSubscriber)
    }
}

struct ContentView_Previews: PreviewProvider {
    static var previews: some View {
        ContentView()
    }
}

결과

  • 이번에는 반대로 초기 subscription을 만드는 부분들은 메인 쓰레드에서 동작하였다. (빨간 네모)
  • Timer를 통해 값/completion이 방출되는 부분 또한 메인 쓰레드에서 동작하였다. (초록 네모)
  • Subscriber가 받은 값/completion을 처리하는 부분은 백그라운드 쓰레드에서 동작하였다. (파란 네모)




결론

  • subscribe(on:)은 publisher, subscription과 같이 upstream 부분에 영향을 미친다.
  • receive(on:)은 subscriber의 값/completion 처리와 같이 downstream 부분에 영향을 미친다.

같이 읽어보면 좋은 문서들

References


P.S

공식문서를 잘 읽어봐야 함을 다시 한번 느끼는 사건이었다.
같이 고민하고 찾아보고 고생해준 우리 팀원들 짱!👍

profile
iOS 공부중인 Coden

2개의 댓글

comment-user-thumbnail
2023년 3월 26일

구독합니다. subscribe(on:)

답글 달기
comment-user-thumbnail
2023년 8월 25일

글이 참 좋길래 댓글달러 내려와보니 코든 블로그였군요 ㅎㅎㅎ
덕분에 많이 배워갑니다

답글 달기