NestJS CQRS 공홈 예제 뜯어보기

rxolve·2023년 10월 17일
post-thumbnail

NestJS는 CQRS를 지원한다. 공홈 예제가 잘 되어있으니 살펴보자.

https://docs.nestjs.com/recipes/cqrs

일반적인 CRUD 어플리케이션은 다음과 같다.

  • 컨트롤러는 http 요청을 받아 서비스로 전달한다.
  • 서비스에서 비지니스 로직이 실행된다.
  • 서비스는 리포지토리나 DAO를 사용하여 엔티티를 관리한다.
  • 엔티티는 값의 컨테이너다.

이 패턴은 중소규모의 프로젝트에선 문제가 없지만 프로젝트가 크고 복잡해질수록 최선이 아닐 수 있다. CQRS (Command and Query Responsibility Segregation) 모델이 더 적합하고 확장성이 높다.

  • 관심사 분리: 읽기와 쓰기 작업을 분리한다.
  • 확장성: 읽기와 쓰기를 독립적으로 확장할 수 있다.
  • 유연성: 읽기와 쓰기에 서로 다른 데이터베이스를 사용할 수 있다.
  • 성능: 읽기와 쓰기에 서로 다른 최적화를 할 수 있다.

개발하다보면 자꾸만 커지는 서비스코드를 감당할 수 없을 때가 온다. 열심히 파일과 함수를 쪼개보아도 커지는 복잡성은 감당할 수 없다. CQRS는 대안이 될 수 있을까.

설치

npm install --save @nestjs/cqrs

Commands

커맨드는 어플리케이션의 상태를 변경할 때 사용한다. 데이터 기반이 아닌 태스크 기반이어야한다. 커맨드가 전송되면 커맨드 핸들러가 처리한다. 핸들러는 어플리케이션의 상태 업데이트를 책임진다.

heroes-game.service.ts

@Injectable()
export class HeroesGameService {
  constructor(private commandBus: CommandBus) {}

  async killDragon(heroId: string, killDragonDto: KillDragonDto) {
    return this.commandBus.execute(
      new KillDragonCommand(heroId, killDragonDto.dragonId)
    );
  }
}

KillDragonCommand 인스턴스를 CommandBus의 excute 메서드에 전달한다.

kill-dragon.command.ts

export class KillDragonCommand {
  constructor(
    public readonly heroId: string,
    public readonly dragonId: string,
  ) {}
}

CommandBus는 커맨드의 스트림이다. 알맞는 핸들러에 커맨드를 전달한다. excute는 프라미스를 반환한다.

KillDragonCommand에 대한 핸들러를 만들어보자.

kill-dragon.handler.ts

@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
  constructor(private repository: HeroRepository) {}

  async execute(command: KillDragonCommand) {
    const { heroId, dragonId } = command;
    const hero = this.repository.findOneById(+heroId);

    hero.killEnemy(dragonId);
    await this.repository.persist(hero);
  }
}

이 핸들러는 리포지토리에서 hero엔티티를 검색하고, killEnemy() 메서드를 호출한 다음 변경 사항을 저장한다. KillDragonHandler 클래스는 ICommandHandler 인터페이스의 excute 메서드를 구현한다. excute는 커맨드 객체를 인수로 받는다.

커맨드 버스에 커맨드 객체를 넣으면 알아서 커맨드 핸들러에 전달해주는게 새롭다.

Queries

쿼리는 어플리케이션 상태에서 데이터를 검색하는데 사용된다. 쿼리는 태스크 중심보다 데이터 중심이어야 한다. 쿼리가 전달되면 해당 쿼리 핸들러가 처리한다. 핸들러는 데이터의 검색을 책임진다.
쿼리 버스는 커맨드 버스와 동일한 패턴을 따른다. 쿼리 핸들러는 IQueryHandler 인터페이스를 구현하고 @QueryHandler() 데코레이터를 어노테이트한다.

커맨드는 쓰기에 대한 작업이었다면 쿼리는 읽기에 대한 작업

Events

이벤트는 어플리케이션 상태의 변경사항을 어플리케이션의 다른 부분에 알리는데 사용된다. 이벤트는 모델에 의해 전달되거나 이벤트 버스를 사용하여 직접 전달된다. 이벤트가 전달되면 해당 이벤트 핸들러가 처리한다. 예를 들면 핸들러는 읽기 모델을 업데이트 할 수 있다.

이벤트 클래스를 만들어보자.

hero-killed-dragon.event.ts

export class HeroKilledDragonEvent {
  constructor(
    public readonly heroId: string,
    public readonly dragonId: string,
  ) {}
}

이제 이벤트는 EventBus.publish() 메서드를 사용하여 직접 전달할수도 있지만, 모델에서 전달할 수도 있다. killEnemy() 메서드가 호출될 때 HeroKilledDragonEvent 이벤트를 전달하도록 Hero 모델을 업데이트해보자.

