Rx민수야 고맙다! - RxJS를 소개합니다

redjen·2023년 6월 27일
12
post-thumbnail
post-custom-banner

<출처> 나 (직접 수공예로 만들었습니다..)

Reactive Streams와 ReactiveX에 대해

ReactiveX의 공식문서에서는 Observable한 시퀀스를 사용해 비동기적인, 이벤트 기반의 프로그램들을 다루기 위한 라이브러리로써 ReactiveX로 소개한다.

ReactiveX는 옵저버 패턴을 적용해서

  • 데이터나 이벤트의 시퀀스들을 지원하거나
  • 시퀀스들을 선언적으로 다룰 수 있는 연산자들을 지원하면서도
  • 로우 레벨에서 이뤄지는 쓰레드 제어, 동기화, thread safety, 동시성을 지원하는 자료구조, 논블락킹 IO에 대해 신경쓰지 않게 해준다. (개인적으로는 이게 가장 큰 장점이라고 생각한다)

ReactiveX의 Observable은 여러 아이템의 비동기적인 시퀀스를 다룰 수 있는 이상적인 방법을 제공한다. (아래 표는 Java 기준)

구분단일 아이템복수 아이템
동기T getData()Iterable<T> getData()
비동기Future<T> getData()Observable<T> getData()

ReactiveX 라이브러리들은 가끔 함수형 반응형 프로그래밍으로써 불리지만, 이건 잘못된 명명이라고 소개한다.

ReactiveX may be functional, and it may be reactive, but “functional reactive programming” is a different animal. One main point of difference is that functional reactive programming operates on values that change continuously over time, while ReactiveX operates on discrete values that are emitted over time.

ReactiveX는 함수형일수도 있고, 반응형일수도 있지만, '함수형 반응형 프로그래밍'은 전혀 다른 종류의 것이다. 한 가지 큰 차이점은 함수형 반응형 프로그래밍은 시간에 따라 지속적으로 변화하는 값을 다루지만, ReactiveX는 시간에 따라 방출되는 이산 값에 대해 작동한다는 것이다.

ReactiveX의 Observable은 Java의 Future처럼 단일 스칼라 값의 emission이 아니라 값의 시퀀스 또는 무한한 스트림을 다룬다. Observable은 여러 유즈 케이스를 다룰 수 있도록 설계된 단일 추상화이다.

Iterator 패턴이 어떻게 적용되었나

Observable은 비동기 / 푸시 방식을 활용하고, Iterable은 동기 / 풀 방식을 활용한다.

이벤트Iterable (pull)Observable (push)
데이터 수신T next()onNext(T)
에러 처리throw ExceptiononError(Exception)
완료 처리!hasNext()onCompleted()

리액티브 프로그래밍의 장점

ReactiveX는 이런 Observable들을 필터링하고, 선택하고, 변화시키고, 합칠 수 있는 연산자를 제공한다.

Iterator 패턴에서 컨슈머가 프로듀서로부터 값을 풀하는 것과 반대로 Observable은 프로듀서가 값이 준비되자 마자 컨슈머에게 값을 밀어넣는 방식으로 동작한다.

Observable 타입은 기존 GoF의 옵저버 패턴에 존재하지 않는 두 가지 의미를 부여한다.

  1. 프로듀서가 더 이상 사용할 수 없는 데이터가 없는 상태임을 컨슈머에게 알려줄 수 있다.
  2. 프로듀서가 컨슈머에게 값을 전달하던 중 오류가 발생했음를 알릴 수 있다. (Iterable은 iteration 도중 에러가 발생하면 Exception을 던지지만, Observable은 옵저버의 onError 메서드를 호출한다)

RxJava, RxJS, Rx.NET, RxScala와 같이 다양한 언어들을 위한 포팅이 완료되어 널리 사용중이다.

FE에의 적용

너무 잘 정리되어 있는 글이 있어 공유해본다. 아마 이 글을 보는 많은 분들은 앵귤러보단 리액트에 더 익숙할테니..!
https://yozm.wishket.com/magazine/detail/1753/

앵귤러에서의 활용

