Kafka 이벤트 파이프라인 구축기: 현실적인 선택과 타협

묘니·2025년 9월 5일

신규 도메인 추가

1. EventLog (감사 로그)

@Entity
public class EventLog {
    @Id
    private Long id;
    private String eventId;      // UUID
    private String eventType;    // 클래스명
    private String aggregateId;  // 도메인 ID
    private String payload;      // JSON
    private LocalDateTime occurredOn;
    
    // Kafka 메타데이터
    private String topic;
    private Integer partition;
    private Long offset;
}

2. EventHandled (멱등성)

@Entity
@Table(uniqueConstraints = @UniqueConstraint(
    columnNames = {"event_id", "consumer_group"}
))
public class EventHandled {
    private String eventId;
    private String consumerGroup;
    private LocalDateTime handledAt;
}

3. ProductMetrics (집계)

@Entity
public class ProductMetrics {
    @EmbeddedId
    private ProductMetricsId id;  // (productId, metricDate)
    
    private Integer likeCount;
    private Integer likeChange;   // 일별 변화량
    private Integer viewCount;
    private Integer salesCount;
    private BigDecimal salesAmount;
}

UPSERT 전략:

INSERT INTO product_metrics (product_id, metric_date, like_count)
VALUES (?, ?, 1)
ON DUPLICATE KEY UPDATE 
    like_count = like_count + 1,
    like_change = like_change + 1;

실제 구현 코드

EventStreamListener: Spring → Kafka 브릿지

@Component
@RequiredArgsConstructor
public class EventStreamListener {
    
    private final MessageBroker messageBroker;
    private final List<EventRoutingStrategy> routingStrategies;
    
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onDomainEvent(DomainEvent event) {
        EventRoutingStrategy strategy = findRoutingStrategy(event);
        
        if (strategy == null) {
            log.warn("No routing strategy for: {}", event.getClass());
            return;
        }
        
        MessageRoute route = strategy.route(event);
        
        try {
            messageBroker.publish(route.getStream(), route.getKey(), event);
            log.debug("Published: stream={}, key={}", route.getStream(), route.getKey());
        } catch (Exception e) {
            // 실패 처리 로직
            handleFailure(event, e);
        }
    }
}

Consumer

AuditLogConsumer

@Component
public class AuditLogConsumer {
    @KafkaListener(
        topics = {"product-view-events", "product-state-events", "like-events", "order-events", "payment-events"},
        groupId = "audit-log"
    )
    @Transactional
    public void handleEvent(ConsumerRecord<String, DomainEvent> record, Acknowledgment ack) {
        // 멱등성 체크
        if (idempotencyService.isProcessed(event.getEventId(), CONSUMER_GROUP)) {
            ack.acknowledge();
            return;
        }
        
        // 감사 로그 저장
        eventLogService.saveEventLog(event, record.topic(), record.partition(), record.offset());
        
        // 처리 완료
        idempotencyService.markAsProcessed(event.getEventId(), CONSUMER_GROUP);
        ack.acknowledge();
    }
}

MetricsConsumer

@Component
public class MetricsConsumer {
    @KafkaListener(
        topics = {"product-view-events", "like-events", "order-events"},
        groupId = "metrics-aggregator"
    )
    @Transactional
    public void handleEvent(ConsumerRecord<String, DomainEvent> record, Acknowledgment ack) {
        // 타입별 분기
        if (event instanceof LikeAddedEvent) {
            productMetricsService.incrementLikeCount(productId, date, 1);
        } else if (event instanceof OrderCompletedEvent) {
            for (OrderItemInfo item : event.getOrderItems()) {
                productMetricsService.incrementSalesCount(
                    item.getProductId(), date, item.getQuantity(), item.getPrice()
                );
            }
        }
        // ...
    }
}

CacheInvalidatorConsumer

현재 상품 총 카운트에 캐시가 존재하므로, 캐시 만료를 적용하기 어려웠다.
status를 추가하여 관리할 경우에 캐시 만료 전략을 활용할 수 있을 것 같다.

0개의 댓글