
코드 저장소: Saga-util github
node에서 mongodb를 사용중에 service 레이어에서 2개 이상의 mongodb collection에 대한 접근을 수행하는 상황이 자주 발생한다. 예를 들어 user 컬렉션과 usage 컬렉션에 대하여 작업을 수행하는 예시이다.
class LlmCallService {
constructor(
private userRepository: UserRepository,
private usageRepository: UsageRepository,
) {
}
async process(userId: string, prompt: string): Promise<any> {
// usage 컬렉션 업데이트
const usage = await this.usageRepository.decreaseCount(userId);
if (!usage) {
throw new Error('Not found usage');
}
if (usage.remainingCount <= 0) {
// user 컬렉션 업데이트
await this.userRepository.updatePricingType(userId, 'free');
}
try {
// 외부 api 호출
const response = await this.callLlm();
return response;
} catch (error) {
// db 복구 로직
await this.usageRepository.increaseCount(userId);
await this.userRepository.updatePricingType(userId, 'freetrial');
}
return response;
}
}
에러가 발생하여 call이 실패하는 경우 db 정보를 롤백해주는 로직이 포함되어 있다.
여기서 발생할 수 있는 추가 문제는 db update 작업 자체에서 오류가 발생하는 상황에는 데이터 정합성이 깨질 수 있다는 것이다.
이런 경우 mongodb에서 제공하는 transaction 기능을 사용하거나 보상트랜잭션 SAGA 패턴 을 사용해 볼 수 있다.
SAGA 패턴은 보통 MSA 기반의 분산 환경에서 데이터 일관성을 지키기 위해 사용되는 패턴이다. 각각의 인스턴스는 transaction을 자체적으로 사용하고, 그 결과를 이벤트 기반으로 다른 인스턴스에 전파시키는 방법으로 전체 트랜잭션을 괸리하는 방법을 의미한다.
참고할만한 블로그: cla9
그러나 이번에는 transaction 보다 보상 트랜잭션를 단일 어플리케이션에서도 구현하는 사용하기로 하였다. 이유는 크게 2가지이다.
1. application 내부에서 정확한 데이터 롤백이 아니라, 실패시에 다른 방식으로 데이터를 롤백시키는 구조가 존재했다는 점.
2. mongodb 측에서 분산 db 환경에서 trasaction 보다 saga 등 어플리케이션을 통한 관리가 best practice라는 안내를 받았다는 점이다.
보상 트랜잭션 로직을 try catch를 통해 매번 처리할 경우 문제는 다음과 같이 try catch 블록이 너무 많아 진다는 것이다.
이것을 코드로 표현하면 다음처럼된다.
async process(userId: string): Promise<any> {
const usage = await this.usageRepository.decreaseCount(userId);
if (!usage) {
throw new Error('Not found usage');
}
try {
if (usage.remainingCount <= 0) {
await this.userRepository.updatePricingType(userId, 'free');
}
} catch (error) {
await this.usageRepository.increaseCount(userId);
}
try {
const response = await this.callLlm();
return response;
} catch (error) {
await this.usageRepository.increaseCount(userId);
await this.userRepository.updatePricingType(userId, 'freetrial');
}
}
다음과 같이 매 error 발생 가능한 지점마다 try catch가 달려야한다.
이는 코드를 보는 가독성에도 좋지 않고, 밑으로 내려갈수록 보상 트랜잭션이 중첩되어 명시해줘야 한다는 문제도 있었다.
이를 해결하는 유틸리티 코드를 작성하여 이 문제를 해결하려한다.
필요조건
대응되는 보상 작업을 실행한다.분산환경으로 확장성을 가져야한다.선언적인 구조를 가져야한다.롤백에서 현재 진행된 작업에 대해 롤백을 처리하려면 2가지 정보를 알아야한다.
1. 어떤 롤백을 수행할 것인가?
2. 어디까지 작업이 실행되었는가.
해당 2가지 사항을 모두 선언적이고 자연스럽게 처리하기 위해 찾다가 나온것이 typescript의 데코레이터를 사용하는 방식이다.
ts 데코레이터 blog: inpa
우선 데코레이터에서 사용될 컨텍스트 관리 기능을 작성한다.
배열을 통해 rollback 함수를 저장해두고, 이를 실행하는 메서드를 추가한다.
LocalTransactionContext.ts
import {RollbackFailedError} from './RollbackFailedError';
export type AnyRollbackFn = () => any;
export type RollbackablePromise<T> = Promise<T> & {
rollback(rollbackFn: AnyRollbackFn): RollbackablePromise<T>;
};
export class LocalTransactionContext {
private rollbackStack: Array<() => Promise<void>> = [];
addRollback(fn: AnyRollbackFn): void {
const wrappedFn = async () => {
await Promise.resolve().then(fn);
};
this.rollbackStack.push(wrappedFn);
}
async rollbackAll(): Promise<void> {
await this.rollbackStack.reverse().reduce(async (prev, rollbackFn) => {
await prev;
try {
await rollbackFn();
} catch (error) {
throw new RollbackFailedError('rollback failed');
}
}, Promise.resolve());
}
}
addRollback을 연결시켜주기위해, 이를 붙여줄수있는 wrapper 메서드를 추가한다.
withRollback.ts
/**
* withRollback
* @param action (Promise<T>) - 비동기 action
* @returns RollbackablePromise<T>
*/
export function withRollback<T>(action: T | Promise<T>): RollbackablePromise<T> {
const promise = Promise.resolve(action);
const wrapper: RollbackablePromise<T> = Object.assign(promise, {
rollback(rollbackFn: AnyRollbackFn): RollbackablePromise<T> {
const context = localTransactionContextStorage.getStore();
if (!context) {
throw new NoActiveTransactionContextError("no active transaction context");
}
const wrappedRollback = async () => {
await Promise.resolve().then(rollbackFn);
};
context.addRollback(wrappedRollback);
return wrapper;
}
});
return wrapper;
}
컨텍스트 저장시에는 비동기 메서드 흐름을 저장해야하므로 AsyncLocalStorage를 사용하였다. 이는 서비스 메서드에서 호출하는 다른 서비스 메서드에도 @LocalTransaction 이 설정되어 있는 경우에도 안전하게 처리하기 위함이다.
LocalTransactionContextStorage.ts
import { AsyncLocalStorage } from 'async_hooks';
import {LocalTransactionContext} from "./LocalTransactionContext";
export const localTransactionContextStorage = new AsyncLocalStorage<LocalTransactionContext>();
데코레이터 연결하는 코드를 작성한다.
에러가 발생할 경우, 지금까지 등록된 롤백이 실행가능하도록 rollbackAll()만 잘 호출하도록 구성하였다.
context가 상속되고 있는 경우에도 처리할 수 있도록 newContext에 대한 분기처리를 추가해두어야한다.
LocalTransactionDecorator.ts
import {LocalTransactionContext} from "./LocalTransactionContext";
import {localTransactionContextStorage} from "./LocalTransactionContextStorage";
export type LocalTransactionOptions = {
catchUnhandledError?: boolean;
verbose?: boolean;
}
export function LocalTransaction(options: LocalTransactionOptions = {
catchUnhandledError: true,
verbose: false
}) {
return function (
target: any,
propertyKey: string,
descriptor: PropertyDescriptor
) {
const originalMethod: Function = descriptor.value;
descriptor.value = async function (...args: any[]) {
const transactionContext = localTransactionContextStorage.getStore();
if (transactionContext) {
return await executeWithContext(transactionContext, originalMethod, this, args, options);
}
const newContext = new LocalTransactionContext();
return localTransactionContextStorage.run(newContext, async () => {
return await executeWithContext(newContext, originalMethod, this, args, options);
});
}
}
}
async function executeWithContext(
context: LocalTransactionContext,
method: Function,
thisArg: any,
args: any[],
options?: LocalTransactionOptions
) {
try {
return await method.apply(thisArg, args);
} catch (error) {
if (options?.catchUnhandledError) {
options?.verbose && console.error('unhandledCatchError', error);
await context.rollbackAll();
}
throw error;
}
}
이후 적용된 코드를 보면 다음과 같다.
AS IS
async process(userId: string): Promise<any> {
const usage = await this.usageRepository.decreaseCount(userId);
if (!usage) {
throw new Error('Not found usage');
}
try {
if (usage.remainingCount <= 0) {
await this.userRepository.updatePricingType(userId, 'free');
}
} catch (error) {
await this.usageRepository.increaseCount(userId);
}
try {
const response = await this.callLlm();
return response;
} catch (error) {
await this.usageRepository.increaseCount(userId);
await this.userRepository.updatePricingType(userId, 'freetrial');
}
}
TO BE
@LocalTransaction()
async process(userId: string): Promise<any> {
const usage = await withRollback(this.usageRepository.decreaseCount(userId))
.rollback(() => this.usageRepository.increaseCount(userId));
if (!usage) {
throw new Error('Not found usage');
}
if (usage.remainingCount <= 0) {
await withRollback(this.userRepository.updatePricingType(userId, 'free'))
.rollback(() => this.userRepository.updatePricingType(userId, 'freetrial'));
}
const response = await this.callLlm();
return response;
}
확실히 이전보다 더 선언적이고 깔끔한 구조가 되었다.
실제로 잘 롤백이 이루어지는지 확인해보자.
database에는 다음 두 문서가 있다.


해당 값들을 rollback을 걸지 않고 수행할 경우에는 다음과 같은 로그가 발생한다.

db 쿼리가 수행되고 에러가 발생한다.

이번에는 LocalTransaction을 걸어둔 saga 메서드를 실행시킨다.

에러가 발생하는 상황이 되자, 정상적으로 등록해둔 롤백이 실행되는 모습을 확인할 수 있었다.