hero.model.ts

export class Hero extends AggregateRoot {
  constructor(private id: string) {
    super();
  }

  killEnemy(enemyId: string) {
    // Business logic
    this.apply(new HeroKilledDragonEvent(this.id, enemyId));
  }
}

apply 메서드는 이벤트를 전달할 때 사용한다. 이벤트 객체를 인자로 받는다. 하지만 이 모델은 이벤트 버스를 알 수 없으므로 연결이 필요하다. EventPublisher 클래스를 이용하여 연결할 수 있다.

Aggregate Root
DDD(Domain Driven Design)에서 도메인 단위이자, 우리가 현실 세계를 기술하는 명세
https://blog.decorus.io/engineering/domain%20driven%20design/2022/05/06/design-and-management-of-aggregate-root-ddd.html

kill-dragon.handler.ts

@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
  constructor(
    private repository: HeroRepository,
    private publisher: EventPublisher,
  ) {}

  async execute(command: KillDragonCommand) {
    const { heroId, dragonId } = command;
    const hero = this.publisher.mergeObjectContext(
      await this.repository.findOneById(+heroId),
    );
    hero.killEnemy(dragonId);
    hero.commit();
  }
}

publisher.mergeObjectContext 메서드는 이벤트퍼블리셔를 제공되는 오브젝트에 병합하므로, 이제 오브젝트가 이벤트 스트림에 이벤트를 발행할 수 있게 된다.

Hero 모델의 메서드에서 apply한 이벤트를 등록하기 위해, 커맨드 핸들러에서 객체를 가져올 때 publisher.mergeObjectContext 메서드를 사용한다.

이 예제에서는 모델에서 commit() 메서드도 호출한다. 이 메서드는 미완결 이벤트를 전달하는데 사용된다. 이벤트를 자동으로 전달하려면 autoCommit 속성을 true로 설정하면 된다.

export class Hero extends AggregateRoot {
  constructor(private id: string) {
    super();
    this.autoCommit = true;
  }
}

이벤트 퍼블리셔를 존재하지 않는 오브젝트가 아닌 클래스에 병합하려는 경우 이벤트 퍼블리셔#mergeClassContext 메서드를 사용할 수 있다.

const HeroModel = this.publisher.mergeClassContext(Hero);
const hero = new HeroModel('id'); // <-- HeroModel is a class

이제 HeroModel 클래스의 모든 인스턴스는 mergeObjectContext() 메서드를 사용하지 않고도 이벤트를 발행할 수 있다.

또한 EventBus를 사용하여 수동으로 이벤트를 발생시킬 수 있다.

this.eventBus.publish(new HeroKilledDragonEvent());

각 이벤트들은 여러개의 이벤트 핸들러를 가질 수 있다.

hero-killed-dragon.handler.ts

@EventsHandler(HeroKilledDragonEvent)
export class HeroKilledDragonHandler implements IEventHandler<HeroKilledDragonEvent> {
  constructor(private repository: HeroRepository) {}

  handle(event: HeroKilledDragonEvent) {
    // Business logic
  }
}

이벤트 핸들러를 사용하기 시작하면 기존 HTTP 웹 컨텍스트에서 벗어나게 된다는 점을 알아야한다.

  • 커맨드 핸들러의 에러는 기본 예외 필터로 잡을 수 있다.
  • 이벤트 핸들러의 에러는 예외 필터로 잡을 수 없다: 수동으로 처리해야 한다. 간단한 try/catch, 혹은 이벤트를 트리거하여 Sagas를 사용하는 방법, 또는 다른 솔루션을 선택하는 방법이 있다.
  • 커맨드 핸들러의 HTTP 응답은 클라이언트로 다시 전송할 수 있다.
  • 이벤트 핸들러의 HTTP 응답은 그렇지 않다. 클라이언트에 정보를 보내려면 WebSocket, SSE 또는 기타 원하는 솔루션을 사용해야한다.

이벤트 핸들러는 손이 많이 간다.

Sagas

사가는 이벤트를 수신하고 새로운 명령을 트리거할 수 있는 긴 실행 프로세스다. 일반적으로 어플리케이션에서 복잡한 워크플로를 관리하는 데 사용된다. 예를 들어, 사용자가 가입할 때 사가는 UserRegisteredEvent를 수신하고 사용자에게 웰컴 이메일을 보낼 수 있다.

사가는 매우 강력한 기능이다. 하나의 사가는 1...* 이벤트를 수신할 수 있다. RxJS 라이브러리를 사용하면 이벤트 스트림을 filter, map, fork, merge 하여 정교한 워크플로를 만들 수 있다. 각 사가는 커맨드 인스턴스를 생성하는 Observable을 반환한다. 그 다음 이 커맨드는 커맨드 버스에 의해 비동기적으로 전송된다.

HeroKilledDragonEvent를 수신하고 DropAncientItemCommand 명령을 전송하는 사가를 만들어 보자.

