(NestJS) EventEmitter를 이용해 비동기 처리 및 도메인 로직 분리

정비호·2025년 3월 6일
0

사이드 프로젝트

목록 보기
5/5

현재 진행 중인 프로젝트의 아키텍처는 DDD + Hexagonal Architecture + CQRS의 조합으로 이루어져 있다.

참고한 Repository

개인적으로 개선하고 싶은 로직들이 있었는데 대표적으로는

  1. 유저 회원가입 시 이메일 인증을 위한 메일 전송 로직
  2. 블로그 게시글 도메인 로직에서 게시글 태그, 업로드된 파일 관련 로직

이렇게 두 가지의 로직이 있는데, 하나씩 설명을 해보겠다.

개선할 로직

유저 회원가입 시 이메일 인증을 위한 메일 전송 로직

일단 유저 회원가입인증 이메일 전송로직 자체는 하나의 로직으로 묶일 필요도 없고, 만약 유저가 회원가입 하면 인증 메일이 전송되어야 한다. 같은 논리가 되면 하나의 트랜잭션으로 묶어 유저 회원가입 이후 인증 이메일 전송 로직이 실패했을 시에는 유저 회원가입도 실패를 해야 한다.
인증 이메일 전송 자체는 별도로 전송할 수 있는 API가 존재하기도 하고 실패했을 시에 유저 회원가입을 취소시켜야 할 이유도 없다.
또, 하나의 프로세스로 묶이면 메일 전송이 끝날 때까지 사용자는 Response를 받지 못한다.
메일 전송은 시간 소요가 꽤 있는 작업이기 때문에 Response가 나가는 로직과는 별개로 처리되게 할 필요가 있다.

블로그 게시글 도메인 로직에서 게시글 태그, 업로드된 파일 관련 로직

기존 블로그 게시글 도메인 로직에서 수행되는 작업은 총 3가지다.

  1. 게시글 생성
  2. 게시글 태그 생성
    • 태그 생성은 기존에 생성된 적 있는 이름은 빼고 새로운 이름의 태그들을 생성해 주어 중복 데이터를 방지하고 게시글 테이블과의 매핑 테이블에 태그의 id와 게시글의 id를 담아서 로우를 생성한다.
      • 여담이지만 조회 성능을 높이기 위해 그냥 태그를 중복 저장하는 방식으로 바꿀 생각을 하고 있다.
  3. 게시글에 업로드된 Attachment 관련 핸들링
    • 게시글에 파일(글 사이에 들어가는 이미지 등등)을 업로드 하기 위해선 먼저 클라이언트 측에서 파일 업로드 하는 API를 통해 선업로드를 한다.
    • 서버에선 클라이언트에서 보낸 파일을 S3temp path로 저장 및 Attachment 테이블에도 파일에 대한 정보들로 로우 생성(capacity, mime-type 등등. buffer 제외)하고 업로드된 url을 response로 반환
    • 해당 API의 Response로 오는 temp path에 업로드된 S3 객체들의 url을 통해 게시글 본문에 유저가 업로드 하고자 하는 이미지들을 구성하고 게시글 생성 API를 호출한다.
    • 서버에선 해당 게시글에 업로드된 temp path의 S3 객체들을 post path로 복사한다.
      • 기존 temp path 객체들은 수명 주기 규칙을 통해 2일 뒤 자동으로 삭제
      • ex) temp/123456789 --> post/123456789
    • 게시글 본문에 있는 기존 url들도 그에 맞춰 수정해 준다.

해당 로직을 CreateBlogPostCommandHandler에서 수행하고 있는데, 해당 CommandHandler는 말 그대로 블로그 게시글을 생성하는 ApplicationService다.
도메인 로직 또한 다른 외부 도메인의 Aggregate Root를 생성하는 로직을 담고 있다.
또 게시글 본문의 url들을 정규식으로 하나하나 찾아서 변경해 주는 로직의 소요 시간은 본문이 길고 이미지가 많아질수록 무시할 수 없을 것이다.
정리하면, 코드가 너무 길기도 하고 외부 도메인 로직을 분리하며 응답 지연 시간을 줄이기 위해 로직을 찢을 것이다.

EventEmitter를 이용해 Event-Driven 프로그래밍

EventEmitter를 사용해서 각 코드를 관심사에 맞게 분리할 수 있다.
EventEmitter에 대한 자세한 설명은 생략하고 바로 개선 전 예시 코드와 함께 개선을 진행해 보겠다.

