C#에서 비동기 코드는 async
와 await
키워드로 간결하게 작성할 수 있다. I/O 중심 작업은 불필요한 대기를 없애고, CPU 중심 작업은 병렬 처리로 실행 시간을 단축한다.
스레드가 비동기 작업을 수행 중 await
키워드로 다른 비동기 함수를 호출하면 동기적으로 완료되거나 혹은 멈춘다. 그리고 다른 작업을 처리하다가 재개할 수 있는 시점이 오면 멈췄던 위치부터 다시 작업을 시작한다. 이 과정이 어떻게 진행되는지 알아보려 한다.
C#의 비동기 프로그래밍을 구현할 수 있는 방법은 다음과 같다.
이 중에서 APM과 EAP는 가볍게 살펴보고 async
와 await
를 사용하는 TAP을 중점적으로 살펴보겠다.
CompletedSynchronously
속성을 사용해 처리 위치를 (콜백/호출자 중에서) 정할 수 있지만 복잡성 증가SynchronizationContext
를 도입하여 비동기 완료 시 특정 스레드에서 후속 작업을 실행하Task를 기반으로 비동기 작업 상태와 결과를 처리하는 패턴이다.
이전 방식들과 비교하면 다음의 장점이 있다.
비동기 작업의 진행 상황과 후속작업, 결과를 캡슐화하는 객체
_completed
: 완료 여부_error
: 예외 정보TResult _result
: 작업의 결과_continuation
: 작업 완료 후 실행할 후속 동작_ec
: ExecutionContextTask는 소비 목적으로 사용하고 TaskCompletionSource가 Task의 생성과 완료를 제어한다. 이 둘을 분리해서 Task를 외부로 전달할 때 작업을 실수 혹은 의도적으로 완료 상태로 변경하지 못하게 한다.
async
와 await
를 사용하면 다음 함수들이 묵시적으로 호출된다고 생각하면 된다.
public void ContinueWith(Action<MyTask> action)
{
lock (this)
{
if (_completed)
{
// 이미 완료된 경우 즉시 큐에 입력
ThreadPool.QueueUserWorkItem(_ => action(this));
}
else if (_continuation is not null)
{
throw new InvalidOperationException("실제 Task랑 다르게 하나의 후속작업만 등록될 수 있음을 가정");
}
else
{
_continuation = action;
_ec = ExecutionContext.Capture(); // 현재 ExecutionContext 캡처
}
}
}
SetResult
또는 SetException
메서드를 사용하여 완료 상태로 변경할 수 있다.public void SetResult() => Complete(null);
public void SetException(Exception error) => Complete(error);
private void Complete(Exception? error)
{
lock (this)
{
if (_completed)
throw new InvalidOperationException("Already completed");
_error = error;
_completed = true;
if (_continuation is not null)
{
ThreadPool.QueueUserWorkItem(_ =>
{
if (_ec is not null)
{
ExecutionContext.Run(_ec, _ => _continuation(this), null);
}
else
{
_continuation(this);
}
});
}
}
}
Complete
메서드는 작업을 완료 상태로 변경하고 continuation
을 실행변환된 상태머신 코드에서 awaiter가 GetResult를 호출하는데 이때 _task.Wait()이 호출되지만 작업이 완료된 경우에 MoveNext가 호출이 되기 때문에 Blocking되는 일은 없다.
ExecutionContext는 비동기 작업 간에 “보안 정보, 문화권, 호출 정보” 등 주변(ambient) 데이터를 일관되게 유지하기 위한 컨테이너이다. 비동기 작업이 여러 스레드를 오갈 때는 TLS만으로는 부족하므로, ExecutionContext를 활용해 데이터를 흐르게(Flow) 한다.
- 암묵적 데이터 전달
정적 필드를 수정하고, 호출된 메서드에서 이 값을 사용해 데이터를 전달하는 방식- Thread Local Data
[ThreadStatic] 속성을 가진 정적 필드나 ThreadLocal<T>를 이용하면 스레드별로 데이터를 분리
var number = new AsyncLocal<int>();
number.Value = 1;
ThreadPool.QueueUserWorkItem(_ => Console.WriteLine(number.Value)); // 작업 수행 시 캡처된 1 출력
number.Value = 2;
Console.WriteLine(number.Value)); // 2 출력
등록된 비동기 작업은 처리될 때 ExecutionContext이 존재한다면 복원을 한다. 그 이유는 스레드에서 실행했던 다른 작업의 ambient 데이터에 접근하는 것을 막기 위함이다. (각 비동기 작업이 독립적인 환경에서 수행될 수 있다.)
// 스레드 풀의 워커
if (ec is null)
{
action();
}
else
{
ExecutionContext.Run(ec, s => ((Action)s!)(), action);
}
Iterator는 yield return이 있는 부분마다 다른 상태를 저장하는 코드로 변환된다. MoveNext() 메서드에서 각 상태에 대해 switch 문으로 다른 동작을 수행한다.
마찬가지로 비동기 메서드도 await을 만날 때마다 상태를 저장하고 후속 작업을 구분한다.
async
함수는 상태머신 구조체로 변환된다. 본래의 시그니처는 유지되고 본문은 MoveNext()
메서드로 재작성된다.
[AsyncStateMachine(typeof(CopyStreamToStreamAsyncStateMachine))]
public Task CopyStreamToStreamAsync(Stream sourceStream, Stream destinationStream)
{
// 상태 머신 인스턴스 초기화
CopyStreamToStreamAsyncStateMachine stateMachine = default;
stateMachine.methodBuilder = AsyncTaskMethodBuilder.Create();
stateMachine.source = sourceStream;
stateMachine.destination = destinationStream;
stateMachine.state = -1;
stateMachine.methodBuilder.Start(ref stateMachine);
return stateMachine.methodBuilder.Task;
}
private struct CopyStreamToStreamAsyncStateMachine : IAsyncStateMachine
{
public int state;
public AsyncTaskMethodBuilder methodBuilder;
public Stream source;
public Stream destination;
private byte[] buffer;
private TaskAwaiter writeAwaiter;
private TaskAwaiter<int> readAwaiter;
// MoveNext 메서드 등 나머지 구현부
}
Task의 3가지 상태
- success
- failure
- canceled
failure와 canceled 모두 소비자 코드에서 동일하게 exception을 처리하면 된다.
아래의 변환된 본문인 MoveNext를 보면 await
으로 중단되는 지점에서 state를 변경하고 AwaitUnsafeOnCompleted를 호출한다.
state
를 통해서 어떤 위치(await)인지 파악하고, awaiter를 통해 비동기 작업의 완료 확인awaiter: 비동기 작업이 완료될 때 continuation이 실행되게 하는 역할을 가지며 GetResult()로 작업의 결과를 가져옴
private void MoveNext()
{
try
{
int currentState = this.state; // 현재 상태를 저장하는 변수
TaskAwaiter<int> readTaskAwaiter;
if (currentState != 0)
{
if (currentState != 1)
{
// 버퍼 초기화
this.buffer = new byte[4096];
goto PerformReadAsync;
}
// 상태가 1인 경우: 이전에 대기 중이었던 읽기 작업(awaiter)을 복원
readTaskAwaiter = this.readAwaiter;
this.readAwaiter = default(TaskAwaiter<int>);
currentState = (this.state = -1); // 상태를 업데이트
goto AfterRead;
}
// 상태가 0인 경우: 이전에 대기 중이었던 쓰기 작업(awaiter)을 복원
TaskAwaiter writeTaskAwaiter = this.writeAwaiter;
this.writeAwaiter = default(TaskAwaiter);
currentState = (this.state = -1); // 상태를 업데이트
// 이전 쓰기 작업의 결과를 처리
writeTaskAwaiter.GetResult();
PerformReadAsync:
// 스트림에서 비동기로 읽기 작업 수행
readTaskAwaiter = source.ReadAsync(this.buffer, 0, this.buffer.Length).GetAwaiter();
if (!readTaskAwaiter.IsCompleted)
{
// 작업이 완료되지 않은 경우 상태를 1로 업데이트하고 대기를 설정
currentState = (this.state = 1);
this.readAwaiter = readTaskAwaiter;
this.methodBuilder.AwaitUnsafeOnCompleted(ref readTaskAwaiter, ref this);
return;
}
AfterRead:
// 읽기 작업이 완료된 경우 결과 처리
int bytesRead;
if ((bytesRead = readTaskAwaiter.GetResult()) != 0)
{
// 읽은 데이터를 대상에 비동기로 쓰기
TaskAwaiter writeAwaiter = destination.WriteAsync(this.buffer, 0, bytesRead).GetAwaiter();
if (!writeAwaiter.IsCompleted)
{
// 작업이 완료되지 않은 경우 상태를 0으로 업데이트하고 대기를 설정
currentState = (this.state = 0);
this.writeAwaiter = writeAwaiter;
this.methodBuilder.AwaitUnsafeOnCompleted(ref writeAwaiter, ref this);
return;
}
// 쓰기 작업이 완료된 경우 결과 처리
writeAwaiter.GetResult();
goto PerformReadAsync;
}
}
catch (Exception exception)
{
// 예외 발생 시 상태를 -2로 설정하고 Task를 실패 상태로 설정
this.state = -2;
this.buffer = null;
this.methodBuilder.SetException(exception);
return;
}
// 메서드가 성공적으로 완료된 경우
this.state = -2;
this.buffer = null;
this.methodBuilder.SetResult();
}
상태 머신의 힙 복사
- 상태 머신은 처음에는 스택에 존재
- 첫 await에서 중단될 때 힙으로 박싱
- continuation이 해당 힙에 있는 상태 머신의 MoveNext 호출
메모리를 절약하기 위해 awaiter와 임시 변수를 줄여서 상태 머신의 필드 수를 줄일 수 있음
ExecutionContext 캡처
비동기 메서드 중단 시점에 ExecutionContext.Capture()를 호출하여 현재 컨텍스트를 저장
MoveNextRunner 객체 생성
캡처한 ExecutionContext와 박싱된 상태 머신을 감싸는 MoveNextRunner 객체를 할당
Action 대리자 생성
MoveNextRunner.Run 메서드를 호출하는 Action 대리자를 생성하여 이후 continuation 실행 시 Action을 통해 상태 머신을 재개
상태 머신 박싱 및 빌더 연결
상태 머신 박싱 후, SetStateMachine 호출을 통해 빌더와 박싱된 상태 머신을 연결
Continuation 등록
할당 비용
복잡한 계층(ExecutionContext, MoveNextRunner, Action 등)을 매번 생성하므로 Task를 자주 중단할 경우 대량의 객체 할당 발생
예: 1천 번 Task.Yield() 호출 시 수백만 개 단위의 할당 발생 가능
awaiter 최적화 경로
비동기 인프라에 속한 awaiter는 Action, QueueUserWorkItemCallback 생성 없이 IAsyncStateMachineBox를 직접:
할당 비용
동일한 예에서 할당이 약 1000개 수준으로 줄어듦
- 메서드가 중단되지 않으면 할당 없음
- 중단되더라도 단 한 번의 박싱(또는 AsyncStateMachineBox 생성)으로 상태 관리
추가 최적화: .NET 6 및 C# 10
메서드 단위로 빌더를 재정의하여 최적화된 빌더(PoolingAsyncValueTaskMethodBuilder 등)를 사용할 수 있습니다.
이를 통해 극단적으로는 비동기 메서드 실행 중에도 추가 할당 없이 처리 가능
ValueTask 및 IValueTaskSource
ValueTask를 사용하면, 동기 완료 시 Task 할당을 피하거나, IValueTaskSource를 통해 객체 풀링을 활용하여 빈번한 비동기 호출에서도 최소한의 할당으로 높은 성능을 달성할 수 있습니다.
동기적으로 결과를 반환하는 경우 Task
객체를 매번 생성하는 것은 비효율적이다.
예를 들어
MemoryStream
의 경우 동일한 크기의 읽기 작업을 반복할 때, 이전에 성공적으로 완료된Task<int>
결과를 재사용하는 식으로 최적화할 수 있다.
PoolingAsyncValueTaskMethodBuilder로 상태기계를 풀링하여 성능을 높일 수 있다.
System.Threading.Tasks.Sources.IValueTaskSource<TResult>
public interface IValueTaskSource<out TResult>
{
ValueTaskSourceStatus GetStatus(short token);
// continuation 연결
void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags);
// 작업 결과 확인
TResult GetResult(short token);
}
ValueTask<TResult>
를 위한 사용자 정의 소스public readonly struct ValueTask<TResult>
{
// private readonly Task<TResult>? _task;
// Task<TResult> or null만 가능했으나 이제 IValueTaskSource<TResult> 가능
private readonly object? _obj;
private readonly TResult _result;
...
}
ExecutionContext와 동일하게 캡처한 뒤 후속 작업이 재개될 때 동일한 컨텍스트에서 실행
// SynchronizationContext가 없으면 TaskScheduler가 결정
object scheduler = SynchronizationContext.Current;
if (scheduler is null && TaskScheduler.Current != TaskScheduler.Default)
{
scheduler = TaskScheduler.Current;
}
SynchronizationContext가 없는 경우 TaskScheduler를 통해 실행 위치를 결정한다. 기본 스케줄러는 스레드 풀을 사용한다.
TaskScheduler는 Task의 실행 시간, 위치 제어
ConfigureAwait(false)
라이브러리나 외부 코드가 내부적으로 커스텀 컨텍스트를 사용하면 생략할 수 없음
그렇지 않다면 생략해도 되기는 함
ExecutionContext와 SynchronizationContex의 차이
- ExecutionContext: 비동기 경계를 지나며 ambient data를 지속적으로 유지. 어떤 스레드에서 실행하든 일관적인 정보 유지
- SynchronizationContext: 비동기 작업 재개 시 어떤 스레드(UI 스레드 등)에서 실행할 지 결정
다음 내용을 쉽게 정리한 것
안녕하세요
10월에 전북대에서 강사님 강연을 들었던 전북대 2학년 학생입니다. 강사님이 활동하고 계시는 게임 서버 개발자에 대해서 여쭤보고 싶은게 있는데 혹시 메일 주소 알려주실 수 있을까요?