Rx.NET 한국어 설명서 2편

ForestHouse·2024년 4월 12일

Rx.NET

목록 보기
2/2
post-thumbnail

IObservable<T> 구현

이번 챕터는 일단 이 인터페이스를 직접 구현하는 것으로 시작합니다.
그런데 원문에서는 이 방식이 비교적 일반적이지 않은 방법이라고 합니다?
System.Reactive 를 사용하는 여러 방법들을 차근차근 알아가보도록 합시다.

기초적인 구현

public class MySequenceOfNumbers : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        observer.OnNext(1);
        observer.OnNext(2);
        observer.OnNext(3);
        observer.OnCompleted();
        return System.Reactive.Disposables.Disposable.Empty;
        // OnCompleted 호출되어 사실상 아무것도 못하는 IDisposable 입니다..
    }
}

아주 간단하죠? 구현할 메서드는 Subscribe 하나뿐입니다.
물론 지금은 Cold Observable 형태로 구현이 되어서 그런것이긴 합니다만.

  • Cold Observable : 구독자가 생겨나면 갑자기 일처리를 시작, HTTP 요청 등
  • Hot Observable : 구독자가 있든 없든 이벤트가 계속 생겨남, 서버 신호 수신이나 키보드 마우스 입력 등

테스트를 간단하게 해봅시다 :

var numbers = new MySequenceOfNumbers();
numbers.Subscribe(
    number => Console.WriteLine($"Received value: {number}"),
    () => Console.WriteLine("Sequence terminated"));
출력
Received value 1
Received value 2
Received value 3
Sequence terminated

근데 이 예제는 너무 간단하고, 사실상 "반응형" 이지는 않습니다.
심지어 세 번의 OnNext 호출이 끝나기 전까지는 Subscribe 호출이 리턴되지도 않습니다.
당연히 예제라 그런 것이긴 한데, 만약 이 예제처럼 단순히 값들을 순차적으로 받고 싶다면
IEnumerable<T> 을 사용하는 것이 맞습니다. (List<T> 같은 애들 말이죠)

파일시스템을 이용하는 예제

이번에는 조금 더 현실적인, 그래도 좀 더 개연성이 있는 예제를 (원문이)적어놓았습니다.
여기있는 예제 다 원문에 있는 예제인거 아시죠?

.NET 에 있는 FileSystemWatcher(이하 fsw) 라는데, 전 써본 적이 없지만 파일의 변경을 감지한다고 합니다.
변경을 감지하면, 기존 C# 방식으로 Event 를 쏴준다고 하는데 이를 우리는 Rx 로 wrapping 을 해줄겁니다.
그래서 Observable 객체는 IObservable<FileSystemEventArgs>(이하 fsea) 가 됩니다.

ℹ️ 원문 Note
그런데 파일명을 바꾸는 경우, fsw 는 세부 정보를 RenamedEventArgs 에 담아준다고 합니다.
fsea 에서 파생된 친구라고 하고, 이 경우 Observable 을 상기한 대로 구현하면 파일명 변경 이벤트의 세부적인 정보를 알고싶은 경우에는 불편할 수 있다고 합니다.
더욱 중대한 구조적인 문제는 fsw.Error 에서 하나 이상의 이벤트를 보고하는 데에는 적합하지 않다는 것입니다.

이 에러들이 일시적이고 해결이 가능할 수도 있기에 프로그램이 그냥 계속해서 작업을 하고싶을 수 있다만 오직 IObservable<T> 하나만으로 모든 것을 표현하려고 하다보니 OnError 을 호출해 에러를 알리고, 이는 모든 구독이 중단되게 해버리죠.
Rx 에 있는 Retry 연산자를 써서 에러 이후 재구독을 시키는 방법도 있긴 한데, 원문에서는 이것보다는 구독을 아예 중지시키지 않는 방향으로 IObservable<ErrorEventArgs> 를 하나 더 만들어 제공하는 편이 낫다고 합니다.
그런데 또 이렇게 구독지를 하나 더 추가하는 것이 항상 가능한 것은 아니라고 합니다. (어쩌라는겨😵‍💫)

