12/22

졸용·2025년 12월 22일

TIL

목록 보기
140/144

🔹 Outbox 패턴 적용 리팩터링

🔸 OrderOutboxEvent

package chill_logistics.order_server.domain.entity;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.EntityListeners;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.UUID;
import lib.entity.BaseEntity;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;

/**
 * Outbox row describing an order event that still has to be published.
 *
 * <p>A freshly created row is {@code PENDING} with a retry count of zero. Each failed attempt
 * increments the retry count; once the count reaches {@code MAX_RETRY_COUNT} the row becomes
 * {@code FAILED}. A successful attempt turns the row into {@code PUBLISHED}.
 */
@Getter
@Entity
@Table(name = "p_order_outbox_event")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@EntityListeners(AuditingEntityListener.class)
public class OrderOutboxEvent extends BaseEntity {

    /** Number of failed attempts at which the event is given up on and marked FAILED. */
    private static final int MAX_RETRY_COUNT = 3;

    /** Minimum time, in milliseconds, that must pass after a failed attempt before the next one. */
    private static final long MAX_BACKOFF_MILLIS = 100L;

    @Id
    @GeneratedValue(generator = "uuidv7")
    @GenericGenerator(
        name = "uuidv7",
        strategy = "lib.id.UUIDv7Generator"
    )
    @Column(name = "id", columnDefinition = "BINARY(16)")
    private UUID id;

    @Column(name = "order_id", nullable = false, columnDefinition = "BINARY(16)")
    private UUID orderId;

    @Column(name = "event_type", nullable = false, length = 100)
    private String eventType;

    @Column(name = "payload", nullable = false, columnDefinition = "TEXT")
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(name = "order_outbox_status", nullable = false, length = 30)
    private OrderOutboxStatus orderOutboxStatus;

    @Column(name = "retry_count", nullable = false)
    private int retryCount;

    @Column(name = "published_at")
    private LocalDateTime publishedAt;

    @Column(name = "last_retry_at")
    private LocalDateTime lastRetryAt;

    private OrderOutboxEvent(UUID orderId, String eventType, String payload) {
        this.orderId = orderId;
        this.eventType = eventType;
        this.payload = payload;
        // Every new event starts out unpublished and with no attempts recorded.
        this.retryCount = 0;
        this.orderOutboxStatus = OrderOutboxStatus.PENDING;
    }

    /**
     * Builds a new {@code PENDING} outbox event.
     *
     * @param orderId   id of the order the event belongs to
     * @param eventType event type name
     * @param payload   serialized event body
     * @return the new, not yet persisted event
     */
    public static OrderOutboxEvent create(UUID orderId, String eventType, String payload) {
        return new OrderOutboxEvent(orderId, eventType, payload);
    }

    /**
     * Tells whether the event has reached a terminal state.
     *
     * @return {@code true} when the status is {@code PUBLISHED} or {@code FAILED}
     */
    public boolean alreadyHandled() {
        OrderOutboxStatus current = this.orderOutboxStatus;
        return current == OrderOutboxStatus.PUBLISHED || current == OrderOutboxStatus.FAILED;
    }

    /**
     * Tells whether another publish attempt is permitted at all.
     *
     * @return {@code true} when the event is {@code PENDING} and has attempts left
     */
    public boolean canRetry() {
        if (this.orderOutboxStatus != OrderOutboxStatus.PENDING) {
            return false;
        }
        return this.retryCount < MAX_RETRY_COUNT;
    }

    /**
     * Tells whether a publish attempt may be made right now.
     *
     * @return {@code true} when a retry is permitted and either no attempt has failed yet or at
     *         least {@code MAX_BACKOFF_MILLIS} have elapsed since the last failed attempt
     */
    public boolean readyToRetry() {
        if (!canRetry()) {
            return false;
        }

        // No failure recorded yet: the first attempt needs no back-off.
        if (this.lastRetryAt == null) {
            return true;
        }

        Duration sinceLastFailure = Duration.between(this.lastRetryAt, LocalDateTime.now());
        return sinceLastFailure.toMillis() >= MAX_BACKOFF_MILLIS;
    }

    /** Records a successful publish: status becomes {@code PUBLISHED} and the time is stored. */
    public void markPublished() {
        this.publishedAt = LocalDateTime.now();
        this.orderOutboxStatus = OrderOutboxStatus.PUBLISHED;
    }

