Swift Concurrency: AsyncStream

틀틀보·2026년 3월 14일

Swift Concurency

목록 보기
11/11

Async Stream

기존 콜백 기반 API, Delegate 패턴을 async/await 문법으로 변환에 사용

  • 배열과 같은 시퀀스를 순회하듯이 비동기적으로 생성되는 값을 for await 루프로 순차적 처리 가능

  • 스트림 생성 시에 클로저 내부에서 continuation 객체 제공으로 값을 방출, 스트림 종료 지원

  • 방출된 값들이 소비되기 이전까지 임시 버퍼 제공으로 처리된 값을 임시 저장 가능

  • 버퍼 정책을 활용한 생산 속도와 소비 속도에 따라 알맞은 정책 선정 가능

  • ex) AsyncStream(Int.self, bufferingPolicy: .bufferingNewest(10))

    • .bufferingOldest(Int): 지정한 크기만큼 버퍼 유지하고 넘치는 데이터에 대해 새로운 값은 버림
    • .bufferingNewest(Int): 지정한 크기만큼 버퍼 유지하고 넘치는 데이터에 대해 오래된 값 삭제
    • .unbounded: 버퍼 크기에 제한 X

코드

final class DefaultLegacySensorService {
    private var timer: Timer?

    func startUpdating(onUpdate: @escaping (Int) -> Void) {
        timer = Timer.scheduledTimer(withTimeInterval: 1.0, repeats: true) { _ in
            let sensorData = Int.random(in: 1...100)
            onUpdate(sensorData)
        }
    }

    func stopUpdating() {
        timer?.invalidate()
        timer = nil
    }
}
final class SensorStreamAdapter {
    private let legacyService: DefaultLegacySensorService
    
    init(legacyService: DefaultLegacySensorService) {
        self.legacyService = legacyService
    }
    
    // 콜백 -> AsyncStream 브리징
    var sensorData: AsyncStream<Int> {
        return AsyncStream { continuation in
            
            // 스트림 종료 시 자원 정리 (콜백 해제 및 업데이트 중지)
            // Task가 취소되거나 break로 루프를 빠져나갈 때 호출
            continuation.onTermination = { @Sendable [legacyService] termination in
            	switch termination {
                	case .finished:
                    	// continuation.finish() 호출 시
                        legacyService.stopUpdating()
                        
					case .cancelled:
                    	// 스트림 취소 시
                }
                
            }
            
            // 레거시 API 호출 및 데이터 방출 
            legacyService.startUpdating { data in
                // 콜백으로 들어온 데이터를 스트림 버퍼에 저장
                continuation.yield(data)
            }
            
            // 만약 레거시 API에 '완료' 콜백이 따로 있다면, 
            // 해당 콜백 내부에서 continuation.finish()를 호출 필요
        }
    }
}
  • 콜백 함수 호출부를 AsyncStream으로 감싸 비동기 콜백 함수의 리턴값을 AsyncSequence 형태로 처리할 수 있게 함.

  • 콜백 함수의 리턴값들을 순차적으로 스트림 버퍼에 저장, 처리부에서 순차적 접근 가능 (yield() 메서드로 저장)

  • finish() 메서드로 onTermination 내부의 코드 실행 가능

  • onTermination 내부에서 스트림 취소에 따른 처리도 가능

final class SensorDataProcessor {
    private let streamProvider: SensorStreamAdapter
    private var processingTask: Task<Void, Never>?
    
    init(streamProvider: SensorStreamAdapter) {
        self.streamProvider = streamProvider
    }
    
    func start() {
        processingTask = Task {
            // 브리징된 스트림 소비
            for await data in streamProvider.sensorData {
                print("수신된 센서 데이터: \(data)")
                
                // 데이터 처리 로직...
            }
        }
    }
    
    func stop() {
        // Task를 취소하면 AsyncStream의 onTermination이 트리거되어 레거시 서비스도 중지
        processingTask?.cancel()
    }
}

Async ThrowingStream

에러도 던지는 Async Stream

// 에러 타입 정의
var sensorData: AsyncThrowingStream<Int, Error> {
        return AsyncThrowingStream { continuation in
            continuation.onTermination = { @Sendable [legacyService] termination in
            	switch termination {
                	case .finished:
                    	// continuation.finish() 호출 시
                        legacyService.stopUpdating()
                        
					case .cancelled:
                    	// 스트림 취소 시
                }
                
            }
            
            legacyService.startUpdating { data in
            	// 잘못된 데이터일 시 에러 방출
            	if data == nil {
                	continuation.finish(throwing: DownloadError.dataError)
                }
                continuation.yield(data)
            }
            
        }
    }
func start() {
        processingTask = Task {
            do {
            	// 에러 가능성이 생김
            	for try await data in streamProvider.sensorData {
                	print("수신된 센서 데이터: \(data)")
            	}
            } catch {
            	// sensorData에서 
                // finish(throwing:)으로 
                // 비정상 종료가 발생하면 에러 처리
            } 
        }
    }

AsyncSequence

Swift Concurrency에서 시간의 흐름에 따라 비동기적으로 값을 생성하는 모든 시퀀스의 근간이 되는 프로토콜

  • 일반적인 배열이나 컬렉션이 사용하는 Sequence 프로토콜의 비동기 버전

  • for await in 루프를 사용할 수 있다는 자격을 부여하는 인터페이스

  • AsyncStream, AsyncThrowingStream이 준수하는 프로토콜

CheckedContinuation과의 차이

단발성과 다발성

특징withCheckedContinuationAsyncStream
용도단일 비동기 결과 (1회성)연속적인 비동기 데이터 흐름 (다발성)
반환 방식resume(returning:) 또는 resume(throwing:)yield(_:) (여러 번 호출 가능), finish()
호출 제약 (중요)정확히 단 한 번만 resume 되어야 함
(2번 이상 호출 시 크래시, 0번 호출 시 무한 대기 및 메모리 누수)
yield는 0번부터 무한대까지 자유롭게 호출 가능
구문 형태let value = await fetch()for await value in stream
  • 비동기 결과에 대한 값이 여러 개냐, 한 개냐의 차이

참고
https://velog.io/@js1436kt/Swift-Concurrency-Continuation

https://developer.apple.com/documentation/swift/asyncstream

https://developer.apple.com/documentation/swift/asyncthrowingstream

https://developer.apple.com/documentation/swift/asyncsequence

https://developer.apple.com/documentation/swift/withcheckedcontinuation(isolation:function:_:)

profile
안녕하세요! iOS 개발자입니다!

0개의 댓글