@Entity
public class EventLog {
@Id
private Long id;
private String eventId; // UUID
private String eventType; // 클래스명
private String aggregateId; // 도메인 ID
private String payload; // JSON
private LocalDateTime occurredOn;
// Kafka 메타데이터
private String topic;
private Integer partition;
private Long offset;
}
@Entity
@Table(uniqueConstraints = @UniqueConstraint(
columnNames = {"event_id", "consumer_group"}
))
public class EventHandled {
private String eventId;
private String consumerGroup;
private LocalDateTime handledAt;
}
@Entity
public class ProductMetrics {
@EmbeddedId
private ProductMetricsId id; // (productId, metricDate)
private Integer likeCount;
private Integer likeChange; // 일별 변화량
private Integer viewCount;
private Integer salesCount;
private BigDecimal salesAmount;
}
UPSERT 전략:
INSERT INTO product_metrics (product_id, metric_date, like_count)
VALUES (?, ?, 1)
ON DUPLICATE KEY UPDATE
like_count = like_count + 1,
like_change = like_change + 1;
@Component
@RequiredArgsConstructor
public class EventStreamListener {
private final MessageBroker messageBroker;
private final List<EventRoutingStrategy> routingStrategies;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onDomainEvent(DomainEvent event) {
EventRoutingStrategy strategy = findRoutingStrategy(event);
if (strategy == null) {
log.warn("No routing strategy for: {}", event.getClass());
return;
}
MessageRoute route = strategy.route(event);
try {
messageBroker.publish(route.getStream(), route.getKey(), event);
log.debug("Published: stream={}, key={}", route.getStream(), route.getKey());
} catch (Exception e) {
// 실패 처리 로직
handleFailure(event, e);
}
}
}
@Component
public class AuditLogConsumer {
@KafkaListener(
topics = {"product-view-events", "product-state-events", "like-events", "order-events", "payment-events"},
groupId = "audit-log"
)
@Transactional
public void handleEvent(ConsumerRecord<String, DomainEvent> record, Acknowledgment ack) {
// 멱등성 체크
if (idempotencyService.isProcessed(event.getEventId(), CONSUMER_GROUP)) {
ack.acknowledge();
return;
}
// 감사 로그 저장
eventLogService.saveEventLog(event, record.topic(), record.partition(), record.offset());
// 처리 완료
idempotencyService.markAsProcessed(event.getEventId(), CONSUMER_GROUP);
ack.acknowledge();
}
}
@Component
public class MetricsConsumer {
@KafkaListener(
topics = {"product-view-events", "like-events", "order-events"},
groupId = "metrics-aggregator"
)
@Transactional
public void handleEvent(ConsumerRecord<String, DomainEvent> record, Acknowledgment ack) {
// 타입별 분기
if (event instanceof LikeAddedEvent) {
productMetricsService.incrementLikeCount(productId, date, 1);
} else if (event instanceof OrderCompletedEvent) {
for (OrderItemInfo item : event.getOrderItems()) {
productMetricsService.incrementSalesCount(
item.getProductId(), date, item.getQuantity(), item.getPrice()
);
}
}
// ...
}
}
현재 상품 총 카운트에 캐시가 존재하므로, 캐시 만료를 적용하기 어려웠다.
status를 추가하여 관리할 경우에 캐시 만료 전략을 활용할 수 있을 것 같다.