원문은 즉 IObservable<fsea> 하나만 쓰는 간단한 경우가 적합한 프로그램이 어딘가에 있을 것이고, 어디는 추가적으로 더 구독지를 달아줘야 하는 경우도 있을것이라며 프로그램 작성에 있어 획일적인 방법은 없다 라고 하며 말을 마쳤습니다.
그래서 저희는 배우는 입장이니 그냥 여러가지 생각할만한 거리들을 던져준 것 같고,
아마 알잘딱깔센 하시라는 얘기로 보입니다.

// Represents filesystem changes as an Rx observable sequence.
// NOTE: this is an oversimplified example for illustration purposes.
//       It does not handle multiple subscribers efficiently, it does not
//       use IScheduler, and it stops immediately after the first error.
public class RxFsEvents : IObservable<FileSystemEventArgs>
{
    private readonly string folder;

    public RxFsEvents(string folder)
    {
        this.folder = folder;
    }

    public IDisposable Subscribe(IObserver<FileSystemEventArgs> observer)
    {
        // Inefficient if we get multiple subscribers.
        FileSystemWatcher watcher = new(this.folder);

        // FileSystemWatcher's documentation says nothing about which thread
        // it raises events on (unless you use its SynchronizationObject,
        // which integrates well with Windows Forms, but is inconvenient for
        // us to use here) nor does it promise to wait until we've
        // finished handling one event before it delivers the next. The Mac,
        // Windows, and Linux implementations are all significantly different,
        // so it would be unwise to rely on anything not guaranteed by the
        // documentation. (As it happens, the Win32 implementation on .NET 7
        // does appear to wait until each event handler returns before
        // delivering the next event, so we probably would get way with
        // ignoring this issue. For now. On Windows. And actually the Linux
        // implementation dedicates a single thread to this job, but there's
        // a comment in the source code saying that this should probably
        // change - another reason to rely only on documented behaviour.)
        // So it's our problem to ensure we obey the rules of IObserver<T>.
        // First, we need to make sure that we only make one call at a time
        // into the observer. A more realistic example would use an Rx
        // IScheduler, but since we've not explained what those are yet,
        // we're just going to use lock with this object.
        object sync = new();

        // More subtly, the FileSystemWatcher documentation doesn't make it
        // clear whether we might continue to get a few more change events
        // after it has reported an error. Since there are no promises about
        // threads, it's possible that race conditions exist that would lead to
        // us trying to handle an event from a FileSystemWatcher after it has
        // reported an error. So we need to remember if we've already called
        // OnError to make sure we don't break the IObserver<T> rules in that
        // case.
        bool onErrorAlreadyCalled = false;

        void SendToObserver(object _, FileSystemEventArgs e)
        {
            lock (sync)
            {
                if (!onErrorAlreadyCalled)
                {
                    observer.OnNext(e); 
                }
            }
        }

        watcher.Created += SendToObserver;
        watcher.Changed += SendToObserver;
        watcher.Renamed += SendToObserver;
        watcher.Deleted += SendToObserver;

        watcher.Error += (_, e) =>
        {
            lock (sync)
            {
                // The FileSystemWatcher might report multiple errors, but
                // we're only allowed to report one to IObservable<T>.
                if (!onErrorAlreadyCalled)
                {
                    observer.OnError(e.GetException());
                    onErrorAlreadyCalled = true; 
                    watcher.Dispose();
                }
            }
        };

        watcher.EnableRaisingEvents = true;

        return watcher;
    }
}

주석이 너무 길어서 지울까 했는데, 어차피 스크롤 까딱 하면 되는 것이고 오히려 주석 보려고 원문 가서 코드 찾는 게 더 귀찮으실 것 같아 그냥 놔두게 되었습니다.

