🔹 Outbox 패턴 적용 리팩터링
🔸 OrderOutboxEvent
package chill_logistics.order_server.domain.entity;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.EntityListeners;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.UUID;
import lib.entity.BaseEntity;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;
/**
 * Outbox record for a single order event that must be delivered to the message broker.
 *
 * <p>A new record starts in {@code PENDING} with a retry count of zero. It becomes
 * {@code PUBLISHED} through {@link #markPublished()}, or {@code FAILED} once
 * {@link #markFailed()} has been called {@code MAX_RETRY_COUNT} times.
 */
@Getter
@Entity
@Table(name = "p_order_outbox_event")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@EntityListeners(AuditingEntityListener.class)
public class OrderOutboxEvent extends BaseEntity {

    /** Number of failed attempts after which the event is moved to {@code FAILED}. */
    private static final int MAX_RETRY_COUNT = 3;

    /**
     * Despite the "MAX" in its name, {@link #readyToRetry()} uses this as the minimum number of
     * milliseconds that must have elapsed since the last failed attempt.
     */
    private static final long MAX_BACKOFF_MILLIS = 100L;

    @Id
    @GeneratedValue(generator = "uuidv7")
    @GenericGenerator(
        name = "uuidv7",
        strategy = "lib.id.UUIDv7Generator"
    )
    @Column(name = "id", columnDefinition = "BINARY(16)")
    private UUID id;

    /** Identifier of the order this event belongs to. */
    @Column(name = "order_id", nullable = false, columnDefinition = "BINARY(16)")
    private UUID orderId;

    /** Logical event name stored with the record. */
    @Column(name = "event_type", nullable = false, length = 100)
    private String eventType;

    /** Event body as text. */
    @Column(name = "payload", nullable = false, columnDefinition = "TEXT")
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(name = "order_outbox_status", nullable = false, length = 30)
    private OrderOutboxStatus orderOutboxStatus;

    /** Number of failed attempts recorded so far. */
    @Column(name = "retry_count", nullable = false)
    private int retryCount;

    /** Set by {@link #markPublished()}; {@code null} until then. */
    @Column(name = "published_at")
    private LocalDateTime publishedAt;

    /** Set by {@link #markFailed()}; {@code null} while no attempt has failed. */
    @Column(name = "last_retry_at")
    private LocalDateTime lastRetryAt;

    private OrderOutboxEvent(UUID orderId, String eventType, String payload) {
        // Lifecycle defaults first, then the caller-supplied data.
        this.orderOutboxStatus = OrderOutboxStatus.PENDING;
        this.retryCount = 0;
        this.orderId = orderId;
        this.eventType = eventType;
        this.payload = payload;
    }

    /**
     * Creates a new {@code PENDING} outbox event with zero retries.
     *
     * @param orderId   id of the related order
     * @param eventType logical event name
     * @param payload   event body
     * @return the new, not yet persisted event
     */
    public static OrderOutboxEvent create(UUID orderId, String eventType, String payload) {
        return new OrderOutboxEvent(orderId, eventType, payload);
    }

    /**
     * @return {@code true} when the event is in a terminal state ({@code PUBLISHED} or
     *         {@code FAILED})
     */
    public boolean alreadyHandled() {
        final OrderOutboxStatus current = this.orderOutboxStatus;
        return current == OrderOutboxStatus.PUBLISHED || current == OrderOutboxStatus.FAILED;
    }

    /**
     * @return {@code true} when the event is still {@code PENDING} and has failed fewer than
     *         {@code MAX_RETRY_COUNT} times
     */
    public boolean canRetry() {
        if (this.orderOutboxStatus != OrderOutboxStatus.PENDING) {
            return false;
        }
        return this.retryCount < MAX_RETRY_COUNT;
    }

    /**
     * @return {@code true} when {@link #canRetry()} holds and either no attempt has failed yet
     *         or at least {@code MAX_BACKOFF_MILLIS} ms have passed since the last failure
     */
    public boolean readyToRetry() {
        return canRetry()
            && (this.lastRetryAt == null || millisSinceLastRetry() >= MAX_BACKOFF_MILLIS);
    }

    /** Milliseconds between {@code lastRetryAt} and now; callers must ensure it is non-null. */
    private long millisSinceLastRetry() {
        return Duration.between(this.lastRetryAt, LocalDateTime.now()).toMillis();
    }

    /** Moves the event to {@code PUBLISHED} and stamps the publication time. */
    public void markPublished() {
        this.publishedAt = LocalDateTime.now();
        this.orderOutboxStatus = OrderOutboxStatus.PUBLISHED;
    }