유저 회원가입 시 이메일 인증을 위한 메일 전송 로직 개선

async execute(command: CreateUserCommand) {
  // 회원가입하려는 유저의 이메일, 패스워드
  const { email, password } = comamnd; 
  
  // 이메일 중복 체크를 위해 유저 조회
  const existingUser = await this.userRepository.findOneByEmail(
      email
    );

  if (existingUser) {
    // 이메일 중복이면 throw Exception
    throw new HttpConflictException();
  }

  // 패스워드 hash
  const hashedPassword = await bcrypt.hash(
    password,
    10
  );

  // 새로운 UserAggregateRoot 생성
  const user = UserEntity.create({
    email,
    password: hashedPassword,
  });

  await Promise.all([
    // 새로운 유저 엔티티에 영속성 부여
    this.userRepository.create(user),
    // 생성된 유저의 이메일에 인증 메일 전송
    this.emailService.sendVerificationMail(email)
  ]);

  return user.id;
}

실제 로직과는 꽤 다르지만, 이해를 위해 좀 간소화했고 흐름 자체는 비슷하다.

그럼, 처음 계획했던 대로

await this.emailService.sendVerificationMail(email);

해당 부분을 EventEmitter로 이벤트를 방출해서 EventHandler(혹은 EventListener)에 의해 처리되도록 수정해 보자.

// User가 생성됐을 때 방출할 도메인 이벤트
class UserCreatedDomainEvent {
  readonly email: string;
  
  constructor(props) {
    this.email = props.email;
  }
}
async execute(command: CreateUserCommand) {
  // 회원가입하려는 유저의 이메일, 패스워드
  const { email, password } = comamnd; 
  
  // 이메일 중복 체크를 위해 유저 조회
  const existingUser = await this.userRepository.findOneByEmail(
      email
    );

  if (existingUser) {
    // 이메일 중복이면 throw Exception
    throw new HttpConflictException();
  }

  // 패스워드 hash
  const hashedPassword = await bcrypt.hash(
    password,
    10
  );

  // 새로운 UserAggregateRoot 생성
  const user = UserEntity.create({
    email,
    password: hashedPassword,
  });

  await this.userRepository.create(user);
  
  // UserDomainEvent 생성
  const userCreatedDomainEvent = new UserCreatedDomainEvent({
    email
  });
    
  // UserCreatedDomainEvent 비동기로 방출
  await this.eventEmitter.emitAsync(
    userCreatedDomainEvent.name,
    userCreatedDomainEvent
  );

  return user.id;
}

이렇게 유저를 DB에 저장하고 나서 UserCreatedDomainEvent 인스턴스를 만들어서 eventEmitter를 통해 emit 해준다.

이제 해당 이벤트를 수신하고 후처리 로직을 핸들링할 EventHandler를 만들어 주자.

@Injectable()
export class SendVerificationMailWhenUserCreatedDomainEventHandler {
  constructor(
    @Inject(EMAIL_SERVICE_DI_TOKEN)
    private readonly emailService: EmailServicePort,
  ) {}

  @OnEvent(UserCreatedDomainEvent.name, {
    async: true,
  })
  async handle(event: UserCreatedDomainEvent): Promise<void> {
    await this.emailService.sendVerificationEmail(
      event.email,
    );
  }
} 

onEvent 데코레이터의 optionasynctrue로 주어 비동기로 처리되도록 해준다.
이렇게 하면 해당 로직은 다음 틱에 실행되게 되기 때문에 유저 생성 API의 Response가 빠르게 나갈 수 있다.
간단하게 말하면 API의 응답이 끝나고 나면 EventHandler 로직이 실행된다.

이렇게 도메인 로직 분리와 기존 메일 전송 로직까지 포함해 대략 3~5초 정도의 지연이 있던 유저 생성 API의 응답 시간을 0.8초 정도까지 개선했다.

블로그 게시글 도메인 로직에서 게시글 태그, 업로드된 파일 관련 로직

