public record OrderStatusChangedV1(
UUID orderId,
OrderStatus orderStatus,
java.time.LocalDateTime changedAt
) {}
현재 구현된 코드를 기준으로 봤을 때, 이미
@Transactional
public void deleteOrder(UUID id) {
}
여기서 취소 + 재고복원 + soft delete까지 책임지고 있으니, 여기에 추가로 OrderStatusChangedV1를 같이 발행하도록 하면 흐름이 깔끔하다.
OrderStatusChangedV1 이벤트를 발행하는 위치는
order.updateStatus(CANCELED) + order.delete() + 재고복원까지 다 성공한 다음, 마지막에 발행하는 것이 좋다.
이유는 주문 취소가 확정일 때만 보상 처리하도록 하는 게 일관성에 좋기 때문이다.
여기서 멱등성: deleteOrder()가 두 번 호출되면? 상황도 생각해봤을 때,
Order.updateStatus()는 COMPLETED, CANCELED -> false라서
이미 CANCELED면 INVALID_ORDER_STATUS_TRANSITION이 터질 것이다.
이후에 트랜잭션 커밋 전에 이벤트가 나가면 생길 문제를 방지하기 위해,
(Kafka 전송은 성공했는데 DB 커밋이 실패하면 delivery-server는 “취소됐다고 믿고 배송 취소”를 해버릴 수 있기 때문)
@TransactionalEventListener(phase = AFTER_COMMIT)로 커밋 이후에 Kafka 발행하도록 처리방식으로 가벼운 고도화 또 가능. ➡️ 지금은 일단 보류
@Transactional
public void deleteOrder(UUID id) {
// 주문 조회
Order order = readOrderOrThrow(id);
// 담당 허브 소속 주문인지 체크
if (SecurityUtils.hasRole(Role.HUB_MANAGER)) {
List<UUID> managingHubId = hubPort.readHubId(SecurityUtils.getCurrentUserId());
UUID receiverHubId = firmPort.readHubId(order.getReceiverFirmId());
if (!managingHubId.contains(receiverHubId)) {
throw new BusinessException(ErrorCode.ORDER_NOT_IN_MANAGING_HUB);
}
}
// 본인 주문인지 체크
if (SecurityUtils.hasRole(Role.FIRM_MANAGER)) {
UUID currentUserId = SecurityUtils.getCurrentUserId();
if (!currentUserId.equals(order.getCreatedBy())) {
throw new BusinessException(ErrorCode.ORDER_NOT_CREATED_BY_USER);
}
}
order.updateStatus(OrderStatus.CANCELED);
order.delete(SecurityUtils.getCurrentUserId());
// 재고 복원
for (OrderProduct p : order.getOrderProductList()) {
productPort.recoverStock(p.getProductId(), p.getQuantity());
}
// Kafka 메시지 발행
OrderStatusChangedV1 message = new OrderStatusChangedV1(
order.getId(),
order.getOrderStatus(), // CANCELED
LocalDateTime.now()
);
eventPublisher.sendOrderStatusChanged(message);
// TODO: 주문 읽기 업데이트 (OrderStatus, delete)
}
package chill_logistics.order_server.domain.event;
import chill_logistics.order_server.application.dto.command.OrderAfterCreateV1;
public interface EventPublisher {
void sendOrderAfterCreate(OrderAfterCreateV1 message);
/* 여기 추가 */
void sendOrderStatusChanged(OrderStatusChangedV1 message);
}
OrderAfterCreateProducer 클래스명 OrderEventPublisher로 수정하면 더 깔끔할 것 같다.
package chill_logistics.order_server.infrastructure.kafka;
import chill_logistics.order_server.domain.event.EventPublisher;
import chill_logistics.order_server.application.dto.command.OrderAfterCreateV1;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderAfterCreateProducer implements EventPublisher {
private final KafkaTemplate<String, OrderAfterCreateV1> orderAfterCreateKafkaTemplate;
private final KafkaTemplate<String, OrderStatusChangedV1> orderStatusChangedKafkaTemplate;
@Value("${app.kafka.topic.order-after-create}")
private String orderAfterCreateTopic;
@Value("${app.kafka.topic.order-status-changed}")
private String orderStatusChangedTopic;
/**
* 주문 생성 후 Kafka로 OrderAfterCreate 이벤트를 발행합니다.
*
* @param message 주문 생성 정보를 담고 있는 메시지 객체
*/
@Override
public void sendOrderAfterCreate(OrderAfterCreateV1 message) {
String key = message.orderId().toString();
log.info("[Kafka] OrderAfterCreate 메시지 발행, topic={}, key={}, message={}",
orderAfterCreateTopic, key, message);
orderAfterCreateKafkaTemplate
.send(orderAfterCreateTopic, key, message)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("[Kafka] OrderAfterCreate 메시지 전송 실패, key={}", key, ex);
} else {
log.info("[Kafka] OrderAfterCreate 메시지 전송 성공, orderId={}, offset={}",
key, result.getRecordMetadata().offset());
}
});
}
/* 여기 추가 */
/**
* 주문 상태 변경(취소/실패/완료 등) 시 Kafka로 OrderStatusChanged 이벤트 발행
*/
@Override
public void sendOrderStatusChanged(OrderStatusChangedV1 message) {
String key = message.orderId().toString();
log.info("[Kafka] OrderStatusChanged 메시지 발행, topic={}, key={}, message={}",
orderStatusChangedTopic, key, message);
orderStatusChangedKafkaTemplate
.send(orderStatusChangedTopic, key, message)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("[Kafka] OrderStatusChanged 메시지 전송 실패, key={}", key, ex);
} else {
log.info("[Kafka] OrderStatusChanged 메시지 전송 성공, orderId={}, offset={}",
key, result.getRecordMetadata().offset());
}
});
}
}
app:
kafka:
topic:
order-after-create: order-after-create
/* 여기 추가 */
order-status-changed: order-status-changed
package chill_logistics.order_server.infrastructure.config;
import chill_logistics.order_server.application.dto.command.OrderAfterCreateV1;
import chill_logistics.order_server.application.dto.command.OrderStatusChangedV1;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
/* 공통 Producer 설정 */
private Map<String, Object> baseProducerProps() {
Map<String, Object> props = new HashMap<>();
// Kafka Broker
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 직렬화 설정
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
/* OrderAfterCreate */
@Bean
public ProducerFactory<String, OrderAfterCreateV1> orderAfterCreateProducerFactory() {
return new DefaultKafkaProducerFactory<>(baseProducerProps());
}
@Bean
public KafkaTemplate<String, OrderAfterCreateV1> orderAfterCreateKafkaTemplate() {
return new KafkaTemplate<>(orderAfterCreateProducerFactory());
}
/* OrderStatusChanged */
@Bean
public ProducerFactory<String, OrderStatusChangedV1> orderStatusChangedProducerFactory() {
return new DefaultKafkaProducerFactory<>(baseProducerProps());
}
@Bean
public KafkaTemplate<String, OrderStatusChangedV1> orderStatusChangedKafkaTemplate() {
return new KafkaTemplate<>(orderStatusChangedProducerFactory());
}
}
HubRouteAfterCreate 처리(배송 생성) 쪽에서 “이미 취소된 주문이면 생성하지 않기” 같은 가드 로직 필요
멱등성: orderId 기준으로 이미 배송 만들어졌으면, 다시 생성하지 않도록 처리 필요
(같은 주문으로 배송이 중복 생성되지 않도록)
package chill_logistics.delivery_server.domain.entity;
import ...
...(생략)
// length=15 옵션 제거
@Enumerated(EnumType.STRING)
@Column(name = "delivery_status", nullable = false)
private DeliveryStatus deliveryStatus;
...(생략)
...(기존 메서드 생략)
// 주문 취소 시 처리 메서드
public void cancelDueToOrder() {
// 멱등성: 이미 취소된 상태면 다시 호출되어도 OK || 이미 상품 받았으면 주문 취소 불가
if (this.deliveryStatus == DeliveryStatus.DELIVERY_CANCELLED
|| this.deliveryStatus == DeliveryStatus.DELIVERY_COMPLETED) {
return;
}
DeliveryStatus nextDeliveryStatus = DeliveryStatus.DELIVERY_CANCELLED;
if (!this.deliveryStatus.canTransitTo(nextDeliveryStatus)) {
throw new BusinessException(ErrorCode.DELIVERY_ALREADY_COMPLETED_OR_CANCELED);
}
this.deliveryStatus = nextDeliveryStatus;
}
}
package chill_logistics.delivery_server.domain.entity;
import ...
...(생략)
// length=15 옵션 제거
@Enumerated(EnumType.STRING)
@Column(name = "delivery_status", nullable = false)
private DeliveryStatus deliveryStatus;
...(생략)
... (기존 메서드 생략)
// 주문 취소 시 처리 메서드
public void cancelDueToOrder() {
// 멱등성: 이미 취소된 상태면 다시 호출되어도 OK || 이미 상품 받았으면 주문 취소 불가
if (this.deliveryStatus == DeliveryStatus.DELIVERY_CANCELLED
|| this.deliveryStatus == DeliveryStatus.DELIVERY_COMPLETED) {
return;
}
DeliveryStatus nextDeliveryStatus = DeliveryStatus.DELIVERY_CANCELLED;
if (!this.deliveryStatus.canTransitTo(nextDeliveryStatus)) {
throw new BusinessException(ErrorCode.DELIVERY_ALREADY_COMPLETED_OR_CANCELED);
}
this.deliveryStatus = nextDeliveryStatus;
}
}
package chill_logistics.delivery_server.application;
public enum OrderStatus {
CREATED,
PROCESSING,
IN_TRANSIT,
COMPLETED,
CANCELED
}
package chill_logistics.delivery_server.infrastructure.kafka.dto;
import chill_logistics.delivery_server.application.OrderStatus;
import java.time.LocalDateTime;
import java.util.UUID;
public record OrderStatusChangedV1(
UUID orderId,
OrderStatus orderStatus,
String reason,
LocalDateTime changedAt
) {}
package chill_logistics.delivery_server.infrastructure.config;
import chill_logistics.delivery_server.infrastructure.kafka.dto.HubRouteAfterCreateV1;
import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderStatusChangedV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@Configuration
public class KafkaConsumerConfig {
/* 공통 Consumer 설정 */
private Map<String, Object> baseConsumerProps(String groupId) {
Map<String, Object> props = new HashMap<>();
// Kafka Broker
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Consumer Group ID
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// Key 역직렬화
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Value 역직렬화
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// 처음 실행 시 offset 정책 (earliest: consumer group이 이 토픽의 이 파티션을 지금 들어오는 새 메시지부터 읽는다)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return props;
}
/* HubRouteAfterCreate */
@Bean
public ConsumerFactory<String, HubRouteAfterCreateV1> hubConsumerFactory() {
// JSON → HubRouteAfterCreateV1 역직렬화를 위한 Deserializer
JsonDeserializer<HubRouteAfterCreateV1> deserializer =
new JsonDeserializer<>(HubRouteAfterCreateV1.class, false);
// Kafka 메시지 역직렬화 시 허용할 패키지를 명시적으로 지정
deserializer.addTrustedPackages(
"chill_logistics.delivery_server.infrastructure.kafka.dto"
);
// 위 설정값 + Key/Value Deserializer를 기반으로 실제 Consumer 인스턴스를 만들어 KafkaListener에 제공
return new DefaultKafkaConsumerFactory<>(
baseConsumerProps("delivery-server-hub-route-group"),
new StringDeserializer(), // Key Deserializer (String)
deserializer // Value Deserializer (HubRouteAfterCreateV1)
);
}
/* HubRouteAfterCreate */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, HubRouteAfterCreateV1>
// @KafkaListener가 사용할 Listener 컨테이너 생성 (ConsumerFactory를 주입하여 실제 Kafka Consumer 동작을 구성)
hubKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, HubRouteAfterCreateV1> factory =
new ConcurrentKafkaListenerContainerFactory<>();
// ConsumerFactory 설정
factory.setConsumerFactory(hubConsumerFactory());
return factory;
}
/* OrderStatusChanged */
@Bean
public ConsumerFactory<String, OrderStatusChangedV1> orderStatusChangedConsumerFactory() {
JsonDeserializer<OrderStatusChangedV1> deserializer =
new JsonDeserializer<>(OrderStatusChangedV1.class, false);
deserializer.addTrustedPackages(
"chill_logistics.delivery_server.infrastructure.kafka.dto",
"chill_logistics.delivery_server.application" // OrderStatus ENUM 역직렬화용
);
return new DefaultKafkaConsumerFactory<>(
baseConsumerProps("delivery-server-order-status-group"),
new StringDeserializer(),
deserializer
);
}
/* OrderStatusChanged */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderStatusChangedV1>
orderStatusChangedKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderStatusChangedV1> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(orderStatusChangedConsumerFactory());
return factory;
}
}
package chill_logistics.delivery_server.infrastructure.kafka;
import chill_logistics.delivery_server.application.OrderStatus;
import chill_logistics.delivery_server.application.service.OrderCancellationService;
import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderStatusChangedV1;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderStatusChangedListener {
private final OrderCancellationService orderCancellationService;
@KafkaListener(
topics = "order-status-changed",
groupId = "delivery-server-order-status-group",
containerFactory = "orderStatusChangedKafkaListenerContainerFactory"
)
public void listen(OrderStatusChangedV1 message) {
log.info("Kafka 메시지 수신: {}", message);
if (message.orderStatus() == OrderStatus.CANCELED) {
orderCancellationService.cancelDeliveriesByOrder(
message.orderId()
);
}
}
}
package chill_logistics.delivery_server.application.service;
import chill_logistics.delivery_server.domain.entity.FirmDelivery;
import chill_logistics.delivery_server.domain.entity.HubDelivery;
import chill_logistics.delivery_server.domain.repository.FirmDeliveryRepository;
import chill_logistics.delivery_server.domain.repository.HubDeliveryRepository;
import java.util.List;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderCancellationService {
private final HubDeliveryRepository hubDeliveryRepository;
private final FirmDeliveryRepository firmDeliveryRepository;
/* [주문 취소/실패 시 배송 취소 메서드]
* orderId 기준 HubDelivery / FirmDelivery 취소 처리
*/
@Transactional
public void cancelDeliveriesByOrder(UUID orderId) {
List<HubDelivery> hubDeliveryList = hubDeliveryRepository.findByOrderId(orderId);
List<FirmDelivery> firmDeliveryList = firmDeliveryRepository.findByOrderId(orderId);
if (hubDeliveryList.isEmpty() && firmDeliveryList.isEmpty()) {
log.info("[주문 취소에 따른 배송 취소] 대상 배송이 없습니다. orderId={}", orderId);
return;
}
for (HubDelivery hubDelivery : hubDeliveryList) {
hubDelivery.cancelDueToOrder();
}
for (FirmDelivery firmDelivery : firmDeliveryList) {
firmDelivery.cancelDueToOrder();
}
log.info("[주문 취소에 따른 배송 취소] 처리 완료. orderId={}", orderId);
}
}