기존 콜백 기반 API, Delegate 패턴을
async/await문법으로 변환에 사용
배열과 같은 시퀀스를 순회하듯이 비동기적으로 생성되는 값을 for await 루프로 순차적 처리 가능
스트림 생성 시에 클로저 내부에서 continuation 객체 제공으로 값을 방출, 스트림 종료 지원
방출된 값들이 소비되기 이전까지 임시 버퍼 제공으로 처리된 값을 임시 저장 가능
버퍼 정책을 활용한 생산 속도와 소비 속도에 따라 알맞은 정책 선정 가능
ex) AsyncStream(Int.self, bufferingPolicy: .bufferingNewest(10))
.bufferingOldest(Int): 지정한 크기만큼 버퍼 유지하고 넘치는 데이터에 대해 새로운 값은 버림.bufferingNewest(Int): 지정한 크기만큼 버퍼 유지하고 넘치는 데이터에 대해 오래된 값 삭제.unbounded: 버퍼 크기에 제한 Xfinal class DefaultLegacySensorService {
private var timer: Timer?
func startUpdating(onUpdate: @escaping (Int) -> Void) {
timer = Timer.scheduledTimer(withTimeInterval: 1.0, repeats: true) { _ in
let sensorData = Int.random(in: 1...100)
onUpdate(sensorData)
}
}
func stopUpdating() {
timer?.invalidate()
timer = nil
}
}
final class SensorStreamAdapter {
private let legacyService: DefaultLegacySensorService
init(legacyService: DefaultLegacySensorService) {
self.legacyService = legacyService
}
// 콜백 -> AsyncStream 브리징
var sensorData: AsyncStream<Int> {
return AsyncStream { continuation in
// 스트림 종료 시 자원 정리 (콜백 해제 및 업데이트 중지)
// Task가 취소되거나 break로 루프를 빠져나갈 때 호출
continuation.onTermination = { @Sendable [legacyService] termination in
switch termination {
case .finished:
// continuation.finish() 호출 시
legacyService.stopUpdating()
case .cancelled:
// 스트림 취소 시
}
}
// 레거시 API 호출 및 데이터 방출
legacyService.startUpdating { data in
// 콜백으로 들어온 데이터를 스트림 버퍼에 저장
continuation.yield(data)
}
// 만약 레거시 API에 '완료' 콜백이 따로 있다면,
// 해당 콜백 내부에서 continuation.finish()를 호출 필요
}
}
}
콜백 함수 호출부를 AsyncStream으로 감싸 비동기 콜백 함수의 리턴값을 AsyncSequence 형태로 처리할 수 있게 함.
콜백 함수의 리턴값들을 순차적으로 스트림 버퍼에 저장, 처리부에서 순차적 접근 가능 (yield() 메서드로 저장)
finish() 메서드로 onTermination 내부의 코드 실행 가능
onTermination 내부에서 스트림 취소에 따른 처리도 가능
final class SensorDataProcessor {
private let streamProvider: SensorStreamAdapter
private var processingTask: Task<Void, Never>?
init(streamProvider: SensorStreamAdapter) {
self.streamProvider = streamProvider
}
func start() {
processingTask = Task {
// 브리징된 스트림 소비
for await data in streamProvider.sensorData {
print("수신된 센서 데이터: \(data)")
// 데이터 처리 로직...
}
}
}
func stop() {
// Task를 취소하면 AsyncStream의 onTermination이 트리거되어 레거시 서비스도 중지
processingTask?.cancel()
}
}
에러도 던지는
Async Stream
// 에러 타입 정의
var sensorData: AsyncThrowingStream<Int, Error> {
return AsyncThrowingStream { continuation in
continuation.onTermination = { @Sendable [legacyService] termination in
switch termination {
case .finished:
// continuation.finish() 호출 시
legacyService.stopUpdating()
case .cancelled:
// 스트림 취소 시
}
}
legacyService.startUpdating { data in
// 잘못된 데이터일 시 에러 방출
if data == nil {
continuation.finish(throwing: DownloadError.dataError)
}
continuation.yield(data)
}
}
}
func start() {
processingTask = Task {
do {
// 에러 가능성이 생김
for try await data in streamProvider.sensorData {
print("수신된 센서 데이터: \(data)")
}
} catch {
// sensorData에서
// finish(throwing:)으로
// 비정상 종료가 발생하면 에러 처리
}
}
}
Swift Concurrency에서 시간의 흐름에 따라 비동기적으로 값을 생성하는 모든 시퀀스의 근간이 되는 프로토콜
일반적인 배열이나 컬렉션이 사용하는 Sequence 프로토콜의 비동기 버전
for await in 루프를 사용할 수 있다는 자격을 부여하는 인터페이스
위 AsyncStream, AsyncThrowingStream이 준수하는 프로토콜
| 특징 | withCheckedContinuation | AsyncStream |
|---|---|---|
| 용도 | 단일 비동기 결과 (1회성) | 연속적인 비동기 데이터 흐름 (다발성) |
| 반환 방식 | resume(returning:) 또는 resume(throwing:) | yield(_:) (여러 번 호출 가능), finish() |
| 호출 제약 (중요) | 정확히 단 한 번만 resume 되어야 함(2번 이상 호출 시 크래시, 0번 호출 시 무한 대기 및 메모리 누수) | yield는 0번부터 무한대까지 자유롭게 호출 가능 |
| 구문 형태 | let value = await fetch() | for await value in stream |
참고
https://velog.io/@js1436kt/Swift-Concurrency-Continuation
https://developer.apple.com/documentation/swift/asyncstream
https://developer.apple.com/documentation/swift/asyncthrowingstream
https://developer.apple.com/documentation/swift/asyncsequence
https://developer.apple.com/documentation/swift/withcheckedcontinuation(isolation:function:_:)