async execute(command: CreateBlogPostCommand) {
  // 게시글의 제목, 본문, 업로드된 File의 urls, 게시글에 달 태그 이름들
  const { title, contents, fileUrls, tagNames } = comamnd; 
  
  if (fileUrls.length) {
    // temp path로 INSERT 됐던 Attachments를 찾는 로직
    // temp path로 s3에 업로드됐던 파일들의 path를 blog-post로 수정해 주는 로직
    // 수정한 path에 맞게 attachment들도 update 해주는 로직
    // contents 내의 file url들을 수정된 url로 replace하는 로직
    // 매핑 테이블인 blog_post_attachments에 로우 생성
  }
  
  if (tagNames.length) {
    // tagNames를 조건으로 기존에 존재하는 tag들을 찾는 로직
    // 중복되지 않는 tagNames들로 새로운 tag들을 생성하는 로직
    // 매핑 테이블인 blog_post_tags에 로우 생성
  }

  // 새로운 BlogPostAggregateRoot 생성
  const blogPost = BlogPost.create({
    title, contents
  });

  // 새로운 BlogPost 엔티티에 영속성 부여
  await this.blogPostRepository.create(blogPost);
    
  return blogPost.id;
}

마찬가지로 로직 자체가 길고 복잡해서 글을 이해하기에 방해가 된다 생각해 간소화했다.
개선 포인트는 이렇다.

  1. tagNamesDomainEvent에 담아서 하나의 트랜잭션으로 EventHandler에서 처리되도록 수정
  2. fileUrlscontents 또한 DomainEvent에 담아서 다음 틱에 EventHandler에서 처리되도록 수정

Attachment에 대한 처리를 하나의 트랜잭션으로 묶지 않는 이유는, 이미지 양에 따라 다르지만, 기본적으로 시간 소요가 있다.
또한, blog-post path로 옮겨진 temp path의 S3 객체들은 바로 삭제되는 게 아닌, 수명 주기 규칙 설정을 통해 대략 2일 뒤에 자동으로 삭제된다.
그 말은, 게시글 생성이 완료되고 조회를 하는 시점에 2번의 작업이 다 끝나지 않아도 이미지를 표시하는 데 전혀 문제가 없다는 뜻이다.

그럼, 이제 다시 1번부터 개선을 해보자.

tagNames를 DomainEvent에 담아서 하나의 트랜잭션으로 EventHandler에서 처리되도록 수정

class BlogPostCreatedDomainEvent {
  // BlogPost의 ID
  readonly aggregateId: string;
  // 생성 할 Tag의 이름들
  readonly tagNames: string[];
}
async execute(command: CreateBlogPostCommand) {
  // 게시글의 제목, 본문, 업로드된 File의 urls, 게시글에 달 태그 이름들
  const { title, contents, fileUrls, tagNames } = comamnd; 
  
  if (fileUrls.length) {
    // temp path로 INSERT 됐던 Attachments를 찾는 로직
    // temp path로 s3에 업로드됐던 파일들의 path를 blog-post로 수정해 주는 로직
    // 수정한 path에 맞게 attachment들도 update 해주는 로직
    // contents 내의 file url들을 수정된 url로 replace하는 로직
    // 매핑 테이블인 blog_post_attachments에 로우 생성
  }

  // 새로운 BlogPostAggregateRoot 생성
  const blogPost = BlogPost.create({
    title, contents
  });

  // 새로운 BlogPost 엔티티에 영속성 부여
  await this.blogPostRepository.create(blogPost);

  // BlogpPostCreatedDomainEvent 생성
  const blogPostCreatedDomainEvent = new BlogPostCreatedDomainEvent({
    aggregateId: blogPost.id,
    tagNames,
  });
  
  await this.eventEmitter.emitAsync(
    blogPostCreatedDomainEvent.name,
    blogPostCreatedDomainEvent
  );
    
  return blogPost.id;
}

마찬가지로 DomainEventEventEmitter로 방출시키는 로직을 짠다.
EventHandler도 만들어 보자.

@Injectable()
export class CreateTagsWhenBlogPostCreatedDomainEventHandler {
  constructor(
    private readonly commandBus: CommandBus,
     
    @Inject(BLOG_POST_TAG_REPOSITORY_DI_TOKEN)
    private readonly blogPostTagRepository: BlogPostTagRepositoryPort
  ) {}

