12/16

졸용·2025년 12월 16일

TIL

목록 보기
136/144

주문 취소 시 Kafka Skeleton Code

🔹 order-server

🔸 OrderStatusChangedV1 DTO 추가

/**
 * Message payload published when an order's status changes.
 *
 * @param orderId     identifier of the order whose status changed
 * @param orderStatus status carried by the event
 * @param changedAt   timestamp supplied by the publisher for the change
 */
public record OrderStatusChangedV1(
    UUID orderId,
    OrderStatus orderStatus,
    java.time.LocalDateTime changedAt
) {}

🔸 OrderCommandService

현재 구현된 코드를 기준으로 봤을 때, 이미

@Transactional
    public void deleteOrder(UUID id) {
}
  • 여기서 취소 + 재고복원 + soft delete까지 책임지고 있으니, 여기에 추가로 OrderStatusChangedV1를 같이 발행하도록 하면 흐름이 깔끔하다.

  • OrderStatusChangedV1 이벤트를 발행하는 위치는
    order.updateStatus(CANCELED) + order.delete() + 재고복원까지 다 성공한 다음, 마지막에 발행하는 것이 좋다.
    이유는 주문 취소가 확정일 때만 보상 처리하도록 하는 게 일관성에 좋기 때문이다.

  • 여기서 멱등성, 즉 deleteOrder()가 두 번 호출되는 상황도 생각해보면,
    Order.updateStatus()에서 COMPLETED, CANCELED 상태는 상태 전이 검사가 false를 반환하므로
    이미 CANCELED면 INVALID_ORDER_STATUS_TRANSITION이 터질 것이다.

  • 이후에 트랜잭션 커밋 전에 이벤트가 나가면 생길 문제를 방지하기 위해,
    (Kafka 전송은 성공했는데 DB 커밋이 실패하면 delivery-server는 “취소됐다고 믿고 배송 취소”를 해버릴 수 있기 때문)
    @TransactionalEventListener(phase = AFTER_COMMIT)로 커밋 이후에 Kafka로 발행하도록 처리하는 방식으로 가볍게 고도화하는 것도 가능하다. ➡️ 지금은 일단 보류

/**
 * Cancels and soft-deletes an order, restores its stock, then publishes an
 * {@code OrderStatusChangedV1} message.
 *
 * <p>NOTE(review): the message is sent while this transactional method is still running,
 * i.e. before the commit; moving the send to after commit is a known, deferred follow-up.
 *
 * @param id identifier of the order to cancel
 * @throws BusinessException if a hub manager does not manage the receiver's hub, or a firm
 *                           manager did not create the order
 */
@Transactional
    public void deleteOrder(UUID id) {

        // Load the order
        Order targetOrder = readOrderOrThrow(id);

        // Hub manager: the receiving firm's hub must be one of the hubs this user manages
        if (SecurityUtils.hasRole(Role.HUB_MANAGER)) {
            List<UUID> managedHubIds = hubPort.readHubId(SecurityUtils.getCurrentUserId());
            UUID receiverHubId = firmPort.readHubId(targetOrder.getReceiverFirmId());
            boolean managesReceiverHub = managedHubIds.contains(receiverHubId);

            if (!managesReceiverHub) {
                throw new BusinessException(ErrorCode.ORDER_NOT_IN_MANAGING_HUB);
            }
        }

        // Firm manager: only the user who created the order may cancel it
        if (SecurityUtils.hasRole(Role.FIRM_MANAGER)) {
            UUID requesterId = SecurityUtils.getCurrentUserId();
            boolean createdByRequester = requesterId.equals(targetOrder.getCreatedBy());

            if (!createdByRequester) {
                throw new BusinessException(ErrorCode.ORDER_NOT_CREATED_BY_USER);
            }
        }

        // Mark as canceled, then soft-delete on behalf of the current user
        targetOrder.updateStatus(OrderStatus.CANCELED);
        targetOrder.delete(SecurityUtils.getCurrentUserId());

        // Restore stock for every ordered product
        for (OrderProduct orderedItem : targetOrder.getOrderProductList()) {
            productPort.recoverStock(orderedItem.getProductId(), orderedItem.getQuantity());
        }

        // Publish the Kafka message (status is CANCELED at this point)
        eventPublisher.sendOrderStatusChanged(
            new OrderStatusChangedV1(
                targetOrder.getId(),
                targetOrder.getOrderStatus(),
                LocalDateTime.now()
            )
        );

        // TODO: update the order read model (OrderStatus, delete)
    }