아무튼, 아무래도 이전보다는 조금 더 복잡해진 코드입니다. (주석때문 아닌가?)
이 코드가 IObservable 을 구현할 때 IObserver 의 규칙을 준수할 책임이 있다는 것을 보여준다고 합니다.
무슨 말인지 감이 안 오실 텐데, Observer 각각에서 규칙을 준수하기 보다 Observable 단에서 한번 규칙을 지켜주면 이를 구독하는 Observer 쪽 코드에서는 딱히 그 규칙들을 신경쓰지 않아도 프로그램이 잘 굴러갈 수 있다는 말입니다.
아마 저번 챕터에서 소개를 드렸던 것 같은데, Observable 단에서 여러 스레드의 동시 접근을 차단해주고 있죠?
그러면 Observer 는 동시접근을 고려하지 않아도 괜찮다는 소리입니다 (Observer 기준으로는 동시에 알림이 여러 개 오는 것이겠죠?).
제 예시에서 규칙을 지켰다는 것의 의미는 "Observer 규칙 : Observer 는 thread-safe 이어야 한다" 는 것이고, 이를 Observable 에서 구현하였다고 볼 수 있겠습니다.
그리고 Observer 들은 이 규칙에 의존할 수 있다는 것이죠.

만약 Observable 단에 이러한 작업을 안 했다면, Observable 은 간단해질지 몰라도 수많은 Observer 들은 더러워질 것입니다.
기본적으로 피구독자보다 구독자가 더 많으니까요.
아무튼 이러한 Rx 의 규칙은 멀티스레딩에 의해 잘못된 정보나 접근이 퍼지는 것을 더욱 쉽게 해결해준다고 합니다.

ℹ️ 원문 Note :
아무튼 이건 Rx에서 슈퍼 굉장 킹왕짱 겁나 중요한 부분임.
특히 이벤트 제공자와 이벤트 프로세스가 더욱 많아지고 복잡해질 때마다 계속해서 중요해짐.

위 코드에 API 디자인 문제 말고 다른 문제가 있습니다.
하나는 이벤트 소스가 현실 세계의 비동기적 활동(파일 변경과 같은)으로 이벤트를 만들 때, 종종 특정 스레드에 이벤트 알림을 보내고 싶을 수도 있다는 것입니다.
UI 프레임워크는 스레드와 밀접한 관련이 있는 요구사항을 가지는 경향이 있습니다.
전형적인 많은 프로그램들, 특히 안드로이드 앱 같은 경우에 UI 관련 조작은 오직 UI 스레드만이 접근 가능합니다.
Rx 는 다른 스케쥴러에게 알림을 리디렉션하는 기제를 제공하므로 이 문제를 해결할 수 있기는 합니다만, 일반적으로 이러한 종류의 Observer 에게 IScheduler 를 주고 알림을 넘길 수 있게 해준다고 합니다.
IScheduler 는 뒤 챕터에서 알려준다고 하네요.

또 다른 문제는 이 코드가 여러 구독자들을 효율적으로 처리하지 못한다는 것인데요
코드를 자세히 보시면 구독자에 의해 Subscribe 가 호출될 때마다 fsw 가 매번 생성됩니다.
이러면 뭐.. 무슨 1인 1닭도 아니고 옵저버 패턴인 것이 무색해지는 것이죠.

생각보다 이 문제가 일어나기가 쉽다고 하는데요, 다음 코드를 보시죠 :

IObservable<FileSystemEventArgs> configChanges =
    fs.Where(e => Path.GetExtension(e.Name) == ".config");
IObservable<FileSystemEventArgs> deletions =
    fs.Where(e => e.ChangeType == WatcherChangeTypes.Deleted);

Where 을 통해 반환되는 IObservable 의 Subscribe 를 호출하게 되면, 입력에 대해 Subscribe 를 호출합니다.
(말이야 방구야)
그래서 configChanges 하고 deletions 에 대해 Subscribe 를 호출하면 둘 다 fs 에 대해서 호출됩니다.
그러므로 만약 fs 가 우리가 위에 적은 예제 RxFsEvents 와 같은 형태라면, fsew 를 두 번 만드는 일이 생기겠죠.

