NestJS 와 RxJS

김효성·2024년 1월 5일
0

Nest js

목록 보기
3/3

Nest.js에서 Microservice를 구현하면서 RxJS개념을 사용이 되어 정확하게 이해하고자 정리를 한다.

RxJS는 Reactive Extensions For JavaScript 라이브러리라고 해서 리액티브 프로그래밍을 javascript에서 지원하기 위해 만들어진 라이브러리다.

이 리액티브 프로그래밍은 Push 시나리오 방식으로 외부와 통신합니다.

*Push 시나리오 : 외부에서 명령하면서 응답이 오면 그때 반응하여 처리함. 데이터를 가지고 오기 위해서 Subscribe를 해야 함.

RxJS는 이러한 비동기 이벤트 기반의 프로그램 작성을 돕기 위해 함수형 프로그래밍을 이용해 이벤트 스트림을 Observable이라는 객체로 표현

Observable
Observable은 event가 흐르는 stream 이다. Observable은 누군가 구독(subscribe)을 해야 event를 발행(publish) 한다. Observer가 Observable을 구독하면서 next, error, complete 키워드를 사용해 Observable에 흐르는 event를 처리.

아래는 실제 Observable의 예시.

Observable 변수에 붙은 $(달러) 표시는 Observable을 나타내는 코드 컨벤션입니다. interval()은 정의된 시간마다 증가하는 연속값을 스트림에 발생시키고, pipe() operator를 사용하여 Observable stream 내부에서 적용할 operator를 처리하게 됩니다. take는 발생시킨 이벤트 중 처음부터 n개까지의 이벤트만 받습니다.

Operator
Operator는 Observable에서 각 이벤트들에 대해 연산을 할 수 있는 pure function입니다. 앞서 언급한 것처럼, RxJS는 함수형 프로그래밍에 영향을 많이 받아 이러한 pure function들이 많이 존재합니다. 대표적으로 tap(), filter(), min(), max()와 같은 operator가 존재합니다. 여기서 tab()은 Observable 중간의 값을 가져오는 함수입니다.

Observer
Observer는 Observable을 구독하는 대상입니다. Observer를 정의하고 next, error, complete 세 가지를 정의해 주고 Observable에 구독을 하면 완성입니다. next는 Observable에 들어오는 event를 처리합니다. error는 Observable에서 error가 발생했을 때 event를 처리해 줍니다. complete는 없어도 되는 옵션인데요. Observable이 종료되면 complete가 호출되게 됩니다. 마지막으로 Observable을 Observer가 구독하면 됩니다.

간단하게 1초에 한 번씩 4번 이벤트를 발행하는 Observable을 만들고 이를 구독(subscribe) 해보았는데요. 아래 그림의 오른쪽 부분처럼 0, 1, 2, 3, complete가 뜨면서 종료가 됩니다.

1초에 한번씩 event를 발행

import {fromEvent} from 'rxjs'

const observable = fromEvent(document, 'click')
const subscriber = () => console.log('Clicked!')
observable.subscribe(subscriber)
  1. fromEvent를 이용해서 이벤트를 observable 객체로 만듭니다.(1번 파라미터는 dom요소, 2번 파라미터는 click과 같은 이벤트)

  2. subscriber(이벤트핸들러)를 정의합니다.

  3. observable가 subscribe를 사용해 이벤트 핸들러인 subscriber을 구독하도록 합니다.

1줄로 표현할 수 있는 것을 3줄로 표현하는게 사실 가독성도 떨어지고 불편하기만 한데, RxJS를 사용하는 이유는 있습니다.RxJS는 비동기 코드가 많아질 경우 그만큼 제어의 흐름이 복잡하게 얽혀 코드를 예측하기가 어려워지는데 이러한 비동기 프로그래밍의 문제를 해결하는데 도움을 줍니다.

  1. Observable
    Observable 객체는 특정 객체를 관찰하는 이벤트 핸들러인 Subscriber에게 여러 이벤트나 값을 보내는 역할을 합니다.
const observable = new Observable(subscriber => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.next(3)
  setTimeout(() => {
    subscriber.next(4)
    subscriber.complete()
  }, 1000)
})

console.log('just before subscribe')
observable.subscribe({
  next(x) {
    console.log('got value ' + x)
  },
  error(err) {
    console.error('something wrong occurred: ' + err)
  },
  complete() {
    console.log('done')
  },
})
console.log('just after subscribe')

위 코드에서는 observable이 subscriber에게 연속으로 1,2,3을 푸시하고 1초 후에 4를 푸시하도록 되어있습니다.

Observer는 3개의 메서드가 있습니다.

  1. next: Observable에게 데이터를 전달합니다.

  2. complete: Observable에게 메서드의 실행이 완료되었음을 알립니다. next는 더 이상 데이터를 전달하지 않습니다.

  3. error: Observable에게 에러를 전달합니다. 이후에 next와 complete 이벤트가 발생하지 않습니다.

라이프 사이클

생성 -> 구독 -> 실행 -> 구독해제

Observable은 이벤트를 동기 또는 비동기로 발생시킬 수 있음을 꼭 기억해야 합니다.

  1. 연산자
    javascript의 map, filter와 같은 선언형 메소드처럼 RxJS에서도 Observable에 대해 연산메소드를 지원합니다.

pipe: pipe(연산자1(), 연산자2(), ...)

연산자를 연결해서 호출할 수 있습니다. 각 연산자를 거치면서 새로운 Observable 인스턴스를 리턴합니다.

observableCreated$.pipe(
    map(function(value) {
        return logAndGet(value, value * 2);
    }),
    map(function(value) {
        return logAndGet(value, value + 1);
    }),
    map(function(value) {
        return logAndGet(value, value * 3);
    }),
    toArray()
).subscribe(function(arr) {
    console.log('arr', arr);
});

of: 나열된 인자를 순차적으로 next하는 Observable 인스턴스를 리턴합니다.

of( 10,20,30).subscribe({
    next: console.log,
    error: console.error,
    complete: () => console.log('완료')
});

range: 범위 내 수의 값들을 전달하는 Observable 만들 때 사용합니다.

range(1,3).subscribe(
	(value) => console.log(`next ${value}`),
    (err) => console.log(`err ${value}`),
    (value) => console.log(`complete`)
)
// next 1
// next 2
// next 3
// complete

fromEvent: 브라우저에서 발생하는 Event를 Observable로 바꿀 때 사용합니다.

fromEvent(document.getElementById('app'), 'click').subscribe(
	(v) => console.log(v.currentTarget),
    (err) => console.log(err),
    () => console.log('complete)
)

firstValueFrom: Observable을 구독하고 Observable에서 첫 번째 값이 도착하자마자 promise를 반환하여 Observable을 promise로 변환합니다.

import { interval, firstValueFrom } from 'rxjs';

async function execute() {
  const source$ = interval(2000);
  const firstNumber = await firstValueFrom(source$);
  console.log(`The first number is ${ firstNumber }`);
}

execute();

// 예상 출력 :
// '첫번째 숫자는 0'
 

참고 링크1
참고 링크2

profile
백엔드개발자

0개의 댓글