AsyncSequence는 기존 콜백 지옥과 복잡한 비동기 처리 문제를 해결하기 위해 등장했습니다.
for-await-in구문으로 스트리밍 데이터를 마치 일반 배열처럼 간단하게 처리할 수 있게 해주는 방식입니다.
먼저 AsyncSequence를 이해하기 위해 두 가지 핵심 개념부터 살펴보겠습니다.
Async는 "비동기(Asynchronous)"를 의미합니다.
// 동기적 처리 - UI가 멈춘다 😱
let data = downloadImage() // 3초 동안 UI 멈춤
updateUI(with: data)
// 비동기적 처리 - UI가 부드럽다 ✨
Task {
let data = await downloadImage() // UI는 계속 반응함
updateUI(with: data)
}
핵심 특징:
Sequence는 "순서가 있는 데이터의 집합"입니다:
let numbers = [1, 2, 3, 4, 5]
for number in numbers {
print(number) // 순차적으로 출력: 1, 2, 3, 4, 5
}
핵심 특징:
for-in 루프로 순회할 수 있는 것들이 두 개념을 합친 AsyncSequence는:
// 실시간으로 들어오는 메시지를 순차적으로 처리
for await message in chatMessages {
displayMessage(message) // 메시지가 올 때마다 처리
}
AsyncSequence를 이해하려면, 먼저 기존 방식의 문제점을 살펴봐야 합니다.
웹소켓이나 스트리밍 데이터를 처리할 때, 예전에는 이런 식으로 코딩했습니다:
// 😱 콜백 지옥의 예시
class WebSocketManager {
func connectToServer() {
socket.connect { [weak self] success in
if success {
self?.socket.onMessage { message in
self?.processMessage(message) { result in
switch result {
case .success(let data):
self?.updateUI(data) { completion in
// 또 다른 콜백...
}
case .failure(let error):
// 에러 처리...
}
}
}
}
}
}
}
1. 가독성 문제
2. 제어 흐름 문제
3. 에러 처리의 복잡성
NotificationCenter
// 코드가 분산되고 데이터 흐름 파악이 어려움
NotificationCenter.default.addObserver(/* ... */)
NotificationCenter.default.post(/* ... */)
Combine
// 강력하지만 러닝 커브가 높음
publisher
.sink { completion in /* ... */ }
.receive(on: DispatchQueue.main)
.map { /* ... */ }
.filter { /* ... */ }
이런 문제들을 해결하기 위해 AsyncSequence가 등장했습니다!
// 😍 간단하고 직관적인 AsyncSequence
for await message in webSocketMessages {
let processedData = processMessage(message)
await updateUI(processedData)
}
주요 개선점:
1. 가독성 향상: for-await-in 구문으로 비동기 데이터를 순차적으로 처리
2. 제어 흐름 명확화: await 키워드로 다음 데이터가 준비될 때까지 안전하게 대기
3. 익숙한 패턴: 기존 for-in 루프와 동일한 직관적인 문법
// 대용량 파일도 청크 단위로 안전하게 처리
for await chunk in fileDownloadStream {
await saveChunkToFile(chunk)
updateProgressBar(chunk.progress)
// 전체 파일을 메모리에 올리지 않고 스트리밍 처리
}
// NotificationCenter를 AsyncSequence로 변환
let notifications = NotificationCenter.default.notifications(named: .userDidLogin)
for await notification in notifications {
handleUserLogin(notification)
}
AsyncSequence는 작업이 중단되거나 종료될 때도 안전하게 리소스를 정리합니다.
// Task 취소 시에도 안전하게 정리
let task = Task {
do {
for await data in networkStream {
await processData(data)
}
} catch {
// 에러나 취소 시 자동으로 정리
print("작업 중단: \(error)")
}
}
// 나중에 취소해도 안전
task.cancel() // 현재 처리 중인 데이터까지 완료 후 안전하게 종료
주요 안전성 기능:
WWDC에서 소개된 지진 데이터 예시를 통해 이전 방식과 AsyncSequence 방식의 차이를 비교해보겠습니다.
// 콜백 지옥의 현실...
func processEarthquakeDataOldWay() {
let feed = EarthquakeFeed()
var processedCount = 0
let maxCount = 3
print("지진 데이터 처리 시작 (콜백 방식)...")
feed.getFeed { (line, error) in
// 1. 종료 조건 확인
guard let line = line else {
print("데이터 처리 완료.")
return
}
// 2. String -> Earthquake 객체로 변환
guard let earthquake = parse(line: line) else {
return
}
// 3. 필터링 로직
if earthquake.magnitude < 5.0 {
return
}
// 4. 개수 제한 로직 (상태 관리가 복잡!)
if processedCount >= maxCount {
return
}
// 5. 데이터 처리
print("규모 \(String(format: "%.1f", earthquake.magnitude)) 지진 감지 - \(earthquake.location)")
processedCount += 1
}
}
문제점:
processedCount) 복잡// 깔끔하고 직관적인 코드!
func processEarthquakeData() async {
let feed = EarthquakeFeed()
print("지진 데이터 처리 시작...")
do {
// 체이닝으로 간결하게 처리
for await earthquake in feed.lines
.map(parse) // String -> Earthquake 변환
.compactMap({ $0 }) // nil 필터링
.filter({ $0.magnitude >= 5.0 }) // 규모 필터링
.prefix(3) // 개수 제한
{
print("규모 \(String(format: "%.1f", earthquake.magnitude)) 지진 감지 - \(earthquake.location)")
}
print("필터링된 지진 데이터 3개 처리 완료.")
} catch {
print("오류 발생: \(error.localizedDescription)")
}
}
개선점:
prefix(3)로 개수 제한 자동 처리do-catch로 모든 에러를 한 곳에서 처리| 구분 | 이전 방식 | AsyncSequence |
|---|---|---|
| 가독성 | 콜백 중첩으로 복잡 | 순차적이고 직관적 |
| 상태 관리 | 수동으로 변수 관리 | 자동으로 처리 |
| 에러 처리 | 각 단계마다 개별 처리 | 통합된 에러 처리 |
| 데이터 흐름 | 분산되어 파악 어려움 | 체이닝으로 명확한 흐름 |
AsyncSequence는 작업 종료 시 안정성을 보장하는 여러 메커니즘을 제공합니다.
class DataProcessor {
private var processingTask: Task<Void, Never>?
func startProcessing() {
processingTask = Task {
defer {
// 작업 종료 시 항상 실행되는 정리 코드
print("리소스 정리 완료")
}
do {
for await data in dataStream {
// 취소 확인 - 협력적 취소
try Task.checkCancellation()
await processData(data)
}
} catch is CancellationError {
print("작업이 안전하게 취소되었습니다")
} catch {
print("에러 발생: \(error)")
}
}
}
func stopProcessing() {
processingTask?.cancel() // 안전한 취소
processingTask = nil
}
}
let customStream = AsyncStream<String> { continuation in
// 데이터 생성 로직
let timer = Timer.scheduledTimer(withTimeInterval: 1.0, repeats: true) { _ in
continuation.yield("데이터 \(Date())")
}
// 종료 시 정리 작업
continuation.onTermination = { @Sendable termination in
timer.invalidate() // 타이머 정리
print("스트림 종료: \(termination)")
}
}
// 사용 시에도 안전하게 종료
for await data in customStream {
print(data)
// break나 return으로 종료해도 onTermination 호출됨
if someCondition { break }
}
// 실시간 채팅 - 연결 종료 시 자동 정리
for await message in chatSocket.messages {
await updateChatUI(with: message)
}
// 루프 종료 시 소켓 연결 자동 해제
// 파일 업로드 - 취소 시 임시 파일 정리
for await progress in uploadTask.progressUpdates {
updateProgressBar(progress.percentComplete)
if progress.isCancelled { break } // 안전한 중단
}
📚 참고: WWDC 2021 - Meet AsyncSequence
🔗 관련 학습: Swift Concurrency, async/await, Actor 모델