Rx 는 이러한 문제를 해결하기 위한 몇가지 방법을 제공하는데요,
여러 구독자를 허용하지 않는 IObservable 을 취하도록 하는 연산자를 제공하고 다음과 같이 어댑터 안에 감쌉니다 :

IObservable<FileSystemEventArgs> fs =
    new RxFsEvents(@"c:\temp")
    .Publish()
    .RefCount();

참고로 Publish 는 좀 뒷쪽에 나오는 연산자라고 합니다. (저도 아직 모릅니다)
만약 다수의 구독자를 가지는 것에 더 친숙한 타입을 만들기를 원하신다면, 모든 구독자들을 추적하고 반복문을 통해 구독자 각각에게 알리는 것입니다.
그 방식으로 짠 코드를 한번 보시죠 :

public class RxFsEventsMultiSubscriber : IObservable<FileSystemEventArgs>
{
    private readonly object sync = new();
    private readonly List<Subscription> subscribers = new();
    private readonly FileSystemWatcher watcher;

    public RxFsEventsMultiSubscriber(string folder)
    {
        this.watcher = new FileSystemWatcher(folder);

        watcher.Created += SendEventToObservers;
        watcher.Changed += SendEventToObservers;
        watcher.Renamed += SendEventToObservers;
        watcher.Deleted += SendEventToObservers;

        watcher.Error += SendErrorToObservers;
    }

    public IDisposable Subscribe(IObserver<FileSystemEventArgs> observer)
    {
        Subscription sub = new(this, observer);
        lock (this.sync)
        {
            this.subscribers.Add(sub); 

            if (this.subscribers.Count == 1)
            {
                // We had no subscribers before, but now we've got one so we need
                // to start up the FileSystemWatcher.
                watcher.EnableRaisingEvents = true;
            }
        }

        return sub;
    }

    private void Unsubscribe(Subscription sub)
    {
        lock (this.sync)
        {
            this.subscribers.Remove(sub);

            if (this.subscribers.Count == 0)
            {
                watcher.EnableRaisingEvents = false;
            }
        }
    }

    void SendEventToObservers(object _, FileSystemEventArgs e)
    {
        lock (this.sync)
        {
            foreach (var subscription in this.subscribers)
            {
                subscription.Observer.OnNext(e);
            }
        }
    }

    void SendErrorToObservers(object _, ErrorEventArgs e)
    {
        Exception x = e.GetException();
        lock (this.sync)
        {
            foreach (var subscription in this.subscribers)
            {
                subscription.Observer.OnError(x);
            }

            this.subscribers.Clear();
        }
    }

    private class Subscription : IDisposable
    {
        private RxFsEventsMultiSubscriber? parent;

        public Subscription(
            RxFsEventsMultiSubscriber rxFsEventsMultiSubscriber,
            IObserver<FileSystemEventArgs> observer)
        {
            this.parent = rxFsEventsMultiSubscriber;
            this.Observer = observer;
        }
        
        public IObserver<FileSystemEventArgs> Observer { get; }

        public void Dispose()
        {
            this.parent?.Unsubscribe(this);
            this.parent = null;
        }
    }
}

보시면 당연히 thread-safe 조치가 기본적으로 다 되어있는 모습이고, 하나의 fsw 인스턴스를 생성하는 것을 볼 수 있습니다.
구독 횟수에 상관 없이 말이죠.

이번 챕터의 첫 예제에서는 Subscribe 단계에서 모든 것들이 끝났기 때문에 IDisposable 을 쓸 일이 없었고
RxFsEvent 에서는 각각의 구독자가 각각의 fsw 를 가졌기 때문에 그 fsw 의 Dispose 메서드를 그냥 넘겨주면 됐습니다.
하지만 이제 모든 구독자가 하나의 fsw 를 공유하므로 그런 방식은 당연히 사용하면 안 되겠죠.
그래서 Subscription 이라는 내부 클래스를 하나 만든 것입니다.
이 친구는 Dispose 가 호출될 때 구독자 목록에서 자신을 삭제하여 더이상 알림을 받지 않음을 명확히 합니다.
그리고 만약 구독자가 아무도 없다면 UnsubscribeEnableRaisingEvents 를 false 로 바꾸어 불필요한 이벤트 발생이 일지 않도록 해줍니다.