    /**
     * Records one failed attempt: bumps the retry count, stamps the attempt time, and moves the
     * event to {@code FAILED} once the count reaches {@code MAX_RETRY_COUNT}. Below that limit
     * the status is left untouched.
     */
    public void markFailed() {
        final int attempts = this.retryCount + 1;
        this.retryCount = attempts;
        this.lastRetryAt = LocalDateTime.now();
        if (attempts >= MAX_RETRY_COUNT) {
            this.orderOutboxStatus = OrderOutboxStatus.FAILED;
        }
    }
}
🔸 KafkaProducerConfig
package chill_logistics.order_server.infrastructure.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
 * Kafka producer wiring for the outbox publisher: a String/String producer factory configured
 * for idempotent delivery, and a {@link KafkaTemplate} built on top of it.
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Broker address list for the outbox producer.
     *
     * <p>Read from {@code spring.kafka.bootstrap-servers} so each environment can point at its
     * own cluster; falls back to the previously hard-coded {@code localhost:9092} when the
     * property is not set. The value is passed to Kafka as-is, so multiple brokers must be
     * given as a single comma-separated string.
     */
    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String bootstrapServers;

    /**
     * Builds the producer factory used for outbox events.
     *
     * @return a factory producing String-keyed, String-valued idempotent producers
     */
    @Bean
    public ProducerFactory<String, String> outboxProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Idempotent producer settings: Kafka requires acks=all and at most 5 in-flight
        // requests per connection when enable.idempotence is true.
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        return new DefaultKafkaProducerFactory<>(config);
    }

    /**
     * @return a {@link KafkaTemplate} backed by {@link #outboxProducerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> outboxKafkaTemplate() {
        return new KafkaTemplate<>(outboxProducerFactory());
    }
}
🔸 OrderOutboxKafkaProducer
package chill_logistics.order_server.infrastructure.kafka.outbox;
import chill_logistics.order_server.domain.event.OutboxEventProducer;
import chill_logistics.order_server.infrastructure.kafka.KafkaPassportProducerSupport;
import chill_logistics.order_server.lib.error.ErrorCode;
import java.util.concurrent.CompletableFuture;
import lib.passport.PassportIssuer;
import lib.web.error.BusinessException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
/**
 * Kafka-backed {@link OutboxEventProducer}: maps an event type to its configured topic, attaches
 * passport headers, and hands the record to the {@link KafkaTemplate}.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderOutboxKafkaProducer implements OutboxEventProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final PassportIssuer passportIssuer;

    /** Topic for {@code "OrderAfterCreateV1"} events. */
    @Value("${app.kafka.topic.order-after-create}")
    private String orderAfterCreateTopic;

    /** Topic for {@code "OrderCanceledV1"} events. */
    @Value("${app.kafka.topic.order-canceled}")
    private String orderCanceledTopic;

    /**
     * Sends one event to the topic associated with {@code eventType}.
     *
     * @param eventType logical event name; must be one of the names known to this producer
     * @param key       record key
     * @param payload   record value
     * @return the future returned by {@link KafkaTemplate#send}
     * @throws BusinessException with {@code UNKNOWN_EVENT_TYPE} when no topic is mapped to
     *                           {@code eventType}; thrown before anything is sent
     */
    @Override
    public CompletableFuture<SendResult<String, String>> publish(String eventType, String key, String payload) {
        // Topic resolution happens first, so an unknown type fails before a record is built.
        ProducerRecord<String, String> outgoing =
            new ProducerRecord<>(resolveTopic(eventType), key, payload);
        KafkaPassportProducerSupport.writeHeaders(outgoing.headers(), passportIssuer);
        return kafkaTemplate.send(outgoing);
    }

    /** Looks up the configured topic name for an event type. */
    private String resolveTopic(String eventType) {
        switch (eventType) {
            case "OrderAfterCreateV1":
                return orderAfterCreateTopic;
            case "OrderCanceledV1":
                return orderCanceledTopic;
            default:
                throw new BusinessException(ErrorCode.UNKNOWN_EVENT_TYPE);
        }
    }
}
🔸 OrderOutboxProcessor
package chill_logistics.order_server.infrastructure.kafka.outbox;
import chill_logistics.order_server.domain.entity.OrderOutboxEvent;
import chill_logistics.order_server.domain.entity.OrderOutboxStatus;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.PageRequest;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderOutboxProcessor {
private static final int BATCH_SIZE = 200;
private static final long FIXED_DELAY_MS = 500;
private final OrderOutboxEventRepository outboxEventRepository;
private final OutboxEventTransactionManager transactionManager;
@Scheduled(fixedDelay = FIXED_DELAY_MS)
public void publishPendingEvents() {
List<OrderOutboxEvent> eventList = outboxEventRepository.findPendingEvents(
OrderOutboxStatus.PENDING,
PageRequest.of(0, BATCH_SIZE)
);
if (eventList.isEmpty()) {
return;
}
int processed = 0;
int skipped = 0;
for (OrderOutboxEvent event : eventList) {
if (event.alreadyHandled() || !event.readyToRetry()) {
skipped++;
continue;
}
transactionManager.publishEvent(event.getId());
processed++;
}
log.info("[OUTBOX 처리 완료] 조회건수={} 처리건수={} 스킵건수={}",
eventList.size(), processed, skipped);
}
}
🔸 OutboxEventTransactionManager
package chill_logistics.order_server.infrastructure.kafka.outbox;
import chill_logistics.order_server.domain.entity.OrderOutboxEvent;
import chill_logistics.order_server.domain.event.OutboxEventProducer;
import chill_logistics.order_server.lib.error.ErrorCode;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lib.web.error.BusinessException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
/**
 * Publishes a single outbox event inside its own transaction and records the outcome on the
 * event (published, or one more failed attempt).
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OutboxEventTransactionManager {

    /** Maximum time in milliseconds to wait for the producer's send future to complete. */
    private static final long SEND_CONFIRM_TIMEOUT_MS = 3000;

    private final OrderOutboxEventRepository outboxEventRepository;
    private final OutboxEventProducer outboxEventProducer;

    /**
     * Loads the event, publishes it, and waits up to {@code SEND_CONFIRM_TIMEOUT_MS} for the
     * send to be confirmed.
     *
     * <p>On confirmation the event is marked published. On any failure (including a timeout or
     * an interrupt) it is marked failed and the exception is logged rather than rethrown. An
     * event that is already handled or not ready to retry is left untouched.
     *
     * <p>NOTE(review): there is no explicit save call here; the state change is presumably
     * persisted by JPA dirty checking when this transaction commits — confirm.
     *
     * @param outboxId id of the outbox event to publish
     * @throws BusinessException with {@code ORDER_OUTBOX_EVENT_NOT_FOUND} when no event has
     *                           that id
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void publishEvent(UUID outboxId) {
        OrderOutboxEvent event = outboxEventRepository.findById(outboxId)
            .orElseThrow(() -> new BusinessException(ErrorCode.ORDER_OUTBOX_EVENT_NOT_FOUND));

        // Re-check on the freshly loaded row: its state may differ from what the caller saw.
        if (event.alreadyHandled() || !event.readyToRetry()) {
            return;
        }

        String key = event.getOrderId().toString();
        try {
            outboxEventProducer
                .publish(event.getEventType(), key, event.getPayload())
                .get(SEND_CONFIRM_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            event.markPublished();
            log.info("[OUTBOX 이벤트 발행 성공] outboxId={} eventType={} orderId={}",
                event.getId(), event.getEventType(), event.getOrderId());
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so the owning thread can still observe the interrupt.
            Thread.currentThread().interrupt();
            recordFailure(event, ex);
        } catch (Exception ex) {
            recordFailure(event, ex);
        }
    }

    /** Marks the event as failed once and logs the cause with the updated retry count. */
    private void recordFailure(OrderOutboxEvent event, Exception ex) {
        event.markFailed();
        log.warn("[OUTBOX 이벤트 발행 실패] outboxId={} 재시도 횟수={}",
            event.getId(), event.getRetryCount(), ex);
    }
}
🔸 OrderCommandService
package chill_logistics.order_server.application.service;
import chill_logistics.order_server.application.dto.command.CreateOrderCommandV1;
import chill_logistics.order_server.application.dto.command.CreateOrderResultV1;
import chill_logistics.order_server.application.dto.command.FirmInfoV1;
import chill_logistics.order_server.application.dto.command.FirmResultV1;
import chill_logistics.order_server.application.dto.command.OrderAfterCreateV1;
import chill_logistics.order_server.application.dto.command.OrderCanceledV1;
import chill_logistics.order_server.application.dto.command.OrderProductInfoV1;
import chill_logistics.order_server.application.dto.command.ProductResultV1;
import chill_logistics.order_server.application.dto.command.ReceiverInfoV1;
import chill_logistics.order_server.application.dto.command.SupplierInfoV1;
import chill_logistics.order_server.application.dto.command.UpdateOrderStatusCommandV1;
import chill_logistics.order_server.domain.entity.Order;
import chill_logistics.order_server.domain.entity.OrderOutboxEvent;
import chill_logistics.order_server.domain.entity.OrderProduct;
import chill_logistics.order_server.domain.entity.OrderQuery;
import chill_logistics.order_server.domain.entity.OrderStatus;
import chill_logistics.order_server.domain.port.FirmPort;
import chill_logistics.order_server.domain.port.HubPort;
import chill_logistics.order_server.domain.port.ProductPort;
import chill_logistics.order_server.domain.repository.OrderQueryRepository;
import chill_logistics.order_server.domain.repository.OrderRepository;
import chill_logistics.order_server.lib.error.ErrorCode;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import lib.entity.Role;
import lib.util.SecurityUtils;
import lib.web.error.BusinessException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Command-side order use cases: create an order, change its status, and cancel/delete it.
 * Events for downstream consumers are not sent directly; they are stored as outbox rows in the
 * same transaction as the order change.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderCommandService {

    private static final String EVENT_ORDER_AFTER_CREATE = "OrderAfterCreateV1";
    private static final String EVENT_ORDER_CANCELED = "OrderCanceledV1";

    private final OrderRepository orderRepository;
    private final OrderQueryRepository orderQueryRepository;
    private final ProductPort productPort;
    private final HubPort hubPort;
    private final FirmPort firmPort;
    // NOTE(review): no import for OrderOutboxEventRepository is visible in this section —
    // confirm which package it lives in.
    private final OrderOutboxEventRepository outboxEventRepository;
    private final ObjectMapper objectMapper;

    /** Loads an order by id or fails with {@code ORDER_NOT_FOUND}. */
    private Order readOrderOrThrow(UUID orderId) {
        return orderRepository.findById(orderId)
            .orElseThrow(() -> new BusinessException(ErrorCode.ORDER_NOT_FOUND));
    }

    /**
     * Serializes {@code message} to JSON and stores it as a new outbox event for the order.
     *
     * @throws BusinessException with {@code OUTBOX_PAYLOAD_SERIALIZATION_FAILED} when the
     *                           message cannot be serialized
     */
    private void saveOutboxEvent(UUID orderId, String eventType, Object message) {
        final String payload;
        try {
            payload = objectMapper.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            log.error("[OUTBOX payload 직렬화 실패] orderId={} eventType={}", orderId, eventType, e);
            throw new BusinessException(ErrorCode.OUTBOX_PAYLOAD_SERIALIZATION_FAILED);
        }

        OrderOutboxEvent outboxEvent = OrderOutboxEvent.create(orderId, eventType, payload);
        outboxEventRepository.save(outboxEvent);
        log.info("[OUTBOX 이벤트 적재 완료] orderId={} eventType={} outboxId={}",
            orderId, eventType, outboxEvent.getId());
    }

    /**
     * When the current user is a hub manager, verifies that the order's receiver firm belongs
     * to one of the hubs that user manages. Does nothing for other roles.
     *
     * @throws BusinessException with {@code ORDER_NOT_IN_MANAGING_HUB} when the check fails
     */
    private void validateHubManagerAccess(Order order) {
        if (!SecurityUtils.hasRole(Role.HUB_MANAGER)) {
            return;
        }
        List<UUID> managingHubId = hubPort.readHubId(SecurityUtils.getCurrentUserId());
        UUID receiverHubId = firmPort.readHubId(order.getReceiverFirmId());
        if (!managingHubId.contains(receiverHubId)) {
            throw new BusinessException(ErrorCode.ORDER_NOT_IN_MANAGING_HUB);
        }
    }

    /**
     * When the current user is a firm manager, verifies that the user created the order.
     * Does nothing for other roles.
     *
     * @throws BusinessException with {@code ORDER_NOT_CREATED_BY_USER} when the check fails
     */
    private void validateFirmManagerOwnership(Order order) {
        if (!SecurityUtils.hasRole(Role.FIRM_MANAGER)) {
            return;
        }
        UUID currentUserId = SecurityUtils.getCurrentUserId();
        if (!currentUserId.equals(order.getCreatedBy())) {
            throw new BusinessException(ErrorCode.ORDER_NOT_CREATED_BY_USER);
        }
    }

    /**
     * Creates an order: looks up both firms, validates and reserves stock for every requested
     * product, saves the order and its query model, and stores an {@code OrderAfterCreateV1}
     * outbox event.
     *
     * @param command the order request
     * @return the created order together with supplier and receiver firm information
     * @throws BusinessException with {@code PRODUCT_NOT_FROM_FIRM} when a product does not
     *                           belong to the supplier firm, or {@code OUT_OF_STOCK} when the
     *                           requested quantity exceeds the reported stock
     */
    @Transactional
    public CreateOrderResultV1 createOrder(CreateOrderCommandV1 command) {
        FirmResultV1 supplierResult = firmPort.readFirmById(command.supplierFirmId(), "SUPPLIER");
        FirmResultV1 receiverResult = firmPort.readFirmById(command.receiverFirmId(), "RECEIVER");

        // NOTE(review): stock is decreased per product as the list is walked. If a later
        // product fails validation, no compensating recoverStock call is visible here —
        // confirm how ProductPort handles a rollback of this transaction.
        List<OrderProductInfoV1> orderProductInfoList =
            command.productList()
                .stream()
                .map(p -> {
                    ProductResultV1 product = productPort.readProductById(p.productId());
                    if (!product.firmId().equals(command.supplierFirmId())) {
                        throw new BusinessException(ErrorCode.PRODUCT_NOT_FROM_FIRM);
                    }
                    if (product.stockQuantity() < p.quantity()) {
                        throw new BusinessException(ErrorCode.OUT_OF_STOCK);
                    }
                    productPort.decreaseStock(p.productId(), p.quantity());
                    return new OrderProductInfoV1(
                        p.productId(),
                        product.name(),
                        product.price(),
                        p.quantity()
                    );
                })
                .toList();

        Order order = Order.create(
            command.supplierFirmId(),
            command.receiverFirmId(),
            command.requestNote(),
            orderProductInfoList
        );
        Order createOrder = orderRepository.save(order);

        OrderQuery orderQuery = OrderQuery.create(
            createOrder,
            SupplierInfoV1.from(supplierResult),
            ReceiverInfoV1.from(receiverResult)
        );
        orderQueryRepository.save(orderQuery);

        // Only the first order product is carried in the event.
        // NOTE(review): get(0) throws on an empty product list — confirm that Order.create or
        // upstream validation rejects orders without products.
        OrderProduct firstProduct = createOrder.getOrderProductList().get(0);
        OrderAfterCreateV1 message = new OrderAfterCreateV1(
            createOrder.getId(),
            supplierResult.hubId(),
            receiverResult.hubId(),
            createOrder.getReceiverFirmId(),
            receiverResult.firmFullAddress(),
            receiverResult.firmOwnerName(),
            createOrder.getRequestNote(),
            firstProduct.getProductName(),
            firstProduct.getQuantity(),
            createOrder.getCreatedAt()
        );
        saveOutboxEvent(createOrder.getId(), EVENT_ORDER_AFTER_CREATE, message);

        return CreateOrderResultV1.from(
            createOrder,
            FirmInfoV1.from(supplierResult),
            FirmInfoV1.from(receiverResult)
        );
    }

    /**
     * Changes the status of an order after the hub-manager access check.
     *
     * @param id      id of the order to update
     * @param command carries the new status
     * @throws BusinessException with {@code ORDER_NOT_FOUND} or
     *                           {@code ORDER_NOT_IN_MANAGING_HUB}
     */
    @Transactional
    public void updateOrderStatus(UUID id, UpdateOrderStatusCommandV1 command) {
        Order order = readOrderOrThrow(id);
        validateHubManagerAccess(order);
        order.updateStatus(command.status());
    }

    /**
     * Cancels and deletes an order: checks hub-manager and firm-manager permissions, sets the
     * status to {@code CANCELED}, marks the order deleted by the current user, returns the
     * stock of every order product, and stores an {@code OrderCanceledV1} outbox event.
     *
     * @param id id of the order to delete
     * @throws BusinessException with {@code ORDER_NOT_FOUND},
     *                           {@code ORDER_NOT_IN_MANAGING_HUB} or
     *                           {@code ORDER_NOT_CREATED_BY_USER}
     */
    @Transactional
    public void deleteOrder(UUID id) {
        Order order = readOrderOrThrow(id);
        validateHubManagerAccess(order);
        validateFirmManagerOwnership(order);

        order.updateStatus(OrderStatus.CANCELED);
        order.delete(SecurityUtils.getCurrentUserId());

        for (OrderProduct p : order.getOrderProductList()) {
            productPort.recoverStock(p.getProductId(), p.getQuantity());
        }

        OrderCanceledV1 message = new OrderCanceledV1(
            order.getId(),
            order.getOrderStatus(),
            LocalDateTime.now()
        );
        saveOutboxEvent(order.getId(), EVENT_ORDER_CANCELED, message);
    }
}