NestJS에서 트랜잭션을 위해 typeorm-transactional을 알아보자

윤학·2023년 9월 29일
5

Nestjs

목록 보기
11/13
post-thumbnail

NestJS에서 Transaction의 로직을 어떻게 깔끔하게 적용시킬 수 있을지 생각했다.

적용할 수 있는 방법이 코드에 직접 QueryRunner를 사용하거나 앞선 방식을 개선한 트랜잭션 인터셉터를 사용하는 것이였는데 Spring쪽의 @Transactional()과 유사한 라이브러리가 있어 적용해보기로 했다.

간단하게 사용법을 살펴보자.

typeorm-transactional

사용한 라이브러리는 typeorm-transactional인데 기존에 있었던 typeorm-transactional-cls-hooked의 새로운 버전이다.

사용법은 내부적으로 cls-hooked 라이브러리를 사용하여 @Transactional() 메서드 데코레이터를 제공하고 있어 트랜잭션 처리를 원하는 메서드에 데코레이터를 달고 사용하면 된다.

그럼 어떤식으로 트랜잭션을 관리해주는 것일까?
여기서 알아야 할 개념이 CLS인데 "Continuation Local Storage"의 약어로, 각각의 실행 컨텍스트에서 데이터를 공유할 수 있도록 해주는 라이브러리이다(지금은 cls-hooked인 것 같다).

JavaScript가 단일 스레드이고 NodeJS가 이벤트 루프를 사용하여 비동기 코드를 처리하기 때문에 요청으로 들어온 각각의 실행 컨텍스트를 계속 추적하기 위해선 Request 객체 자체를 계속 넘겨주어야 하는데 이를 CLS를 통해 간편하게 해결한다.

그래서 만약 @Transactional() 데코레이터가 달려있다면 독립된 실행 컨텍스트를 생성하고, 각각의 컨텍스트 내부에서는 local 변수처럼 데이터를 공유하고 접근할 수 있기 때문에 typeorm-transactional에서 설정한 key값으로 동일한 트랜잭션에 접근하여 쿼리들을 진행시킬 수 있는 것이다.

그럼 본격적으로 살펴보자.

initializeTransactionalContext()

main.ts 파일에 작성하는 함수로 컨텍스트 간에 데이터를 격리하고, 공유할 수 있는 메커니즘을 제공하는 cls-hooked의 NameSpace를 초기화 하는 과정이다.

main.ts

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { WsAdapter } from '@nestjs/platform-ws';
import { initializeTransactionalContext } from 'typeorm-transactional';

async function bootstrap() {
  initializeTransactionalContext();

  const app = await NestFactory.create(AppModule);
  app.setGlobalPrefix('/v2');
  app.useWebSocketAdapter(new WsAdapter(app));
  await app.listen(8080);
}
bootstrap();

addTransactionalDataSource

다음으로 TypeORM의 Entity들을 트랜잭션에서 사용할 수 있도록 DataSource를 추가해주자.

TypeOrmModule.forRootAsync()의 옵션을 따로 빼서 라이브러리에 있는 대로 적어주었다.

typeorm.options.ts

export const TypeOrmOptions: TypeOrmModuleAsyncOptions = {
    imports: [ConfigModule],
    inject: [ConfigService],
    useFactory: (configService: ConfigService) => ({
        type: 'mysql',
        host: configService.get('db.mysql.host'),
        port: configService.get('db.mysql.port'),
        username: configService.get('db.mysql.username'),
        password: configService.get('db.mysql.password'),
        database: configService.get('db.mysql.database'),
        autoLoadEntities: true,
        logging: true,
        synchronize: true
    }),
    async dataSourceFactory(option) {
        if( !option )
            throw new Error('Invalid options passed');

        return addTransactionalDataSource(new DataSource(option));
    }
};

만약 연결을 여러개로 가져가서 트랜잭션을 구별해서 사용하려면

연결옵션

addTransactionalDataSource({
	name: 'second-data-source',
	dataSource: new DataSource(...)
});
    

사용하는 메서드

 @Transactional({ connectionName: 'second-data-source' })
  async fn() { ... }

위와같이 작성하면 된다(따로 지정하지 않으면 'default'로 설정된다).

Propagation

기존 트랜잭션이 진행중일 때 추가적인 트랜잭션을 진행해야 할 경우 추가 트랜잭션을 어떻게 진행할지 Propagation(전파) 속성을 통해 정할 수 있다.

그럼 어떤 속성을 제공하는지 살펴보자.

REQUIRED

기본으로 설정되어 있는 값으로 현재 실행되고 있는 트랜잭션이 있다면 해당 트랜잭션에 참여한다.

이게 무슨 말인지 코드로 살펴보자.

코드는 기존 코드에서 테스트를 위해 잠시 저장 로직만 넣어놓았다.

care-application-service.ts

	@Transactional()
    async arrived(protectorId: number, caregiverId: number) {
        const newApplication = new CareApplication(protectorId, caregiverId);
        
        await this.applicationRepository.save(newApplication);
        await this.applicationRepository.save(newApplication);
        
        return await this.careAppliedService.applied(newApplication);
    }

위의 코드에서는 새로운 신청서를 만들고 2번 저장을 하고 다른 서비스의 applied() 메서드를 호출한다.

care-applied-service.ts

	@Transactional()
    async applied(application: CareApplication): Promise<any> {
        console.log('-------다른 트랜잭션---------')
        const [applyUserId, takeUserId] = [application.getApplyUserId(), application.getCaregiverId()];
        
        /* 새로운 신청 메시지 */
        const newAppliedMessage =
            new ChatMessage(applyUserId, takeUserId, MessageType.APPLICATION, ApplicationText.REQUESTED).withApplicationCode(application.getId());
        
       await this.messageRepository.save(newAppliedMessage);
       await this.messageRepository.save(newAppliedMessage);
    }