🔸 EventPublisher

package chill_logistics.order_server.domain.event;

import chill_logistics.order_server.application.dto.command.OrderAfterCreateV1;
import chill_logistics.order_server.application.dto.command.OrderStatusChangedV1;

/**
 * Contract for publishing order events.
 */
public interface EventPublisher {

    /**
     * Publishes the message emitted after an order has been created.
     *
     * @param message payload describing the created order
     */
    void sendOrderAfterCreate(OrderAfterCreateV1 message);

    /* Newly added */
    /**
     * Publishes a message announcing that an order's status changed.
     *
     * @param message payload carrying the order id, the status and the change time
     */
    void sendOrderStatusChanged(OrderStatusChangedV1 message);
}

🔸 OrderAfterCreateProducer

OrderAfterCreateProducer 클래스명 OrderEventPublisher로 수정하면 더 깔끔할 것 같다.

package chill_logistics.order_server.infrastructure.kafka;

import chill_logistics.order_server.application.dto.command.OrderAfterCreateV1;
import chill_logistics.order_server.application.dto.command.OrderStatusChangedV1;
import chill_logistics.order_server.domain.event.EventPublisher;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderAfterCreateProducer implements EventPublisher {

    private final KafkaTemplate<String, OrderAfterCreateV1> orderAfterCreateKafkaTemplate;
    private final KafkaTemplate<String, OrderStatusChangedV1> orderStatusChangedKafkaTemplate;

    @Value("${app.kafka.topic.order-after-create}")
    private String orderAfterCreateTopic;

    @Value("${app.kafka.topic.order-status-changed}")
    private String orderStatusChangedTopic;

    /**
     * 주문 생성 후 Kafka로 OrderAfterCreate 이벤트를 발행합니다.
     *
     * @param message 주문 생성 정보를 담고 있는 메시지 객체
     */
    @Override
    public void sendOrderAfterCreate(OrderAfterCreateV1 message) {

        String key = message.orderId().toString();

        log.info("[Kafka] OrderAfterCreate 메시지 발행, topic={}, key={}, message={}",
                orderAfterCreateTopic, key, message);

        orderAfterCreateKafkaTemplate
                .send(orderAfterCreateTopic, key, message)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        log.error("[Kafka] OrderAfterCreate 메시지 전송 실패, key={}", key, ex);
                    } else {
                        log.info("[Kafka] OrderAfterCreate 메시지 전송 성공, orderId={}, offset={}",
                                key, result.getRecordMetadata().offset());
                    }
                });
    }

    /* 여기 추가 */
    
    /**
     * 주문 상태 변경(취소/실패/완료 등) 시 Kafka로 OrderStatusChanged 이벤트 발행
     */
    @Override
    public void sendOrderStatusChanged(OrderStatusChangedV1 message) {

        String key = message.orderId().toString();

        log.info("[Kafka] OrderStatusChanged 메시지 발행, topic={}, key={}, message={}",
            orderStatusChangedTopic, key, message);

        orderStatusChangedKafkaTemplate
            .send(orderStatusChangedTopic, key, message)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("[Kafka] OrderStatusChanged 메시지 전송 실패, key={}", key, ex);
                } else {
                    log.info("[Kafka] OrderStatusChanged 메시지 전송 성공, orderId={}, offset={}",
                        key, result.getRecordMetadata().offset());
                }
            });
    }
}

🔸 application.yml

app:
  kafka:
    topic:
      order-after-create: order-after-create
      # 여기 추가
      order-status-changed: order-status-changed

🔸 KafkaProducerConfig

package chill_logistics.order_server.infrastructure.config;