    /**
     * Records a failed publish attempt: bumps the retry count, stores the failure time, and
     * switches the status to {@code FAILED} once the retry budget is used up.
     */
    public void markFailed() {
        this.retryCount += 1;
        this.lastRetryAt = LocalDateTime.now();

        boolean retriesExhausted = this.retryCount >= MAX_RETRY_COUNT;
        if (retriesExhausted) {
            this.orderOutboxStatus = OrderOutboxStatus.FAILED;
        }
    }
}

🔸 KafkaProducerConfig

package chill_logistics.order_server.infrastructure.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/**
 * Declares the Kafka producer factory and template used to publish outbox events.
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Kafka bootstrap servers. Taken from {@code spring.kafka.bootstrap-servers}; when that
     * property is absent the previously hard-coded {@code localhost:9092} is used, so existing
     * local setups keep working.
     * NOTE(review): assumes the project configures brokers under this property name — confirm.
     */
    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String bootstrapServers;

    /**
     * Creates the String/String producer factory for outbox publication.
     *
     * @return producer factory configured with acks=all and idempotence enabled
     */
    @Bean
    public ProducerFactory<String, String> outboxProducerFactory() {

        Map<String, Object> config = new HashMap<>();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Settings recommended for the outbox pattern: wait for all in-sync replicas and
        // enable the idempotent producer (see the Kafka producer configuration reference
        // for the constraints idempotence places on acks and in-flight requests).
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        return new DefaultKafkaProducerFactory<>(config);
    }

    /**
     * Creates the template backed by {@link #outboxProducerFactory()}.
     *
     * @return Kafka template for String keys and String payloads
     */
    @Bean
    public KafkaTemplate<String, String> outboxKafkaTemplate() {
        return new KafkaTemplate<>(outboxProducerFactory());
    }
}

🔸 OrderOutboxKafkaProducer

package chill_logistics.order_server.infrastructure.kafka.outbox;

import chill_logistics.order_server.domain.event.OutboxEventProducer;
import chill_logistics.order_server.infrastructure.kafka.KafkaPassportProducerSupport;
import chill_logistics.order_server.lib.error.ErrorCode;
import java.util.concurrent.CompletableFuture;
import lib.passport.PassportIssuer;
import lib.web.error.BusinessException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

/**
 * {@link OutboxEventProducer} implementation that sends outbox events through a
 * {@link KafkaTemplate}, choosing the topic from the event type.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderOutboxKafkaProducer implements OutboxEventProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final PassportIssuer passportIssuer;

    @Value("${app.kafka.topic.order-after-create}")
    private String orderAfterCreateTopic;

    @Value("${app.kafka.topic.order-canceled}")
    private String orderCanceledTopic;

    /**
     * Sends one event to the topic mapped to {@code eventType}.
     *
     * @param eventType event type name used to pick the topic
     * @param key       record key
     * @param payload   record value
     * @return the future returned by {@code KafkaTemplate.send}
     * @throws BusinessException with {@code UNKNOWN_EVENT_TYPE} when no topic is mapped
     */
    @Override
    public CompletableFuture<SendResult<String, String>> publish(String eventType, String key, String payload) {

        ProducerRecord<String, String> outgoing =
            new ProducerRecord<>(resolveTopic(eventType), key, payload);

        // Attach the headers written by KafkaPassportProducerSupport before sending.
        KafkaPassportProducerSupport.writeHeaders(outgoing.headers(), passportIssuer);

        return kafkaTemplate.send(outgoing);
    }

    /** Maps an event type name to its configured topic; unknown names are rejected. */
    private String resolveTopic(String eventType) {

        switch (eventType) {
            case "OrderAfterCreateV1":
                return orderAfterCreateTopic;
            case "OrderCanceledV1":
                return orderCanceledTopic;
            default:
                throw new BusinessException(ErrorCode.UNKNOWN_EVENT_TYPE);
        }
    }
}

🔸 OrderOutboxProcessor

package chill_logistics.order_server.infrastructure.kafka.outbox;