처음 예시보다는 훨씬 현실적입니다.
Hot Observable 에 대해 다루면서 여러 구독자를 가질 수 있는 예제죠?
우리가 이벤트, 콜백, 알림 등등의 시스템을 생각했을 때 일반적으로 머리속에 그려내는 그 모습 딱 그것이라고 할 수 있겠습니다.

그런데
원문은 종종 이런 방식을 이용하지 않을것이라고 합니다. (끝은 어디인가...😥)
코드를 직접 써보시면, 이 예제는 System.Reactive 를 사용하지 않는다는 것을 알 수 있습니다.
빌트인 인터페이스만 쓰고 있거든요.
실제 상황에서는 일반적으로 다양한 기능들을 갖춘 System.Reactive 의 헬퍼에 의존합니다.
그래서 만약 위 예제에서 Changed 이벤트만 필요한 상태라면 다음과 같이 코드를 짤 수 있겠습니다 :

FileSystemWatcher watcher = new (@"c:\temp");
IObservable<FileSystemEventArgs> changes = Observable
    .FromEventPattern<FileSystemEventArgs>(watcher, nameof(watcher.Changed))
    .Select(ep => ep.EventArgs);
watcher.EnableRaisingEvents = true;

Rx.NET 사용법을 모르는 저도, 일단 보면 대충 해석은 가는 아주 간결한 코드가 아닐 수 없습니다.
Rx 를 C# 이벤트로부터 구현하고 이벤트 종류를 선택하는 것으로 볼 수 있겠네요.
위의 예제처럼 직접 IObservable 을 클래스 단에서 구현한 것보다야 당연히 유연하지는 않고 부족한 점들이 있기는 하지만, 몇몇 프로그램들에서는 이정도로도 충분할겁니다.
그리고 또, 간결하잖아요 😎

모든 기능을 포함하는 fsw 래퍼를 만들고자 했다면 앞에서 보여드린 대로 IObservable 을 구현하는 편이 더 좋습니다.
(그리 어렵지 않게, 마지막 예제를 확장해 모든 이벤트를 볼 수 있도록 만들 수 있습니다. 단순히 FromEventPattern 를 각각의 이벤트에 대해 사용해주고 Observable.Merge 로 네 개의 observable 을 하나로 엮을 수 있습니다. 완전한 커스텀 observable 구현으로부터 얻을 수 있는 유일한 진짜 이점은 fsw 를 현재 옵저버 수에 따라 켰다 껐다 할 수 있다는 것입니다.)
그게 아니고 단순이 IObservable 형태로 어떤 이벤트를 나타내고 싶은 것이라면 이처럼 그냥 간단한 접근법이 좋다~ 입니다.

실제로는 IObservable 쓰려고 System.Reactive 패키지를 import 한다고 하는데요
fsw 의 자동 시작/종료 와 같은 기능을 원할 때에도, 이를 구현할 수 있는 System.Reactive 내의 연산자의 조합을 거의 대부분의 상황에서 찾아낼 수 있다고 합니다. (그걸 왜 이제서 말해주지?)

위의 풀(full)-구현 수제버거 RxFsEventsMultiSubscriber 와 달리 System.Reactive 를 사용하고 더 적은 코드를 사용하는 예제가 아래에 있습니다 :