import chill_logistics.order_server.application.dto.command.OrderAfterCreateV1;
import chill_logistics.order_server.application.dto.command.OrderStatusChangedV1;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares one producer factory and one {@link KafkaTemplate} per event type, all built
 * from the same base producer properties (String keys, JSON values).
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Builds the producer properties shared by every producer factory in this class.
     *
     * <p>NOTE(review): the broker address is hard-coded to {@code localhost:9092}.
     *
     * @return a fresh, mutable property map
     */
    private Map<String, Object> baseProducerProps() {
        final Map<String, Object> producerConfig = new HashMap<>();

        // Broker address
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Serializers: String key, JSON value
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return producerConfig;
    }

    /* OrderAfterCreate */

    /** Producer factory for {@code OrderAfterCreateV1} messages. */
    @Bean
    public ProducerFactory<String, OrderAfterCreateV1> orderAfterCreateProducerFactory() {
        final Map<String, Object> producerConfig = baseProducerProps();
        return new DefaultKafkaProducerFactory<>(producerConfig);
    }

    /** Template used to send {@code OrderAfterCreateV1} messages. */
    @Bean
    public KafkaTemplate<String, OrderAfterCreateV1> orderAfterCreateKafkaTemplate() {
        final ProducerFactory<String, OrderAfterCreateV1> factory = orderAfterCreateProducerFactory();
        return new KafkaTemplate<>(factory);
    }

    /* OrderStatusChanged */

    /** Producer factory for {@code OrderStatusChangedV1} messages. */
    @Bean
    public ProducerFactory<String, OrderStatusChangedV1> orderStatusChangedProducerFactory() {
        final Map<String, Object> producerConfig = baseProducerProps();
        return new DefaultKafkaProducerFactory<>(producerConfig);
    }

    /** Template used to send {@code OrderStatusChangedV1} messages. */
    @Bean
    public KafkaTemplate<String, OrderStatusChangedV1> orderStatusChangedKafkaTemplate() {
        final ProducerFactory<String, OrderStatusChangedV1> factory = orderStatusChangedProducerFactory();
        return new KafkaTemplate<>(factory);
    }
}


🔹 delivery-server

  • HubRouteAfterCreate 처리(배송 생성) 쪽에서 “이미 취소된 주문이면 생성하지 않기” 같은 가드 로직 필요

  • 멱등성: orderId 기준으로 이미 배송 만들어졌으면, 다시 생성하지 않도록 처리 필요
    (같은 주문으로 배송이 중복 생성되지 않도록)

🔸 HubDelivery

package chill_logistics.delivery_server.domain.entity;

import ...

    ...(생략)

    // length=15 옵션 제거
    @Enumerated(EnumType.STRING)
    @Column(name = "delivery_status", nullable = false)
    private DeliveryStatus deliveryStatus;
    
    ...(생략)
    
    ...(기존 메서드 생략)

    /**
     * Cancels this delivery because its order was canceled.
     *
     * <p>Does nothing when the delivery is already canceled (so repeated calls are harmless)
     * or already completed (goods received, so it can no longer be canceled).
     *
     * @throws BusinessException if the current status may not transition to canceled
     */
    public void cancelDueToOrder() {

        final DeliveryStatus currentStatus = this.deliveryStatus;

        boolean alreadyFinished = currentStatus == DeliveryStatus.DELIVERY_CANCELLED
            || currentStatus == DeliveryStatus.DELIVERY_COMPLETED;

        if (alreadyFinished) {
            return;
        }

        if (currentStatus.canTransitTo(DeliveryStatus.DELIVERY_CANCELLED)) {
            this.deliveryStatus = DeliveryStatus.DELIVERY_CANCELLED;
            return;
        }

        throw new BusinessException(ErrorCode.DELIVERY_ALREADY_COMPLETED_OR_CANCELED);
    }
}

🔸 FirmDelivery

package chill_logistics.delivery_server.domain.entity;

import ...

...(생략)

	// length=15 옵션 제거
    @Enumerated(EnumType.STRING)
    @Column(name = "delivery_status", nullable = false)
    private DeliveryStatus deliveryStatus;
    
    ...(생략)

	... (기존 메서드 생략)

    /**
     * Cancels this delivery because its order was canceled.
     *
     * <p>Does nothing when the delivery is already canceled (so repeated calls are harmless)
     * or already completed (goods received, so it can no longer be canceled).
     *
     * @throws BusinessException if the current status may not transition to canceled
     */
    public void cancelDueToOrder() {

        final DeliveryStatus currentStatus = this.deliveryStatus;

        boolean alreadyFinished = currentStatus == DeliveryStatus.DELIVERY_CANCELLED
            || currentStatus == DeliveryStatus.DELIVERY_COMPLETED;

        if (alreadyFinished) {
            return;
        }

        if (currentStatus.canTransitTo(DeliveryStatus.DELIVERY_CANCELLED)) {
            this.deliveryStatus = DeliveryStatus.DELIVERY_CANCELLED;
            return;
        }

        throw new BusinessException(ErrorCode.DELIVERY_ALREADY_COMPLETED_OR_CANCELED);
    }
}