그럼에도 불구하고, RxJS를 얘기할 때 앵귤러를 빼놓고 넘어가긴 아쉽다. (현재 현업에서 사용하고 있기도 하고)
앵귤러 프레임워크에서는 RxJS를 내부 상태 관리에 적극적으로 활용한다.

https://angular.io/guide/rx-library

앵귤러에서의 Reactive Form vs Template Form

앵귤러는 다들 알다시피 양방향 바인딩 문법이 존재한다. 하지만 앵귤러의 양방향 바인딩은 사실 prop과 prop 변화에 대한 이벤트 핸들러를 내부적으로 등록해주는, 실제로는 단방향 바인딩*2 로써 동작한다.

결국 우리가 아는 ngModel을 사용한 양방향 바인딩은 '사용자 인터랙션'을 다루기 위한 이벤트 핸들러의 한 종류로 볼 수 있는 것이다.

그런데 앵귤러의 ReactiveFormsModule을 다루다 보면 ngModel을 사용했을 때 deprecated 되었다는 메시지가 뜬다.

https://angular.io/guide/deprecations#deprecated-features-that-can-be-removed-in-v11-or-later

앵귤러 11버전 부터는 reactive forms 내부에서 ngModel을 deprecate 처리 시켰다.. 왤까?

  • ReactiveFormsModulengModel을 같이 사용했을 때 번들 사이즈 크기 증가
  • 두 가지 방법의 철학이 서로 상충되는 부분이 있고, 이는 피하는 것이 자연스럽다.
  • 복잡한 템플릿 폼에서 타이밍 관련한 문제가 있었다.
    • ngModel 자체가 직접적인 바인딩을 가지고 있는 것이 아니고, 바인딩 자체가 한 틱 안에 일어나지 않는다면 observable을 올바른 타이밍에 subscribe해서 form을 변화시키기 어렵다.

그렇다면, Observable을 활용하는 ReactiveFormsModuleFormsModule과 비교하였을 때 어떤 이점이 있을까?

  • 다들 알듯이 엘리먼트가 많이 존재하지 않는 Form을 만드는 일은 그리 어렵지 않다.
  • 하지만 입력 받는 사용자의 정보가 점점 많아진다면 어떨까?
  • 게다가 여러 엘리먼트들에 걸친 validation 로직을 작성한다고 가정해보자.
    • validator 로직을 분리하기 쉬울까? 여러 엘리먼트에 입력 받는 데이터의 타입 자체도 다 다르다면, 공통화하기 매우 어렵다

https://blog.angular-university.io/introduction-to-angular-2-forms-template-driven-vs-model-driven/

  1. Reactive Forms를 사용한다면 훨씬 깨끗하고 프레젠테이션 로직에 집중하여 폼 모듈을 개발할 수 있다.
  2. 폼 템플릿 안에 다양한 비즈니스 validation 규칙들이 존재한다면, 이를 컴포넌트 클래스로 분리하기 쉬워진다. (테스트가 쉬워지는 것은 덤이다)
  3. 때문에 커스텀 validator를 개발하기 쉽다 - 함수를 정의하고 설정 안에 끼워넣기만 하면 된다.
this.form.valueChanges
    .pipe(
        map((value) => {
            value.firstName = value.firstName.toUpperCase();
            return value;
        }),
        filter((value) => this.form.valid)
    )
    .subscribe((value) => {
           console.log("Reactive Form valid value: vm = ",
                       JSON.stringify(value));
        });

때문에 템플릿이 복잡해지고, 더 많은 비즈니스 로직을 넣어야 한다면 ReactiveFormsModule 사용은 선택이 아닌 필수이다.

Observable로부터 시작하는 RxJS 뽀개기

Promise로 Observable 만들기

import { from, Observable } from 'rxjs';

// Create an Observable out of a promise
const data = from(fetch('/api/endpoint'));
// Subscribe to begin listening for async result
data.subscribe({
  next(response) { console.log(response); },
  error(err) { console.error('Error: ' + err); },
  complete() { console.log('Completed'); }
});

Promise와 Observable은 비슷해 보이지만 엄연히 다르다..! (Observable이 Promise의 상위 호환이다)