그리고 호출당한 applied()에서는 새로운 메시지를 만들어 2번 저장한다.

각각의 메서드에 달려있는 Transactional()의 전파 속성으로는 Default인 REQUIRED로 설정되어 있다.

테스트를 돌려보자.

	describe('arrived()', () => {
        it('동일한 트랜잭션 내에서 모두 RollBack 되어야 함', async() => {
            jest.spyOn(careApplicationRepository, 'save')
                .mockImplementation(
                    async() => await dataSource.getRepository(CareApplication).save((new CareApplication(1,2)))
                );
            jest.spyOn(messageRepository, 'save').mockRejectedValue(new NotFoundException());

            await careApplicationService.arrived(1,2);
        })
    })

applied() 메서드에서 메시지를 저장하다 오류가 나도록 해보았다.

그럼 모두 하나의 트랜잭션으로 묶여 정상적으로 롤백된다.

즉, REQUIRED는 applied() 메서드에서 @Transactional() 데코레이터가 달려있지만 동일한 컨텍스트(arrived() 메서드에서 생성된 컨텍스트)에서 이미 트랜잭션이 실행되고 있었기 때문에 하나의 작업으로 묶여 트랜잭션이 수행된다.

REQUIRES_NEW

이 레벨은 실행되고 있던 트랜잭션과 별개로 새로운 트랜잭션을 생성한다.

이것도 applied()에 적용해서 살펴보자.

	@Transactional({ propagation: Propagation.REQUIRES_NEW })
    async applied(application: CareApplication): Promise<any> {
        console.log('-------다른 트랜잭션---------')
        const [applyUserId, takeUserId] = [application.getApplyUserId(), application.getCaregiverId()];
        /* 새로운 신청 메시지 */
        const newAppliedMessage =
            new ChatMessage(applyUserId, takeUserId, MessageType.APPLICATION, ApplicationText.REQUESTED).withApplicationCode(application.getId());
        
       await this.messageRepository.save(newAppliedMessage);
       await this.messageRepository.save(newAppliedMessage);
    }

별개의 트랜잭션에서 실행된다고 해서 applied()에서 실패하더라도 호출한 arrived()의 작업들은 Commit되는 줄 알았다.

근데 arrived()의 작업들까지 실패한 이유는 자식(호출당한) 메서드에서 발생한 예외가 부모(호출한) 메서드에까지 전파되는데 아직 부모의 트랜잭션이 닫히지 않아 롤백되는 것이라고 한다.

그래서 부모 메서드에서 예외 처리를 해보았다.

   @Transactional()
    async arrived(protectorId: number, caregiverId: number) {
        try {
            const newApplication = new CareApplication(protectorId, caregiverId);

            await this.applicationRepository.save(newApplication);
            await this.applicationRepository.save(newApplication);

            return await this.careAppliedService.applied(newApplication);
        }
        catch (e) {

        }
    }

그리고 다시 테스트를 해보면

성공적으로 별개의 트랜잭션이 부모 메서드에서의 작업들은 커밋된다.

그래서 별개의 트랜잭션으로 동작하긴 하지만 예외 처리를 명확하게 해주지 않는다면 독립적으로는 동작하지 않기 때문에 REQUIRED를 사용하는게 편할 것 같다.

MANDATORY

같은 실행 컨텍스트 내에서 진행되고 있는 트랜잭션에 참여하거나, 트랜잭션이 존재하지 않는다면 예외를 던진다.

NEVER

트랜잭션을 사용하지 않는다는 것으로, 트랜잭션이 존재하면 예외를 던진다.

NOT_SUPPORTED

트랜잭션을 사용하지 않는다는 것으로, 트랜잭션이 이미 존재한다면 해당 트랜잭션을 중지한다.

SUPPORTS

진행되고 있는 트랜잭션에 참여하거나, 트랜잭션이 존재하지 않는다면 트랜잭션 없이 수행된다.

NESTED

중첩해서 트랜잭션을 수행하는 것으로, 현재 실행하고 있는 트랜잭션이 있다면 REQUIRED 속성 값처럼 실행된다.

중첩해서 수행한다는 말이 헷갈려서 해당 라이브러리의 코드를 봤는데 REQUIRED_NEW와 같은 방식으로 수행된다.

case Propagation.NESTED:
    return runWithNewTransaction();
case Propagation.REQUIRES_NEW:
    return runWithNewTransaction();

Hooks

트랜잭션에 대한 제어권을 라이브러리에 넘기기 때문에 라이브러리에서 커밋/롤백 이후에 수행할 수 있는 3가지 메서드를 제공하고 있다.

runOnTransactionCommit(cb)

트랜잭션이 성공적으로 커밋된 이후에 콜백 함수를 실행한다.

runOnTransactionRollBack(cb)

트랜잭션이 롤백된 이후에 콜백 함수를 실행한다. 콜백 함수의 인자로 오류의 내용이 넘겨집니다.

runOnTransactionComplete(cb)

트랜잭션이 전부 완료되었을 때 넘긴 콜백 함수를 실행한다. 오류가 발생했으면 콜백 함수의 인자로 넘겨집니다.

어떤 로그를 남기거나, 이벤트를 발생시킬 때 유용할 것 같다.

사용법이 크게 어렵지 않고, 테스트를 했을 때 아직까지 문제는 없어 괜찮은 것 같다.

참고

typeorm-transactional
Sequelize-Transactions
CLS에 대해 알아보자

0개의 댓글