
인터넷을 돌아다니며 이곳저곳 다 방문하였으나
어찌된 일인지 C#만 Rx 문서가 한국어로 된 게 없다니 이게 무슨...어!
그래서 https://introtorx.com 에서 필자가 직접 머리를 박아 공부한 내용들을
한국어로 정리해보려고 합니다.
근데 귀찮다
(MD 상에서 꺽쇠괄호 적기가 어려워 지네릭은 이하 생략. 필요할 때만 넣겠습니다)
IObservable 과 IObserver 는 아주 기본적으로 스트림을 통해 대화하는 패턴을 위한
기본적인 규격을 정의해놓고 있습니다.
namespace System
{
/// <summary>Defines a provider for push-based notification.</summary>
/// <typeparam name="T">The object that provides notification information.</typeparam>
public interface IObservable<out T>
{
/// <summary>Notifies the provider that an observer is to receive notifications.</summary>
/// <param name="observer">The object that is to receive notifications.</param>
/// <returns>A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.</returns>
IDisposable Subscribe(IObserver<T> observer);
}
}
namespace System.Reactive
{
/// <summary>
/// Provides a mechanism for receiving push-based notifications and returning a response.
/// </summary>
/// <typeparam name="TValue">
/// The type of the elements received by the observer.
/// This type parameter is contravariant. That is, you can use either the type you specified or any type that is less derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.
/// </typeparam>
/// <typeparam name="TResult">
/// The type of the result returned from the observer's notification handlers.
/// This type parameter is covariant. That is, you can use either the type you specified or any type that is more derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.
/// </typeparam>
public interface IObserver<in TValue, out TResult>
{
/// <summary>
/// Notifies the observer of a new element in the sequence.
/// </summary>
/// <param name="value">The new element in the sequence.</param>
/// <returns>Result returned upon observation of a new element.</returns>
TResult OnNext(TValue value);
/// <summary>
/// Notifies the observer that an exception has occurred.
/// </summary>
/// <param name="exception">The exception that occurred.</param>
/// <returns>Result returned upon observation of an error.</returns>
TResult OnError(Exception exception);
/// <summary>
/// Notifies the observer of the end of the sequence.
/// </summary>
/// <returns>Result returned upon observation of the sequence completion.</returns>
TResult OnCompleted();
}
}
IObserver는
System의 것과System.Reactive의 것이 다르니 주의하시기 바랍니다.
System것은 TResult 지네릭 타입이 없는 형태입니다. (Observable이 리턴을 받지 아니함을 의미)
Observer가 Observable를 구독하는 주체, Observable은 피구독자 입니다.
(옵저버 패턴은 어느정도 아시겠죠..?)
OnNext 를 통해 구독자에게 값을 보내고
OnError 를 통해 예외가 발생함을 알리고
OnComplete 로 시퀀스의 종료를 알립니다.
OnError 또는 OnComplete 가 호출되면 더 이상 이벤트를 제공하지 않을것임을 알려주는 것입니다.
이후에 위 3개 메서드 중 어떠한 것도 호출되지 않음을 의미합니다.
Observer 클래스에서 IObservable의 Subscribe 를 호출해주기만 하면 됩니다.
IDisposable disposable = source.Subscribe(this);
당연히 실행 주체는 IObserver<T> 를 상속받는 클래스이고
source, 즉 IObservable<T> 상속자는 Subscribe 를 알맞게 구현해놓아야겠죠?
IDispoable 은 뒤에서 설명하겠습니다.
예시로, 10분마다 5명 모여있는 단톡방에서 누가 대화를 얼마 했는지 구독자에게 전달한다 할 때
를 OnNext 에 담아 호출해주는 것입니다. 물론 10분마다요.
만약 금일 18시가 되면 정보제공을 중단하겠다? 그럼 18시에 OnComlete 호출하면 됩니다.
그리고 만약 단톡방이 폭파됐다거나 하면 OnError를 호출해서 더 이상 정보를 제공하지 않을 것임을 Observer에게 알려줍니다.
군대 훈련소 생각이 나는데 :
Observer : 보고! (실제로 Observer가 보고 요청을 하진 않습니다. 상황극이라..)
Observable : 충! 성!(충! 성!)N중대, 아침점호 인원보고
- 총원 100명
- 열외 5명
- 현재원 95명
- 열외내용 확진분리 5명
- 이상 아침점호 끝! (=OnComplete = 더 이상 보고할 내용이 없어요)
(제발 아침점호땐 준비끝 하지마! 저녁점호때만 하라고)
기본적으로 내용들이 다 Observable 이 지켜야 할 내용들인데, 이는 Observer들이 그러한 규칙을 신경쓰지 않고 이벤트 처리에만 전념할 수 있게끔 하기 위함입니다.
피구독자는 한 명인데, 구독자는 여러 명이니 당연한 소리라고 볼 수도 있죠.
하나가 처리하면 되는거 대충해놓고 불특정 다수에게 뿌리면 똥이 *N 배 되는거나 다름없으니까요.
OnError 또는 OnComplete 이후에 다른 호출이 이뤄져서는 안 된다.
예를 들어 OnComplete 를 호출한 지점 이후에는 OnNext, OnError, OnComplete 그 어떤 것도 나와서는 안 된다.
OnNext의 호출은 IObservable 단에서 thread-safe 해야 한다.
즉 다음과 같은 호출은 했다가는 큰일난다 :
public static void EverythingEverywhereAllAtOnce(IEnumerable<int> obs)
{
Random r = new();
for (int i = 0; i < 10000; ++i)
{
int v = r.Next();
Task.Run(() => obs.OnNext(v)); // Against the rules!
}}
... 생략 ...
이를 방지하고자 Object sync 와 lock(sync) { 를 공식 문서에서 사용합니다.
OnNext 블럭에서 Observable 의 한 메서드를 호출하는데, 만약 그것이 다시 OnNext 또는 OnComplete 또는 OnError 를 호출하게 되는 경우 실행 순서가 뒤죽박죽이 되어버리거나 무한루프에 빠질 수 있음을 말합니다.문서에서는 이 규칙들을 다음과 같은 정규식으로 표현하고 있습니다 :
(OnNext)*(OnError|OnComplete)?
즉 동일한 Observer에 대해 이 3가지 메서드는 이전 호출이 끝난 이후에 다시 호출되어야 하며
마지막에는 OnError 또는 OnComplete 하나만 와야 합니다.
단, 이들이 호출되지 않아도 되는데 이는 프로그램이 종료될 때까지 구독을 종료할 필요가 없는 경우를 의미합니다.
즉 프로그램 실행 상태에서 계속 열려있는 이벤트 스트림이라 생각하시면 됩니다.
구독의 취소는 구독 시 반환받은 IDisposable 객체를 통해 실현할 수 있습니다.
disposable.Dispose();
이 호출이 정상적으로 종료되었다면, 이제 이 명령을 실행한 Observer 는 더이상 IObserver 의 3가지 메서드 중 어느것도 호출받지 아니할 것임을 확실시하게 됩니다.
물론 IObservable 단에서 그렇게 해주어야 합니다. (또 하나의 규칙)
생각보다 간단명료한데, 애매한 부분(원문: gray area)이 있습니다
만약 호출을 했는데, 리턴이 안 된다면?
Dispose 는 리턴타입이 void 라 오는 건 없지만, 아무튼 리턴이 안 됐다는 것은
Observable 이 dispose 수행을 끝내지 못했다는 것을 의미하겠죠.
멀티스레딩 환경에서는 당연하게도 Dispose 호출을 처리하는 동안 이벤트가 발생하여 그것이 OnNext 등을 타고 들어오는 것이 가능하고, 어떤 작업을 또 수행하겠죠.
즉 이렇게 이후 호출되는 OnNext 에 구애받지 않고 구독을 취소하는 상황은 OnNext 내에서 Dispose 를 호출하는 것밖에 없을 것이라고 문서는 얘기합니다.
우리가 앞서 OnNext의 호출은 thread-safe 하게 만들기로 약속했기 때문에, 이 경우에는 당연히 다음 호출이 들어오기 전에 Dispose 가 완료되겠죠.
하지만 만약 OnNext 콜이 아직 들어오지 않은 상태라고 가정한다면, 다음 세 가지 중 뭐든 가능한(원문: legal) 방법입니다 :
Dispose 가 시작되자마자 거의 즉시 중단합니다. 중단하는 작업이 오래 걸린다 하더라도 말이죠. 이 경우 Observer 는 OnError 또는 OnComplete 를 호출받지 않게 됩니다.OnComplete 를 호출하고 아닐 시 OnError 를 호출해주는 것 등을 포함하여서 말이죠.Dispose 호출 이후 OnNext 가 몇번 더 호출될 수 있으나, 임의의 시점에 그냥 종료해버립니다. 이 경우 종료 과정에서 발생한 오류 같은 것들을 놓칠 수 있습니다.Rx 에서는 첫 번째 방법을 선호한다고 하고, 기본적으로 저렇게 동작할 것이라고 가정합니다.
Observer 가 구현한 알림 콜백 내에서 Observable 에 작업을 하는 까다로운 상황을 부분적으로 피하기 위해서입니다.
메서드 재진입을 처리하는 게 어렵기 때문에 Rx 에서는 구독 종료를 시작하기 이전에 일단 알림부터 보내지 않을 것임을 확실하게 하여 이러한 종류의 재진입을 처리할 필요가 없도록 합니다.
요거는 원문에서도 사람들 혼 빠지게 하는 부분이라고 하는데요,
결국 원문에 써져있는 요약도 어쨌든 구독을 중도 취소하면 스레드를 중간에 정지시키는거랑 똑같이 더러울 수밖에 없다 입니다.
Dispose 를 호출했다 하더라도 그것이 리턴되기 이전까지는 이후에 일정 기간 알림이 더 올 수 있습니다.
그러나 더 올 수 있다는 것이므로 그것에 의존할 수 없습니다.
그런 의미에서 우리는 상기한 3가지 방법 중 Rx 를 이용한 시스템이 쓴 방법이 뭐가 됐든 일단 1번처럼 처리할거라는 가정을 해야하고, 대부분의 Rx 구현 메서드들이 그럴 것이라고 원문은 얘기하고 있습니다.
즉 우리도 이에 맞추어 Observer 는 Dispose 를 호출한 이후에 오는 알림들은 무시토록 하고
Observable 은 Dispose 가 들어오자마자 이미 간 알림은 어쩔 수 없고, 최소한 그 다음은 가지 않도록 일단 바로 해당 Ovserver 에 대한 알림중단을 수행해야겠습니다.
물론 상황에 따라 뒤에 늘어지는 알림을 써야 하는 경우도 있을 수 있을 것 같습니다.
수도꼭지와 긴 호스에 비유하자면, 수도꼭지 잠근다고 바로 물 나오던 게 멈춰버리지는 않는 것과 비슷하달까요?
그런 느낌의 무엇을 구현할 일이 있을런지 잘 모르겠으나, 있을 수도 있다는 점은 짚어두면 좋겠습니다.
너무 좋은 글 잘 보고갑니다.^^