구분동기비동기
단일 값valuePromise
복수 값ArrayObservable

반복해서 이야기 하지만, 여러 개의 Promise를 (무한한 개수일 수도 있다) 미리 구성한 파이프라인을 통해 어떻게 처리할 것인지 약속해놓는 것이 Observable이다.

Event로 Observable 만들기

import { fromEvent } from 'rxjs';

const el = document.getElementById('my-element')!;

// Create an Observable that will publish mouse movements
const mouseMoves = fromEvent<MouseEvent>(el, 'mousemove');

// Subscribe to start listening for mouse-move events
const subscription = mouseMoves.subscribe(evt => {
  // Log coords of mouse movements
  console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);

  // When the mouse is over the upper-left of the screen,
  // unsubscribe to stop listening for mouse movements
  if (evt.clientX < 40 && evt.clientY < 40) {
    subscription.unsubscribe();
  }
});

샘플 요구사항을 구현해보자

버튼을 한번 누를 때에는 아무 동작하지 않다가, 버튼을 3번째 누를 때마다 버튼을 몇 번 눌렀는지 alert하는 요구사항이 생겼다고 가정해보자.

리액트로는 어떻게 할까? useStateuseEffect를 적절히 사용한다면..

const [count, setCount] = useState<number>(0);

const handleClick = () => {
    setCount(count + 1);
}

useEffect(() => {
    if (count % 3 === 0) {
        alert("!!!");
    }
}, [count]);

앵귤러와 rxjs를 적절히 사용한다면.. (절대 두 라이브러리 / 프레임워크 중 어떤 것이 더 좋다고 얘기하는 것이 아니다)

export class AppComponent implements OnInit, OnDestroy {

  subscription!: Subscription

  private click$ = new Subject<void>()

  count$: Observable<number> = this.click$.pipe(
    scan(previous => previous + 1, 0),
    tap(count => (count % 3 === 0) ? alert("!!!") : console.log(count))
  )

  ngOnInit(): void {
    this.subscription = this.count$.subscribe()
  }

  ngOnDestroy(): void {
    this.subscription?.unsubscribe()
  }

  onClick() {
    this.click$.next()
  }
}
  • scan operator는 상태를 캡슐화하고 관리하기 유용한 연산자다.
    • accumulator 함수를 사용하여 초기 상태로부터 다음 값을 도출해낼 수 있다.
    • 설명
  • tap operator는 개발자가 부수적인 효과를 특정한 위치에서 부여할 수 있는 연산자다.
    • map이나 mergeMap 내부에서 이를 행할 수도 있지만, 이들의 사용이 매핑 함수를 순수하지 못하게 만들 때 tap을 사용한다.
    • 설명

FE 개발을 하다 보면 흔히 마주치게 되는 기능 구현에 대한 요구사항은 보통 다음과 같은 것들이다.

  • 채팅방에 5명 이상이 들어온다면 '현재 인기 있는 채팅방' 라벨을 표시하게 해주세요.
  • 실시간 차트에서 1분 단위로 실시간 데이터를 가져오고, 특정 값이 변화했을 때 토스트 팝업을 띄워주세요.
  • 마우스 스크롤 했을 때 새로운 아이템 목록을 불러오게 해주세요.

공통점은 무엇인가? ~ 했을 때 (if) ~ 해주세요.

js는 동기적인 언어이지만, 요구 사항들이 비동기처리로 이루어져야 하기 때문에 우리는 이벤트 + Promise의 조합을 써왔다.

하지만 요구 사항이 복잡해지고, 인터랙션해야 하는 다양한 컴포넌트들이 화면에 계속해서 추가된다면 Promise에 + Promise에.. 코드가 복잡해지고 따라서 상태 관리도 복잡해지는 경우가 많다.

이처럼 비동기 처리할 이벤트가 여러 개라면, rxjs를 사용해서 우아하게 요구사항을 처리할 수 있다.

  • 채팅방에 들어오는 이벤트를 Observable로 만들어, 현재 들어와 있는 인원이 5명이라면 (filter) 라벨 표시
  • 실시간 차트에서 1분 단위로 실시간 데이터를 가져오고 (서버에서 받아오는 reactive stream을 통한 Observable 생성)+ 특정 값이 변화했을 때 (특정 값에 대한 Observable) 특정 동작 행하기
  • 마우스 스크롤 이벤트로부터 Observable을 만들어, HttpService 특정 메서드 실행

