
3만명 서비스에서 재고 관리는 매우 중요한 문제입니다.
특히 다음과 같은 상황에서 문제가 발생할 수 있습니다.
해결 방안: 이벤트 기반 재고 관리 시스템
// 재고 이벤트 정의
@Getter
@AllArgsConstructor
public class InventoryEvent {
private String productId;
private int quantity;
private InventoryEventType eventType;
private String channelId;
private LocalDateTime timestamp;
public enum InventoryEventType {
STOCK_DECREASED,
STOCK_INCREASED,
STOCK_RESERVED,
RESERVATION_CANCELLED
}
}
// 재고 관리 서비스
@Service
@RequiredArgsConstructor
public class InventoryService {
private final KafkaTemplate<String, InventoryEvent> kafkaTemplate;
private final RedisTemplate<String, Integer> redisTemplate;
public void processInventoryEvent(InventoryEvent event) {
String productKey = "product:" + event.getProductId() + ":stock";
switch (event.getEventType()) {
case STOCK_DECREASED:
redisTemplate.opsForValue().increment(productKey, -event.getQuantity());
break;
case STOCK_INCREASED:
redisTemplate.opsForValue().increment(productKey, event.getQuantity());
break;
case STOCK_RESERVED:
String reservationKey = "reservation:" + event.getProductId() + ":" + event.getChannelId();
redisTemplate.opsForValue().set(reservationKey, event.getQuantity(), Duration.ofMinutes(10));
break;
// ... 기타 케이스 처리
}
// 이벤트 발행
kafkaTemplate.send("inventory-events", event.getProductId(), event);
}
}
// 재고 모니터링 컨슈머
@Component
@RequiredArgsConstructor
public class InventoryMonitor {
private final NotificationService notificationService;
@KafkaListener(topics = "inventory-events")
public void handleInventoryEvent(InventoryEvent event) {
if (event.getEventType() == InventoryEventType.STOCK_DECREASED) {
// 재고가 특정 임계치 이하로 떨어졌는지 체크
checkLowStockThreshold(event.getProductId(), event.getQuantity());
}
}
private void checkLowStockThreshold(String productId, int currentStock) {
if (currentStock < 10) {
notificationService.notifyLowStock(productId, currentStock);
}
}
}
온라인 쇼핑몰에서 주문 프로세스는 다음과 같은 여러 단계를 거칩니다
각 단계는 별도의 마이크로서비스로 구현되어 있으며, 전체 프로세스의 일관성을 유지해야 합니다.
// 주문 상태 이벤트
@Getter
@AllArgsConstructor
public class OrderEvent {
private String orderId;
private OrderStatus status;
private String failureReason;
private LocalDateTime timestamp;
public enum OrderStatus {
CREATED,
PAYMENT_COMPLETED,
PAYMENT_FAILED,
STOCK_CONFIRMED,
STOCK_FAILED,
DELIVERY_REQUESTED,
DELIVERY_FAILED,
COMPLETED,
CANCELLED
}
}
// 주문 사가 코디네이터
@Service
@RequiredArgsConstructor
public class OrderSagaCoordinator {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
private final OrderRepository orderRepository;
@Transactional
public void handleOrderEvent(OrderEvent event) {
Order order = orderRepository.findById(event.getOrderId())
.orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));
switch (event.getStatus()) {
case PAYMENT_COMPLETED:
kafkaTemplate.send("stock-check", event);
break;
case PAYMENT_FAILED:
handleOrderFailure(order, "결제 실패: " + event.getFailureReason());
break;
case STOCK_CONFIRMED:
kafkaTemplate.send("delivery-request", event);
break;
case STOCK_FAILED:
handleOrderFailure(order, "재고 부족: " + event.getFailureReason());
// 결제 취소 보상 트랜잭션 실행
kafkaTemplate.send("payment-compensation", event);
break;
// ... 기타 상태 처리
}
order.setStatus(event.getStatus());
orderRepository.save(order);
}
private void handleOrderFailure(Order order, String reason) {
order.setStatus(OrderStatus.CANCELLED);
order.setFailureReason(reason);
orderRepository.save(order);
// 실패 알림 발송
kafkaTemplate.send("order-notification",
new OrderEvent(order.getId(), OrderStatus.CANCELLED, reason, LocalDateTime.now()));
}
}
서비스의 실시간 상태를 모니터링하고 이상 징후를 감지해야 하는 요구사항이 있습니다.
// 메트릭 이벤트 정의
@Getter
@AllArgsConstructor
public class MetricEvent {
private String serviceId;
private String metricName;
private double value;
private Map<String, String> tags;
private LocalDateTime timestamp;
}
// 메트릭 수집기
@Component
@RequiredArgsConstructor
public class MetricCollector {
private final KafkaTemplate<String, MetricEvent> kafkaTemplate;
@Around("@annotation(Monitored)")
public Object collectMetrics(ProceedingJoinPoint joinPoint) throws Throwable {
long startTime = System.currentTimeMillis();
String serviceName = joinPoint.getSignature().getDeclaringTypeName();
String methodName = joinPoint.getSignature().getName();
try {
Object result = joinPoint.proceed();
long duration = System.currentTimeMillis() - startTime;
// 성공 메트릭 발행
publishMetric(serviceName, methodName, duration, "success");
return result;
} catch (Exception e) {
// 실패 메트릭 발행
publishMetric(serviceName, methodName, -1, "error");
throw e;
}
}
private void publishMetric(String service, String method, long duration, String status) {
Map<String, String> tags = new HashMap<>();
tags.put("method", method);
tags.put("status", status);
MetricEvent event = new MetricEvent(
service,
"method.duration",
duration,
tags,
LocalDateTime.now()
);
kafkaTemplate.send("metrics", event);
}
}
// 메트릭 분석기
@Component
@RequiredArgsConstructor
public class MetricAnalyzer {
private final AlertService alertService;
@KafkaListener(topics = "metrics")
public void analyzeMetrics(MetricEvent event) {
if ("method.duration".equals(event.getMetricName())) {
// 응답 시간 분석
analyzeResponseTime(event);
}
// ... 기타 메트릭 분석
}
private void analyzeResponseTime(MetricEvent event) {
if (event.getValue() > 1000) { // 1초 이상 소요
alertService.sendAlert(
String.format(
"성능 저하 감지: %s - %s (소요시간: %.2fms)",
event.getServiceId(),
event.getTags().get("method"),
event.getValue()
)
);
}
}
}
이러한 심화 구현을 통해 더 안정적이고 확장 가능한 시스템을 구축할 수 있습니다.
실제 프로덕션 환경에서는 위의 코드에 더해 테스트 코드, 예외 처리, 로깅, 모니터링 등이 추가로 구현되어야 합니다.