앞서 작성했던 포스트에서 auth-service와 catalog-service를 rabbitMQ로 연결하여 데이터 동기화에 대한 예시를 작성했었습니다. 그리고 order-service를 구현하면서 catalog-serivce와의 데이터 동기화에 대한 시나리오도 생각해봤구요. 그래서 이번 포스트에서는 이 데이터 동기화를 하기 위한 catalog-service와 order-service의 흐름도를 제가 알아보기 편한 의사코드, 실제 코드 작성을 진행해보겠습니다.
1) 주문 생성과 재주문
1-1) order.status의 값이 CREATE, REORDER로 변경됩니다.
1-2) order-service의 db에는 qty의 값이 order에 맞게 저장됩니다.
1-3) 1-1)에는 qty의 값이 존재하므로 메시지 큐로 메시지를 전달합니다.
1-4) catalog-service에서는 현재 stock에서 qty를 차감합니다.
1-4-1) 만약 qty > stock 즉, 주문량이 재고량보다 많다면 에러메시지를 반환합니다.
1-4-2) 에러메시지를 order-service에서 받는다면 status의 값을 CANCEL_ORDER로 변경합니다.
2) 주문 취소
2-1) order.status의 값이 CANCEL로 변경됩니다.
2-2) 메시지 큐로 order.status의 값이 CANCEL이 됐음을 알립니다.
2-3) catalog-service에서는 해당 메시지를 받고 stock의 값을 Rollback시킵니다.
1), 2)를 보면 CREATE_ORDER, RE_ORDER, CANCEL_ORDER의 경우에 대한 정상처리, 에러처리를 해결할 수 있을 것 같습니다.
1)에서 정상처리를 한 경우 stock = stock - qty가 catalog-service에 저장될 것입니다. 그리고 만일 1)의 에러 케이스인 에러 메시지(CANCEL_ORDER로 상태 변화)와 2)의 CANCEL_ORDER가 메시지 큐로 전달된다면 stock = stock + qty로 roll back하도록 하겠습니다. 그러면 이것을 바탕으로 코드를 구현해보도록 하겠습니다.
order-service에 다음의 패키지를 설치하도록 하겠습니다.
npm i --save @nestjs/microservices
npm i --save amqplib amqp-connection-manager
우선 order-service부터 코드를 구현해보도록 하겠습니다.
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { TypeOrmModule } from '@nestjs/typeorm';
import { OrderEntity } from 'src/entity/order.entity';
import { OrderController } from './order.controller';
import { OrderService } from './order.service';
@Module({
imports: [
TypeOrmModule.forFeature([OrderEntity]),
ClientsModule.register([{
name: 'order-service',
transport: Transport.RMQ,
options: {
urls: ['AMQP_URL'],
queue: 'ecommerce_queue',
queueOptions: {
durable: false
},
},
}])
],
controllers: [OrderController],
providers: [OrderService]
})
export class OrderModule {}
코드를 살펴 보면 order-service를 amqp cloud url을 통해서 ecommerce_queue 메시지 큐에 등록을 하겠다는 의미입니다. 여기서 order-service는 CREATE_ORDER, RE_ORDER를 메시지큐로 전송하기 위한 이름입니다.
그리고 catalog-service를 구현하겠습니다.
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { CatalogModule } from './catalog/catalog.module';
@Module({
imports: [
TypeOrmModule.forRoot({
type: 'mysql',
host: 'localhost',
port: 3306,
username: 'root',
password: '10a10a',
database: 'catalog',
autoLoadEntities: true,
synchronize: true,
}),
ClientsModule.register([{
name: 'order-service',
transport: Transport.RMQ,
options: {
urls: ['AMQP_URL'],
queue: 'ecommerce_queue',
queueOptions: {
durable: false
},
},
}]),
CatalogModule
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
마찬가지로 catalog-service도 ecommerce_queue에 등록을 시키겠습니다.
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule, {
transport: Transport.RMQ,
options: {
urls: ['AMQP_URL'],
queue: 'ecommerce_queue',
queueOptions: {
durable: false,
}
}
}
);
await app.listen();
}
bootstrap();
이렇게 catalog-service도 ecommerce_queue에 대한 설정을 맞췄습니다. 그러면 앞서 #1번에서의 1) 경우에 따라서 코드를 구현해보도록 하겠습니다.
1-1) order.status의 값이 CREATE, REORDER로 변경됩니다.
import { HttpException, HttpStatus, Inject, Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { InjectRepository } from '@nestjs/typeorm';
import { OrderDto } from 'src/dto/order.dto';
import { OrderEntity } from 'src/entity/order.entity';
import { ResponseOrder } from 'src/vo/response.order';
import { Repository } from 'typeorm';
import { v4 as uuid } from 'uuid';
@Injectable()
export class OrderService {
constructor(
@InjectRepository(OrderEntity) private orderRepository: Repository<OrderEntity>,
@Inject('order-service') private readonly client: ClientProxy
) {}
public async create(orderDto: OrderDto): Promise<ResponseOrder> {
try {
const orderEntity = new OrderEntity();
orderEntity.orderId = uuid();
orderEntity.productId = orderDto.productId;
orderEntity.productName = orderDto.productName;
orderEntity.qty = orderDto.qty;
orderEntity.unitPrice = orderDto.unitPrice;
orderEntity.totalPrice = orderDto.totalPrice;
orderEntity.userId = orderDto.userId;
orderEntity.status = 'CREATE_ORDER';
this.client.emit('CREATE_ORDER', orderEntity);
await this.orderRepository.save(orderEntity);
const responseOrder = new ResponseOrder();
responseOrder.orderId = orderEntity.orderId;
responseOrder.productId = orderEntity.productId;
responseOrder.productName = orderEntity.productName;
responseOrder.qty = orderEntity.qty;
responseOrder.unitPrice = orderEntity.unitPrice;
responseOrder.totalPrice = orderEntity.totalPrice;
responseOrder.userId = orderEntity.userId;
responseOrder.status = orderEntity.status;
return responseOrder;
} catch(err) {
throw new HttpException(err, HttpStatus.BAD_REQUEST);
}
}
...
}
저는 컨트롤러 부분에서 비즈니스 로직을 처리하는 걸 좋아하지 않기 때문에 create메서드에서 service로 넘겨 rabbitMQ로 CREATE_ORDER메시지를 넘기도록 하겠습니다.
1-2) order-service의 db에는 qty의 값이 order에 맞게 저장됩니다.
요청이 잘 처리되고 order-service의 데이터베이스에 데이터가 잘 저장되었습니다.
1-3) 1-1)에는 qty의 값이 존재하므로 메시지 큐로 메시지를 전달합니다.
@EventPattern('CREATE_ORDER')
public async createOrderAndDecreaseStock(data: any): Promise<any> {
console.log(data);
return data;
}
CatalogController에서 메시지 큐에서 데이터를 받아오기 위한 createOrderAndDecreaseStock이라는 메서드를 만들었습니다. 우선 중간 결과를 확인하기 위해 데이터를 받아오고 콘솔에서 이 데이터를 확인하겠습니다.
데이터를 잘 받아오는 모습을 볼 수 있네요. 그러면 이 중간 결과를 바탕으로 order데이터를 지우고 1-4)를 진행하도록 하겠습니다.
@EventPattern('CREATE_ORDER')
public async createOrderAndDecreaseStock(data: any): Promise<any> {
return await this.catalogService.createOrderAndDecreaseStock(data);
}
1-4) catalog-service에서는 현재 stock에서 qty를 차감합니다.
import { HttpException, HttpStatus, Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { CatalogDto } from 'src/dto/catalog.dto';
import { CatalogEntity } from 'src/entity/catalog.entity';
import { ResponseCatalog } from 'src/vo/response.catalog';
import { Repository } from 'typeorm';
@Injectable()
export class CatalogService {
constructor(@InjectRepository(CatalogEntity) private catalogRepository: Repository<CatalogEntity>) {}
...
public async createOrderAndDecreaseStock(data: any): Promise<any> {
const catalogEntity = await this.catalogRepository.findOne({ where: { productId: data.productId }});
catalogEntity.stock -= data.qty;
await this.catalogRepository.save(catalogEntity);
return catalogEntity;
}
}
여기까지 stock에서 qty를 차감하는 부분까지 구현을 마치고 결과를 한번 확인해보겠습니다.
우선 productId가 product-001인 제품을 주문할 것이기 때문에 현재 catalog db에서 product-001인 제품의 현재고량을 보겠습니다.
stock을 보니 110으로 현재 재고량이 파악됩니다. 그러면 postman을 통해 주문을 진행해보겠습니다.
정상적으로 주문이 진행되었고, CREATE_ORDER라는 상태입니다.
데이터베이스에 주문이 잘 들어갔는지 확인하고, 가장 중요한 데이터 동기화가 잘 이루어졌는지 파악해보겠습니다.
order 데이터베이스에 주문이 정상적으로 들어갔음을 확인할 수 있고, qty의 양만큼 stock에서 차감이 되었음을 확인할 수 있습니다. 즉, 데이터 동기화가 정상적으로 이루어졌다는 의미입니다.