import chill_logistics.order_server.domain.entity.OrderOutboxEvent;
import chill_logistics.order_server.domain.entity.OrderOutboxStatus;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.PageRequest;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderOutboxProcessor {

    private static final int BATCH_SIZE = 200;                 // 한 번의 process 에 처리할 이벤트 건 수
    private static final long FIXED_DELAY_MS = 500;            // Processor 스케줄러 실행 주기

    private final OrderOutboxEventRepository outboxEventRepository;
    private final OutboxEventTransactionManager transactionManager;

    @Scheduled(fixedDelay = FIXED_DELAY_MS)
    public void publishPendingEvents() {

        // 상태가 PENDING인 이벤트만 조회 & createdAt 기준 오래된 이벤트부터 처리
        List<OrderOutboxEvent> eventList = outboxEventRepository.findPendingEvents(
            OrderOutboxStatus.PENDING,
            PageRequest.of(0, BATCH_SIZE)
        );

        if (eventList.isEmpty()) {
            return;
        }

        int processed = 0;
        int skipped = 0;

        for (OrderOutboxEvent event : eventList) {

            // 이미 처리되었거나, 재시도 조건 충족 안 되면 스킵
            if (event.alreadyHandled() || !event.readyToRetry()) {
                skipped++;
                continue;
            }

            // 이벤트 1건 → REQUIRES_NEW 트랜잭션에서 처리
            transactionManager.publishEvent(event.getId());
            processed++;
        }

        log.info("[OUTBOX 처리 완료] 조회건수={} 처리건수={} 스킵건수={}",
            eventList.size(), processed, skipped);
    }
}

🔸 OutboxEventTransactionManager

package chill_logistics.order_server.infrastructure.kafka.outbox;