🔸 OrderStatus 추가

package chill_logistics.delivery_server.application;

/**
 * Order status values used by the delivery server when reading order status events.
 *
 * <p>NOTE(review): presumably the constant names must match the status names the
 * order server serializes — confirm against the order-server enum.
 */
public enum OrderStatus {

    CREATED,
    PROCESSING,
    IN_TRANSIT,
    COMPLETED,
    CANCELED
}

🔸 OrderStatusChangedV1 추가

package chill_logistics.delivery_server.infrastructure.kafka.dto;

import chill_logistics.delivery_server.application.OrderStatus;
import java.time.LocalDateTime;
import java.util.UUID;

/**
 * Consumer-side view of the order status change message.
 *
 * <p>NOTE(review): the order-server record shown earlier in this document has no
 * {@code reason} component, so this field is presumably absent from the incoming JSON
 * — confirm how the deserializer treats the missing property.
 *
 * @param orderId     identifier of the order whose status changed
 * @param orderStatus status carried by the message
 * @param reason      reason for the change (not sent by the producer shown in this document)
 * @param changedAt   timestamp of the change as supplied by the publisher
 */
public record OrderStatusChangedV1(
    UUID orderId,
    OrderStatus orderStatus,
    String reason,
    LocalDateTime changedAt
) {}

🔸 KafkaConsumerConfig

  • 공통 설정 분리 후 HubRouteAfterCreate / OrderStatusChanged 각각 container 설정
package chill_logistics.delivery_server.infrastructure.config;

import chill_logistics.delivery_server.infrastructure.kafka.dto.HubRouteAfterCreateV1;
import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderStatusChangedV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

/**
 * Kafka consumer wiring for the delivery server: one consumer factory and one listener
 * container factory per message type, all built from shared base properties.
 */
@Configuration
public class KafkaConsumerConfig {

    /** Package of the Kafka message DTOs that JSON deserialization is allowed to instantiate. */
    private static final String KAFKA_DTO_PACKAGE =
        "chill_logistics.delivery_server.infrastructure.kafka.dto";

    /**
     * Builds the consumer properties shared by every consumer factory in this class.
     *
     * <p>NOTE(review): the broker address is hard-coded to {@code localhost:9092}.
     *
     * @param groupId consumer group id to register under
     * @return a fresh, mutable property map
     */
    private Map<String, Object> baseConsumerProps(String groupId) {

        Map<String, Object> props = new HashMap<>();

        // Kafka broker
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Consumer group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Key deserializer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Value deserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // Offset policy when the group has no committed offset
        // (latest: start reading from messages that arrive from now on)
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        return props;
    }

    /**
     * Creates a consumer factory with a String key and a JSON value of the given type.
     *
     * @param payloadType     class every message value is deserialized into
     * @param groupId         consumer group id
     * @param trustedPackages packages the JSON deserializer is explicitly allowed to use
     * @param <T>             message value type
     * @return consumer factory used by the matching listener container factory
     */
    private <T> ConsumerFactory<String, T> jsonConsumerFactory(
        Class<T> payloadType, String groupId, String... trustedPackages) {

        // 'false': always deserialize into payloadType instead of using type headers
        JsonDeserializer<T> valueDeserializer = new JsonDeserializer<>(payloadType, false);
        valueDeserializer.addTrustedPackages(trustedPackages);

        return new DefaultKafkaConsumerFactory<>(
            baseConsumerProps(groupId),
            new StringDeserializer(),   // key deserializer (String)
            valueDeserializer           // value deserializer (payloadType)
        );
    }

