
이해하기 복잡해 보일 수 있지만, 간단하다.
주문을 생성하면, 해당 event를 exchange를 통해 product 큐로 메시지를 전송한다. 그리고 product service에서 payment 큐로 exchange를 통해 이벤트를 전달한다.
여기서 각 서비스에서 오류가 생기면 err exchange를 통해 error 큐로 전송해 각 서비스에서 해당되는 오류 처리 후, 반대 방향으로 넘겨주는 처리를 해주는 것이다.
현재는 실습 개념이라 생각보다는 간단한데, 실제 실무에서 사용하려고하면 서비스도 많고, 그에따른 큐도 무수하게 많아지고, 오류시 처리에 대한 부분도 쉽지 않을 것 같다.
우선 실습 코드를 확인하면서 설명하도록 하겠다.
spring.application.name=order
message.exchange=market
message.queue.product=market.product
message.queue.payment=market.payment
message.err.exchange=market.err
message.queue.err.order=market.err.order
message.queue.err.product=market.err.product
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
간단하게 모든 서비스에서 위와 같은 properties 파일을 가지게 하고, 필요한 부분만 사용하게 했다.
@Configuration
public class OrderApplicationQueueConfig {
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Value("${message.exchange}")
private String exchange;
@Value("${message.queue.product}")
private String queueProduct;
@Value("${message.queue.payment}")
private String queuePayment;
@Value("${message.err.exchange}")
private String exchangeErr;
@Value("${message.queue.err.order}")
private String queueErrOrder;
@Value("${message.queue.err.product}")
private String queueErrProduct;
@Bean public TopicExchange exchange() { return new TopicExchange(exchange); }
@Bean public Queue queueProduct() { return new Queue(queueProduct); }
@Bean public Queue queuePayment() { return new Queue(queuePayment); }
@Bean public Binding bindingProduct() { return BindingBuilder.bind(queueProduct()).to(exchange()).with(queueProduct); }
@Bean public Binding bindingPayment() { return BindingBuilder.bind(queuePayment()).to(exchange()).with(queuePayment); }
@Bean public TopicExchange exchangeErr() { return new TopicExchange(exchangeErr); }
@Bean public Queue queueErrOrder() { return new Queue(queueErrOrder); }
@Bean public Queue queueErrProduct() { return new Queue(queueErrProduct); }
@Bean public Binding bindingErrOrder() { return BindingBuilder.bind(queueErrOrder()).to(exchangeErr()).with(queueErrOrder); }
@Bean public Binding bindingErrProduct() { return BindingBuilder.bind(queueErrProduct()).to(exchangeErr()).with(queueErrProduct); }
}
대표적으로 order의 rabbitmq 설정 파일이다. Jackson2JsonMessageConverter 를 사용하고 있고, exchane, queue, binding 설정을 해주고있다.
@Value("${message.queue.product}")
private String productQueue;
private final RabbitTemplate rabbitTemplate;
private Map<UUID, Order> orderStore = new HashMap<>();
public Order createOrder(OrderEndpoint.OrderRequestDto orderRequestDto) {
Order order = orderRequestDto.toOrder();
DeliveryMessage deliveryMessage = orderRequestDto.toDeliveryMessage(order.getOrderId());
orderStore.put(order.getOrderId(), order);
log.info("send Message : {}",deliveryMessage.toString());
rabbitTemplate.convertAndSend(productQueue, deliveryMessage);
return order;
}
order 서비스에서는 간단하게 메모리의 주문 정보를 저장하고, product 큐로 메시지를 전달하는 과정을 진행하고 있다.
@Data
@Builder
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class DeliveryMessage {
private UUID orderId;
private UUID paymentId;
private String userId;
private Integer productId;
private Integer productQuantity;
private Integer payAmount;
private String errorType;
}
메시지의 형식은 다음과 같고, 각 서버에서 동일하게 사용하고 있다.
public class ProductService {
private final RabbitTemplate rabbitTemplate;
@Value("${message.queue.payment}")
private String paymentQueue;
@Value("${message.queue.err.order}")
private String orderErrorQueue;
public void reduceProductAmount(DeliveryMessage deliveryMessage) {
Integer productId = deliveryMessage.getProductId();
Integer productQuantity = deliveryMessage.getProductQuantity();
if (productId != 1 || productQuantity > 1) {
this.rollbackProduct(deliveryMessage);
return;
}
rabbitTemplate.convertAndSend(paymentQueue,deliveryMessage);
}
public void rollbackProduct(DeliveryMessage deliveryMessage){
log.info("PRODUCT ROLLBACK!!!");
if(!StringUtils.hasText(deliveryMessage.getErrorType())){
deliveryMessage.setErrorType("PRODUCT ERROR");
}
rabbitTemplate.convertAndSend(orderErrorQueue, deliveryMessage);
}
}
product 서비스에서는 수량을 줄이고, payment 서비스로 전달하는 과정을 진행하고 있다. if 문 안에 조건은 예시이며, 만약 해당 조건을 만족하면 오류로 판단하고 order-error-queue로 메시지를 전달하고 있다.
product에서 받은 메시지에는 order로 부터 바로 전달받아 오류정보가 들어있을리 없기에 ErrorType을 PRODUCT ERROR로 설정해주고 있다. (간단화 하기위해 String 사용)
public class PaymentService {
private final RabbitTemplate rabbitTemplate;
@Value("${message.queue.err.product}")
private String productErrorQueue;
public void createPayment(DeliveryMessage deliveryMessage) {
Payment payment = Payment.builder()
.paymentId(UUID.randomUUID())
.userId(deliveryMessage.getUserId())
.payAmount(deliveryMessage.getPayAmount())
.payStatus("SUCCESS").build();
Integer payAmount = deliveryMessage.getPayAmount();
if (payAmount >= 10000) {
log.error("Payment amount exceeds limit: {}", payAmount);
payment.setPayStatus("CANCEL");
deliveryMessage.setErrorType("PAYMENT_LIMIT_EXCEEDED");
this.rollbackPayment(deliveryMessage);
}
}
public void rollbackPayment(DeliveryMessage deliveryMessage) {
log.info("PAYMENT ROLLBACK !!!");
rabbitTemplate.convertAndSend(productErrorQueue, deliveryMessage);
}
}
payment 서비스에서는 비용 정보를 생성하고 10000이 넘는 가격이라면 PAYMENT_LIMIT_EXCEEDED 에러로 설정하고, product-error-queue로 보내고 있다.
여기서 product-error-queue로 정보를 보내면
ProductController 에서
@RabbitListener(queues="${message.queue.err.product}")
public void receiveErrorMessage(DeliveryMessage deliveryMessage) {
log.info("ERROR RECEIVE !!!");
productService.rollbackProduct(deliveryMessage);
}
---
public void rollbackProduct(DeliveryMessage deliveryMessage){
log.info("PRODUCT ROLLBACK!!!");
if(!StringUtils.hasText(deliveryMessage.getErrorType())){
deliveryMessage.setErrorType("PRODUCT ERROR");
}
rabbitTemplate.convertAndSend(orderErrorQueue, deliveryMessage);
}
이처럼 잡아서 다시 order-error-queue로 던져준다.
그럼 OrderController에서
@RabbitListener(queues = "${message.queue.err.order}")
public void errOrder(DeliveryMessage message) {
log.info("ERROR RECEIVE !!!");
orderService.rollbackOrder(message);
}
---
public void rollbackOrder(DeliveryMessage message) {
Order order = orderStore.get(message.getOrderId());
order.cancelOrder(message.getErrorType());
log.info(order.toString());
}
다음처럼 최종 order에서 해당 주문을 취소처리하고, 해당 주문 취소의 이유를 메시지의 ErrorType으로 설정함으로써 해당 주문이 취소되었다는 내용을 프론트에게 보여주고, 주문을 새로 생성하라는 내용을 보여주면 될 것이다.
MSA 환경에서 분산 트랜잭션을 적용하는 방법 중에서 SAGA Pattern 을 적용해보았는데, 그중에서도 Message Queue인 RabbitMQ를 사용한 SAGA Pattern을 적용했다.
단지 3개의 서비스 뿐인데도, 큐가 4개가 만들어져야 하고 각 서비스별로 오류가 났을때의 처리와 오류 큐에 적재하는 과정까지 필요했기에 실제 업무에서 사용하려면 더 복잡해질 거 같다는 생각이 들었다.
하지만, 해당 패턴이 생긴 이유도 복잡도를 감수하고서라도 처리율을 높이기 위함이기 때문에 알아 놓는다면 나중에 적용시점에 되었을 때, 어렵지 않게 가져야 쓸 수 있을 것 같다.
https://github.com/kimgunwooo/MSA-practice/tree/main/saga-pattern