import chill_logistics.order_server.domain.entity.OrderOutboxEvent;
import chill_logistics.order_server.domain.event.OutboxEventProducer;
import chill_logistics.order_server.lib.error.ErrorCode;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lib.web.error.BusinessException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
 * Publishes a single outbox event and records the outcome on the event row.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OutboxEventTransactionManager {

    private static final long SEND_CONFIRM_TIMEOUT_MS = 3000;  // max wait for the send to be confirmed

    private final OrderOutboxEventRepository outboxEventRepository;
    private final OutboxEventProducer outboxEventProducer;

    /**
     * Sends the event identified by {@code outboxId} and waits up to
     * {@code SEND_CONFIRM_TIMEOUT_MS} for confirmation. On success the event is marked
     * published; on any failure it is marked failed (which increments its retry count).
     * Runs in its own transaction ({@code REQUIRES_NEW}).
     *
     * @param outboxId id of the outbox event to publish
     * @throws BusinessException with {@code ORDER_OUTBOX_EVENT_NOT_FOUND} when no such row exists
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void publishEvent(UUID outboxId) {

        OrderOutboxEvent event = outboxEventRepository.findById(outboxId)
            .orElseThrow(() -> new BusinessException(ErrorCode.ORDER_OUTBOX_EVENT_NOT_FOUND));

        // Re-check on the freshly loaded row: stop if it is terminal or not yet due for a retry.
        if (event.alreadyHandled() || !event.readyToRetry()) {
            return;
        }

        String key = event.getOrderId().toString();

        try {
            // Send and block until the broker confirms or the timeout expires.
            outboxEventProducer
                .publish(event.getEventType(), key, event.getPayload())
                .get(SEND_CONFIRM_TIMEOUT_MS, TimeUnit.MILLISECONDS);

            event.markPublished();

            log.info("[OUTBOX 이벤트 발행 성공] outboxId={} eventType={} orderId={}",
                event.getId(), event.getEventType(), event.getOrderId());

        } catch (Exception ex) {
            // Any failure (including an unknown event type or a timeout) counts as one attempt.
            event.markFailed();

            log.warn("[OUTBOX 이벤트 발행 실패] outboxId={} 재시도 횟수={}",
                event.getId(), event.getRetryCount(), ex);

            // The broad catch also catches InterruptedException from get(); restore the
            // interrupt flag so the scheduler thread can still observe the interruption.
            if (ex instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

🔸 OrderCommandService

package chill_logistics.order_server.application.service;

import chill_logistics.order_server.application.dto.command.CreateOrderCommandV1;
import chill_logistics.order_server.application.dto.command.CreateOrderResultV1;
import chill_logistics.order_server.application.dto.command.FirmInfoV1;
import chill_logistics.order_server.application.dto.command.FirmResultV1;
import chill_logistics.order_server.application.dto.command.OrderAfterCreateV1;
import chill_logistics.order_server.application.dto.command.OrderCanceledV1;
import chill_logistics.order_server.application.dto.command.OrderProductInfoV1;
import chill_logistics.order_server.application.dto.command.ProductResultV1;
import chill_logistics.order_server.application.dto.command.ReceiverInfoV1;
import chill_logistics.order_server.application.dto.command.SupplierInfoV1;
import chill_logistics.order_server.application.dto.command.UpdateOrderStatusCommandV1;
import chill_logistics.order_server.domain.entity.Order;
import chill_logistics.order_server.domain.entity.OrderOutboxEvent;
import chill_logistics.order_server.domain.entity.OrderProduct;
import chill_logistics.order_server.domain.entity.OrderQuery;
import chill_logistics.order_server.domain.entity.OrderStatus;
import chill_logistics.order_server.domain.port.FirmPort;
import chill_logistics.order_server.domain.port.HubPort;
import chill_logistics.order_server.domain.port.ProductPort;
import chill_logistics.order_server.domain.repository.OrderQueryRepository;
import chill_logistics.order_server.domain.repository.OrderRepository;
import chill_logistics.order_server.lib.error.ErrorCode;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import lib.entity.Role;
import lib.util.SecurityUtils;
import lib.web.error.BusinessException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * Command-side order use cases: create an order, change its status, and cancel/delete it.
 * Events for other services are not sent directly; they are stored as outbox rows.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderCommandService {

    private static final String EVENT_ORDER_AFTER_CREATE = "OrderAfterCreateV1";
    private static final String EVENT_ORDER_CANCELED = "OrderCanceledV1";

    private final OrderRepository orderRepository;
    private final OrderQueryRepository orderQueryRepository;
    private final ProductPort productPort;
    private final HubPort hubPort;
    private final FirmPort firmPort;
    private final OrderOutboxEventRepository outboxEventRepository;
    private final ObjectMapper objectMapper;

    /** Loads the order or fails with {@code ORDER_NOT_FOUND}. */
    private Order readOrderOrThrow(UUID orderId) {
        return orderRepository.findById(orderId)
                .orElseThrow(() -> new BusinessException(ErrorCode.ORDER_NOT_FOUND));
    }

    /**
     * For a HUB_MANAGER caller, verifies that the order's receiver firm belongs to one of the
     * hubs the caller manages; other roles pass through unchecked.
     *
     * @throws BusinessException with {@code ORDER_NOT_IN_MANAGING_HUB} when the check fails
     */
    private void validateManagingHub(Order order) {

        if (!SecurityUtils.hasRole(Role.HUB_MANAGER)) {
            return;
        }

        List<UUID> managingHubIds = hubPort.readHubId(SecurityUtils.getCurrentUserId());
        UUID receiverHubId = firmPort.readHubId(order.getReceiverFirmId());

        if (!managingHubIds.contains(receiverHubId)) {
            throw new BusinessException(ErrorCode.ORDER_NOT_IN_MANAGING_HUB);
        }
    }

    /**
     * Serializes {@code message} to JSON and stores it as a new outbox event.
     *
     * @throws BusinessException with {@code OUTBOX_PAYLOAD_SERIALIZATION_FAILED} when the
     *         message cannot be serialized
     */
    private void saveOutboxEvent(UUID orderId, String eventType, Object message) {

        final String payload;
        try {
            payload = objectMapper.writeValueAsString(message);

        } catch (JsonProcessingException e) {
            // Failing here makes the calling order operation fail as well, which is generally
            // safer than completing the operation without its event.
            log.error("[OUTBOX payload 직렬화 실패] orderId={} eventType={}", orderId, eventType, e);

            throw new BusinessException(ErrorCode.OUTBOX_PAYLOAD_SERIALIZATION_FAILED);
        }

        OrderOutboxEvent outboxEvent = OrderOutboxEvent.create(orderId, eventType, payload);

        outboxEventRepository.save(outboxEvent);

        log.info("[OUTBOX 이벤트 적재 완료] orderId={} eventType={} outboxId={}",
            orderId, eventType, outboxEvent.getId());
    }

    /**
     * Creates an order: validates each product against the supplier firm and its stock,
     * decreases stock, saves the order and its read model, and stores an
     * {@code OrderAfterCreateV1} outbox event.
     *
     * @param command order creation request
     * @return the created order together with supplier and receiver firm info
     * @throws BusinessException {@code PRODUCT_NOT_FROM_FIRM} or {@code OUT_OF_STOCK} when a
     *         product fails validation
     */
    @Transactional
    public CreateOrderResultV1 createOrder(CreateOrderCommandV1 command) {

        // Look up both firms.
        FirmResultV1 supplierResult = firmPort.readFirmById(command.supplierFirmId(), "SUPPLIER");
        FirmResultV1 receiverResult = firmPort.readFirmById(command.receiverFirmId(), "RECEIVER");

        // Validate every ordered product and decrease its stock.
        // NOTE(review): decreaseStock goes through a port; if that is a call to another
        // service, a later failure in this method would not undo it — confirm.
        List<OrderProductInfoV1> orderProductInfoList =
                command.productList()
                        .stream()
                        .map(p -> {
                            ProductResultV1 product = productPort.readProductById(p.productId());

                            // The product must belong to the supplier firm.
                            if (!product.firmId().equals(command.supplierFirmId())) {
                                throw new BusinessException(ErrorCode.PRODUCT_NOT_FROM_FIRM);
                            }

                            // There must be enough stock for the requested quantity.
                            if (product.stockQuantity() < p.quantity()) {
                                throw new BusinessException(ErrorCode.OUT_OF_STOCK);
                            }

                            productPort.decreaseStock(p.productId(), p.quantity());

                            return new OrderProductInfoV1(
                                    p.productId(),
                                    product.name(),
                                    product.price(),
                                    p.quantity()
                            );
                        })
                        .toList();

        // Create and persist the order.
        Order order = Order.create(
                command.supplierFirmId(),
                command.receiverFirmId(),
                command.requestNote(),
                orderProductInfoList
        );

        Order createOrder = orderRepository.save(order);

        // Create the read model alongside the order.
        // TODO: read-model strategy to be revised later (temporary: representative product)
        OrderQuery orderQuery = OrderQuery.create(
                createOrder,
                SupplierInfoV1.from(supplierResult),
                ReceiverInfoV1.from(receiverResult)
        );

        orderQueryRepository.save(orderQuery);

        // Build the Kafka message (it is not sent here).
        OrderAfterCreateV1 message = new OrderAfterCreateV1(
                createOrder.getId(),
                supplierResult.hubId(),
                receiverResult.hubId(),
                createOrder.getReceiverFirmId(),
                receiverResult.firmFullAddress(),
                receiverResult.firmOwnerName(),
                createOrder.getRequestNote(),
                // TODO: read-model strategy to be revised later (temporary: representative product)
                createOrder.getOrderProductList().get(0).getProductName(),
                createOrder.getOrderProductList().get(0).getQuantity(),
                createOrder.getCreatedAt()
        );

        // Store the message in the outbox.
        saveOutboxEvent(createOrder.getId(), EVENT_ORDER_AFTER_CREATE, message);

        return CreateOrderResultV1.from(
                createOrder,
                FirmInfoV1.from(supplierResult),
                FirmInfoV1.from(receiverResult)
        );
    }

    /**
     * Changes the status of an order after the hub-ownership check for HUB_MANAGER callers.
     *
     * @param id      order id
     * @param command carries the new status
     * @throws BusinessException {@code ORDER_NOT_FOUND} or {@code ORDER_NOT_IN_MANAGING_HUB}
     */
    @Transactional
    public void updateOrderStatus(UUID id, UpdateOrderStatusCommandV1 command) {

        Order order = readOrderOrThrow(id);

        // HUB_MANAGER may only touch orders of hubs they manage.
        validateManagingHub(order);

        order.updateStatus(command.status());

        // TODO: update the order read model (OrderStatus)
    }

    /**
     * Cancels and deletes an order, restores the stock of its products, and stores an
     * {@code OrderCanceledV1} outbox event.
     *
     * @param id order id
     * @throws BusinessException {@code ORDER_NOT_FOUND}, {@code ORDER_NOT_IN_MANAGING_HUB},
     *         or {@code ORDER_NOT_CREATED_BY_USER}
     */
    @Transactional
    public void deleteOrder(UUID id) {

        Order order = readOrderOrThrow(id);

        // HUB_MANAGER may only touch orders of hubs they manage.
        validateManagingHub(order);

        // FIRM_MANAGER may only delete orders they created themselves.
        if (SecurityUtils.hasRole(Role.FIRM_MANAGER)) {
            UUID currentUserId = SecurityUtils.getCurrentUserId();

            if (!currentUserId.equals(order.getCreatedBy())) {
                throw new BusinessException(ErrorCode.ORDER_NOT_CREATED_BY_USER);
            }
        }

        order.updateStatus(OrderStatus.CANCELED);
        order.delete(SecurityUtils.getCurrentUserId());

        // Give the ordered quantities back to stock.
        for (OrderProduct p : order.getOrderProductList()) {
            productPort.recoverStock(p.getProductId(), p.getQuantity());
        }

        // Build the Kafka message (it is not sent here).
        OrderCanceledV1 message = new OrderCanceledV1(
            order.getId(),
            order.getOrderStatus(),  // CANCELED
            LocalDateTime.now()
        );

        saveOutboxEvent(order.getId(), EVENT_ORDER_CANCELED, message);

        // TODO: update the order read model (OrderStatus, delete)
    }
}
profile
꾸준한 공부만이 답이다

0개의 댓글