heroes-game.saga.ts

@Injectable()
export class HeroesGameSagas {
  @Saga()
  dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(HeroKilledDragonEvent),
      map((event) => new DropAncientItemCommand(event.heroId, fakeItemID)),
    );
  }
}

@Saga() 데코레이터는 메서드를 사가로 지정한다. events$ 인수는 모든 이벤트의 Observable 스트림이다. ofType 연산자는 지정된 이벤트 유형에 따라 스트림을 필터링한다. map 연산자는 이벤트를 새 커맨드 인스턴스에 매핑한다.

이 예제에선 HeroKilledDragonEvent를 DropAncientItemCommand에 매핑한다. 그러면 CommandBus에 의해 DropAncientItemCommand 커맨드가 자동으로 전송된다.

사가 패턴
각 서비스의 트랜젝션을 순차적으로 처리하는 패턴, 관리 주체가 DBMS 가 아닌 Application 에 있다.
https://velog.io/@borab/%EB%B6%84%EC%82%B0-%ED%99%98%EA%B2%BD%EA%B3%BC-Event-Driven-Architecture#saga-%ED%8C%A8%ED%84%B4

셋업

마지막으로 모든 커맨드 핸들러, 이벤트 핸들러, 사가를 HeroesGameModule에 등록해야 한다.

heroes-game.module.ts

export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler];
export const EventHandlers =  [HeroKilledDragonHandler, HeroFoundItemHandler];

@Module({
  imports: [CqrsModule],
  controllers: [HeroesGameController],
  providers: [
    HeroesGameService,
    HeroesGameSagas,
    ...CommandHandlers,
    ...EventHandlers,
    HeroRepository,
  ]
})
export class HeroesGameModule {}

처리되지 않은 예외

이벤트 핸들러는 비동기 방식으로 실행된다. 즉, 어플리케이션이 정상적이지 않은 상태가 되는 것을 방지하기 위해 항상 모든 예외를 처리해야 한다. 그러나 예외가 처리되지 않으면 이벤트버스는 UnhandledExceptionInfo 객체를 생성하고 이를 UnhandledExceptionBus 스트림으로 푸시한다. 이 스트림은 처리되지 않은 예외를 처리하는 데 사용할 수 있는 Observable이다.

private destroy$ = new Subject<void>();

constructor(private unhandledExceptionsBus: UnhandledExceptionBus) {
  this.unhandledExceptionsBus
    .pipe(takeUntil(this.destroy$))
    .subscribe((exceptionInfo) => {
      // Handle exception here
      // e.g. send it to external service, terminate process, or publish a new event
    });
}

onModuleDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

예외를 필터링하려면 다음과 같이 ofType 연산자를 사용할 수 있다.

this.unhandledExceptionsBus.pipe(takeUntil(this.destroy$), UnhandledExceptionBus.ofType(TransactionNotAllowedException)).subscribe((exceptionInfo) => {
  // Handle exception here
});

여기서 TransactionNotAllowedException은 필터링하려는 예외다.

UnhandledExceptionInfo 객체에는 다음과 같은 프로퍼티가 포함되어 있다.

export interface UnhandledExceptionInfo<Cause = IEvent | ICommand, Exception = any> {
  /**
   * The exception that was thrown.
   */
  exception: Exception;
  /**
   * The cause of the exception (event or command reference).
   */
  cause: Cause;
}

모든 이벤트 구독하기

CommandBus, QueryBus, EventBus는 모두 Observables. 즉, 전체 스트림을 구독하고 모든 이벤트를 처리할 수 있다. 예를 들어 모든 이벤트를 콘솔에 기록하거나 이벤트 스토어에 저장할 수 있다.

private destroy$ = new Subject<void>();

constructor(private eventBus: EventBus) {
  this.eventBus
    .pipe(takeUntil(this.destroy$))
    .subscribe((event) => {
      // Save events to database
    });
}

onModuleDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

예제

https://github.com/kamilmysliwiec/nest-cqrs-example

흐름 정리

POST hero/:id/kill을 요청하면 KillDragonCommand가 KillDragonHandler로 전달된다. 핸들러에선 hero객체의 killEnemy 메서드를 실행시키고 여기서 HeroKilledDragonEvent가 발행되며 HTTP 응답을 한다. HeroesGameSagas에서 해당 이벤트를 수신하고 1초 뒤에 DropAncientItemCommand를 발행한다. DropAncientItemHandler에서 해당 이벤트를 수신하고 hero객체의 addItem 메서드를 실행시켜 HeroFoundItemEvent를 발행한다. HeroFoundItemHandler에서 해당 이벤트를 수신하고 끝난다.

마치며

CQRS 예제를 살펴보며 DDD의 맛을 봤다는데 의미가 있었다. 간단한 api를 만들어보며 점점 친해지는 시간을 가져볼 예정이다.

profile
resolve to solve. 해결할 결심.

0개의 댓글