특수하거나 아주 구체적인 경우에 대한 예시도 이렇게 잘 구비되어 있다.

앵귤러와 RxJS를 잘 사용해서 debounce 이벤트를 잘 구현한 예시

참고) 어떤 Operator를 써야 할까

https://rxjs.dev/operator-decision-tree

Reactor 공식 문서에는 상당히 불친절하게 되어 있는 반면, rxjs는 원하는 연산자를 굉장히 쉽게 찾을 수 있도록 해놓은 모습이다 ㅠ

BE에의 적용

내가 써봤던 JS 백엔드 라이브러리는 NestJS가 유일하지만, NestJS에서도 Observable을 네이티브하게 잘 지원한다.
이럴거면 RxJS를 Javascript 표준에 넣어달라고.. TC39 proposal

Nestjs에서 intercept를 사용하여 요청 처리 도중에 원하는 로직 넣기

https://docs.nestjs.com/interceptors

nestjs의 인터셉터는 AOP 기법을 사용해서 컨트롤러 / 서비스 / 도메인 간 공통된 로직을 분리 적용하기 아주 좋은 기능이다.

모든 인터셉터들은 intercept() 메서드를 구현해야 한다. intercept() 메서드는 두 개의 인자를 받는다.
1. ExecutionContext 인스턴스 (가드가 사용하는, ArgumentHost를 상속받는 그 객체와 동일하다)
2. CallHandler 인터페이스

  • CallHandler 인터페이스는 handle() 메서드를 구현하여 인터셉터의 한 부분에서 라우트 핸들어 메서드를 사용할 수 있게 한다.
  • handle() 메서드를 구현하지 않는다면 라우트 핸들러 메서드는 아예 실행되지 않는다.

즉 이는 intercept() 메서드를 통해 효과적으로 요청 / 응답 객체 스트림을 감쌀 수 있다는 것을 의미한다.
요청과 응답을 감싸기 위한 커스텀 로직이 있다면, 최종 라우트 핸들러가 실행 되기 전이나 되고 난 후에 실행하도록 처리할 수 있는 것이다. 그리고 이는 handle() 메서드가 Observable을 반환하기 때문에 가능하다.

예를 들어 POST /cats 요청을 처리하는 API가 있다고 가정해보자.

  • 요청은 CatsController 내부의 create() 핸들러에 의해 처리된다.
  • 만약 인터셉터가 handle() 메서드를 정의하지 않는다면 create() 메서드는 실행되지 않는다.
  • handle() 메서드가 실행되고, Observable이 리턴된다면 create() 핸들러는 트리거된다.
  • 응답 스트림이 Observable로부터 수신된다면, 부가적인 연산이 스트림에 행해진 후 요청자에게 최종 결과가 리턴된다.
import { Injectable } from '@nestjs/common';
import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';

@Injectable()
export class LoggingInterceptor {
  intercept(context, next) {
    console.log('Before...');

    const now = Date.now();
    return next
      .handle()
      .pipe(
        tap(() => console.log(`After... ${Date.now() - now}ms`)),
      );
  }
}

이처럼 Observable을 사용한다면 매우 간단하게 로깅 인터셉터를 만들 수 있다.
Promise로 처리 전후에 대한 비동기 이벤트를 핸들링한다면 조금 더 복잡했을 것일 것이다. (특히 처리해야 할 로직이 복잡해진다면 더더욱)

Reactive Stream의 가장 큰 장점은 비동기로 이뤄지는 여러 이벤트에 대한 복잡한 비즈니스 로직 구현을 보기 쉽게 처리할 수 있다는 것이다.

profile
make maketh install
post-custom-banner

1개의 댓글

comment-user-thumbnail
2023년 7월 1일

ㅋㅋㅋ좋은 글과 수작업으로 만든 짤 잘 보고 갑니다!!.

답글 달기