IObservable<FileSystemEventArgs> ObserveFileSystem(string folder)
{
    return 
        // Observable.Defer enables us to avoid doing any work
        // until we have a subscriber.
        Observable.Defer(() =>
            {
                FileSystemWatcher fsw = new(folder);
                fsw.EnableRaisingEvents = true;

                return Observable.Return(fsw);
            })
        // Once the preceding part emits the FileSystemWatcher
        // (which will happen when someone first subscribes), we
        // want to wrap all the events as IObservable<T>s, for which
        // we'll use a projection. To avoid ending up with an
        // IObservable<IObservable<FileSystemEventArgs>>, we use
        // SelectMany, which effectively flattens it by one level.
        .SelectMany(fsw =>
            Observable.Merge(new[]
                {
                    Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                        h => fsw.Created += h, h => fsw.Created -= h),
                    Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                        h => fsw.Changed += h, h => fsw.Changed -= h),
                    Observable.FromEventPattern<RenamedEventHandler, FileSystemEventArgs>(
                        h => fsw.Renamed += h, h => fsw.Renamed -= h),
                    Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                        h => fsw.Deleted += h, h => fsw.Deleted -= h)
                })
            // FromEventPattern supplies both the sender and the event
            // args. Extract just the latter.
            .Select(ep => ep.EventArgs)
            // The Finally here ensures the watcher gets shut down once
            // we have no subscribers.
            .Finally(() => fsw.Dispose()))
        // This combination of Publish and RefCount means that multiple
        // subscribers will get to share a single FileSystemWatcher,
        // but that it gets shut down if all subscribers unsubscribe.
        .Publish()
        .RefCount();
}

(저만 더 난해한가요)
잠시 또 코드를 살펴보면 음..
Defer 로 lazy initialization 까지는 구현을 했고
Merge 로 여러 개의 Ovservable 을 하나로 합치는 것으로 보이네요.
그러면 IObservable[] 에서 IObservable 이 될테고
이걸 다시 IObservable 로 감싸는 것을 막기 위해 SelectMany 를 쓴다는 것 같네요.

"이게 번역이냐!" 라고 생각하실 분들을 위해 다시 말씀드리지만 이사람 또 원문에 이렇게 썼습니다.

난 여기에서 이전에 언급하지 않은 연산자들을 듬뿍 썼어. 이 예제가 도움이 되려면 난 당신에게 System.Reactive 패키지가 IObservable 을 구현할 수 있는 많은 방법들을 설명해야만 해
ㅋㅋㄹㅃㅃ

이라고 써놓는데 어떻게 한답니까 ㅎㅎ..

간단한 팩토리 메서드

관찰 가능한 시퀀스를 만드는 데 사용 가능한 많은 양의 메서드들로 인해, 우리는 이제 그들을 카테고리를 기준으로 뿌셔뿌셔 할겁니다(?)
우리의 첫 메서드 카테고리는 많아봐야 하나의 (즉 0~1개) 결과를 만들어내는 IObservable 시퀀스를 만들어 줍니다.
(아 이제 숨통 트이나?)

Observable.Return

가장 간단한 팩토리 메서드 중 하나인 Observable.Return<T>(T value) 는 앞서 나왔던 Quiescent 예제에서 본 친구입니다. (발음이 어디보자... 콰이어쓴트 로군요)
지네릭 T 를 받고 하나의 값을 만들고 complete 할 IObservable<T> 를 반환합니다.
어떤 의미에서 T 에 대한 래퍼이기도 한데요, 개념적으로 new T[] { value } 를 적는 것과 비슷하므로, 하나의 요소만 가지고 있는 시퀀스가 됩니다.
T 타입의 값을 가지고 있고 Task<T> 동작을 원하는 친구에게 그 값을 넘겨주어야 할 때 사용할 수 있는 Task.FromResult 의 Rx 에 해당하는 것으로 생각할 수도 있습니다.
(이해가 될듯 말듯 😗)

IObservable<string> singleValue = Observable.Return<string>("Value");

Observable.Empty

작성중
(현재 작성중인 챕터 바로가기)

24.12.13 - 어쩌다 보니 이미 Rx 놔두고 옵저빙 시스템 구축해서 쓰다보니 블로그에 번역을 안했더군요.
몸좀 풀 겸 다시 꾸준히 번역 해놓도록 하겠습니다.

profile
평화롭게 정원 가꾸며 코드짜면 얼마나 좋을까

0개의 댓글