    /**
     * Creates the listener container factory that {@code @KafkaListener} methods refer to,
     * backed by the given consumer factory.
     */
    private <T> ConcurrentKafkaListenerContainerFactory<String, T> listenerContainerFactory(
        ConsumerFactory<String, T> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, T> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);

        return factory;
    }

    /* HubRouteAfterCreate */
    @Bean
    public ConsumerFactory<String, HubRouteAfterCreateV1> hubConsumerFactory() {
        return jsonConsumerFactory(
            HubRouteAfterCreateV1.class,
            "delivery-server-hub-route-group",
            KAFKA_DTO_PACKAGE
        );
    }

    /* HubRouteAfterCreate */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, HubRouteAfterCreateV1>
    hubKafkaListenerContainerFactory() {
        return listenerContainerFactory(hubConsumerFactory());
    }

    /* OrderStatusChanged */
    @Bean
    public ConsumerFactory<String, OrderStatusChangedV1> orderStatusChangedConsumerFactory() {
        return jsonConsumerFactory(
            OrderStatusChangedV1.class,
            "delivery-server-order-status-group",
            KAFKA_DTO_PACKAGE,
            "chill_logistics.delivery_server.application"  // for the OrderStatus enum
        );
    }

    /* OrderStatusChanged */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderStatusChangedV1>
    orderStatusChangedKafkaListenerContainerFactory() {
        return listenerContainerFactory(orderStatusChangedConsumerFactory());
    }
}

🔸 OrderStatusChangedListener 추가

package chill_logistics.delivery_server.infrastructure.kafka;

import chill_logistics.delivery_server.application.OrderStatus;
import chill_logistics.delivery_server.application.service.OrderCancellationService;
import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderStatusChangedV1;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka listener for order status change messages; only {@code CANCELED} triggers work,
 * every other status is logged and ignored.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderStatusChangedListener {

    private final OrderCancellationService orderCancellationService;

    /**
     * Handles one order status change message.
     *
     * @param message deserialized message from the {@code order-status-changed} topic
     */
    @KafkaListener(
        topics = "order-status-changed",
        groupId = "delivery-server-order-status-group",
        containerFactory = "orderStatusChangedKafkaListenerContainerFactory"
    )
    public void listen(OrderStatusChangedV1 message) {

        log.info("Kafka 메시지 수신: {}", message);

        boolean orderCanceled = message.orderStatus() == OrderStatus.CANCELED;
        if (!orderCanceled) {
            return;
        }

        orderCancellationService.cancelDeliveriesByOrder(message.orderId());
    }
}

🔸 OrderCancellationService 추가

package chill_logistics.delivery_server.application.service;

import chill_logistics.delivery_server.domain.entity.FirmDelivery;
import chill_logistics.delivery_server.domain.entity.HubDelivery;
import chill_logistics.delivery_server.domain.repository.FirmDeliveryRepository;
import chill_logistics.delivery_server.domain.repository.HubDeliveryRepository;
import java.util.List;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * Cancels the deliveries that belong to an order.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderCancellationService {

    private final HubDeliveryRepository hubDeliveryRepository;
    private final FirmDeliveryRepository firmDeliveryRepository;

    /**
     * Cancels every HubDelivery and FirmDelivery found for the given order
     * (used when the order is canceled or fails).
     *
     * <p>Logs and returns without changes when the order has no deliveries.
     *
     * @param orderId identifier of the order whose deliveries are canceled
     */
    @Transactional
    public void cancelDeliveriesByOrder(UUID orderId) {

        List<HubDelivery> hubLegs = hubDeliveryRepository.findByOrderId(orderId);
        List<FirmDelivery> firmLegs = firmDeliveryRepository.findByOrderId(orderId);

        boolean hasAnyDelivery = !hubLegs.isEmpty() || !firmLegs.isEmpty();

        if (!hasAnyDelivery) {
            log.info("[주문 취소에 따른 배송 취소] 대상 배송이 없습니다. orderId={}", orderId);
            return;
        }

        // Hub deliveries first, then firm deliveries
        hubLegs.forEach(HubDelivery::cancelDueToOrder);
        firmLegs.forEach(FirmDelivery::cancelDueToOrder);

        log.info("[주문 취소에 따른 배송 취소] 처리 완료. orderId={}", orderId);
    }
}
profile
꾸준한 공부만이 답이다

0개의 댓글