  @OnEvent(BlogPostCreatedDomainEvent.name, {
    suppressErrors: false
  })
  async handle(event: BlogPostCreatedDoaminEvent): Promise<void> {
    const { tagNames, aggregateId } = event;
    
    if (!tagNames.length) {
      return;
    }
    
	const command = new CreateNewTagsCommand({
      tagNames
    });
    
    // CreateNewTagsCommand의 실행 결과로 생성된 Tag들의 ID를 반환
    const result: bigint[] = await this.commandBus.execute(command);
    
    // 생성된 TagId들을 매핑 blogPost와 매핑
    await this.blogPostTagRepository.bulkCreate(
      result.map((tagId) => 
      	BlogPostTagEntity.create({ blogPostId: aggregateId, tagId });
      )
    );
  }
} 

suprressErrors 옵션을 false로 줬는데, 이유는 OnEvent에선 suppressErrors의 default 값이 true로 지정되어 있다.
true일 경우엔 에러가 발생해도 throw되지 않아서 트랜잭션도 ROLLBACK이 되지 않기 때문에 false 값을 준다.

태그 생성을 Repository를 직접 주입 받지 않고 Command를 통해 처리하는 이유는 BlogPostModule, TagModule 간의 결합도를 낮추기 위해서다.

이렇게 블로그 생성 도메인 로직에서 태그 관련 로직을 EventHandler에서 처리되도록 분리했다.

이제 2번을 개선해 보자.

fileUrls와 contents 또한 DomainEvent에 담아서 다음 틱에 EventHandler에서 처리되도록 수정

class BlogPostCreatedDomainEvent {
  // BlogPost의 ID
  readonly aggregateId: string;
  // 생성할 Tag의 이름들
  readonly tagNames: string[];
  // 처리할 fileUrls
  readonly fileUrls: string[];
  // 생성된 게시글의 본문
  readonly contents: Array<Record<string, any>>;
}
async execute(command: CreateBlogPostCommand) {
  // 게시글의 제목, 본문, 업로드된 File의 urls, 게시글에 달 태그 이름들
  const { title, contents, fileUrls, tagNames } = comamnd; 

  // 새로운 BlogPostAggregateRoot 생성
  const blogPost = BlogPost.create({
    title, contents
  });

  // 새로운 BlogPost 엔티티에 영속성 부여
  await this.blogPostRepository.create(blogPost);

  // BlogpPostCreatedDomainEvent 생성
  const blogPostCreatedDomainEvent = new BlogPostCreatedDomainEvent({
    aggregateId: blogPost.id,
    tagNames,
    fileUrls,
    contents,
  });
  
  await this.eventEmitter.emitAsync(
    blogPostCreatedDomainEvent.name,
    blogPostCreatedDomainEvent
  );
    
  return blogPost.id;
}

마찬가지로 if 문을 제거해 주고 BlogPostCreatedDomainEventfileUrls, contents property를 추가해 준다.

새로운 EventHandler도 추가하자.

@Injectable()
export class CreateContentsAttachmentsWhenBlogPostCreatedDomainEventHandler {
  constructor() {}

  @OnEvent(BlogPostCreatedDomainEvent.name, {
    async: true
  })
  async handle(event: BlogPostCreatedDoaminEvent): Promise<void> {
    const { contents, fileUrls, aggregateId } = event;
    
    if (!fileUrls.length) {
      return;
    }
    
	// temp path로 INSERT 됐던 Attachments를 찾는 로직
    // temp path로 s3에 업로드됐던 파일들의 path를 blog-post로 수정해 주는 로직
    // 수정한 path에 맞게 attachment들도 update 해주는 로직
    // contents 내의 file url들을 수정된 url로 replace하고 DB에 UPDATE SQL을 날리는 로직
    // 매핑 테이블인 blog_post_attachments에 로우 생성
  }
} 

Attachment 관련 로직은 복잡해서 주석으로 대체한다.
여기선 Command를 사용하지 않는데, 그 이유는 로직 자체가 Command로 만드는 게 BlogPost만을 위해서 너무 억지로 만드는 느낌이 강해서이다.

난 여기서 S3 작업 관련 로직 또한 AttachmentDomainEvent를 발생시켜 EventHandler에서 처리되도록 분리했는데 이건 생략하도록 하겠다.

마무리

EventEmitter를 통해 Event-Driven Programming을 해봤는데 여러 도메인이 섞여 있는 불편한 로직들을 하나하나 찢으니, 코드들이 각자의 역할만 맡아서 처리하게 됐고 보기에도 깔끔해져서 정말 기분이 좋다.

profile
잘하고 싶은 개발자

0개의 댓글

관련 채용 정보