마이크로서비스 아키텍처에서 Kafka를 사용하다 보면 한 번쯤은 겪게 되는 메시지 누락 문제..
나도 역시 같은 문제를 겪었다.
이에 여러 방법을 시도하다가 결국 코드를 하나씩 뜯어보았고
그러면서 '오프셋 커밋 시점'으로 인한 데이터 누락임을 깨달았다.

= Kafka에서 "어디까지 메시지를 읽었는지"를 표시하는 번호다.
그리고 오프셋 커밋은 Consumer가 "여기까지 메시지를 처리했다"라고 Kafka에 알리는 것
비즈니스 보안 상 내가 어떤 서비스를 개발하다가 이런 이슈를 마주했는지 적을 수는 없지만 예시를 들어서 설명해보겠다.
가령, 온라인 쇼핑몰 주문 처리 시스템이라고 하면
다음과 같은 마이크로서비스 구조가 나온다.
((주문 서비스 → 🔄 Kafka → 재고 서비스))
주문이 생성되면 재고를 차감하는 시스템이라고 보면 된다.
이 사이 데이터를 주고 받는데 누락이 되는 상황이었다.
그런데 확인을 해보니 카프카는 애플리케이션 계층으로 데이터를 전달하면 자동으로 해당 시점에 커밋을 한다는 사실을 알게 되었다.
@KafkaListener(topics = "order-events", groupId = "inventory-service")
public void handleOrderEvent(@Payload String orderMessage) {
// Kafka가 오프셋을 커밋하는 시점
원하는 시점에 커밋을 해주도록 명시하는 것이다.
먼저 config 파일에 자동이 아니라 수동 커밋을 하도록 활성화를 해주자.
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> manualCommitContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//수동 커밋 활성화
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}
@KafkaListener(
topics = "order-events",
groupId = "inventory-service",
containerFactory = "manualCommitContainerFactory"
)
public void handleOrderEvent(@Payload String orderMessage, Acknowledgment ack) {
try {
OrderEvent order = parseOrder(orderMessage);
inventoryService.decreaseStock(order.getProductId(), order.getQuantity());
//모든 처리가 성공했을 때만 커밋
ack.acknowledge();
log.info("재고 차감 및 커밋 완료: {}", order.getProductId());
} catch (Exception e) {
//실패 시 메시지 재처리
log.error("재고 차감 실패, 메시지 재처리 예정: {}", e.getMessage());
}
}
yml파일에도 설정을 넣어주자.
spring:
kafka:
consumer:
enable-auto-commit: false # 자동 커밋 비활성화
@Service
public class InventoryService {
@Transactional
public void decreaseStock(String orderId, String productId, int quantity) {
if (processedOrderRepository.existsByOrderId(orderId)) {
log.info("이미 처리된 주문: {}", orderId);
return;
}
Product product = productRepository.findById(productId);
product.decreaseStock(quantity);
processedOrderRepository.save(new ProcessedOrder(orderId));
}
}
public void handleOrderEvent(@Payload String orderMessage, Acknowledgment ack) {
try {
inventoryService.decreaseStock(order);
ack.acknowledge();
} catch (BusinessException e) {
// 비즈니스 로직 오류는 DLQ로 전송 후 커밋
log.error("비즈니스 오류, DLQ로 전송: {}", e.getMessage());
deadLetterQueueService.send(orderMessage);
ack.acknowledge();
} catch (Exception e) {
// 일시적 오류 → 재시도
log.error("일시적 오류, 재시도 예정: {}", e.getMessage());
}
}
Kafka의 오프셋 커밋은 메시지 처리의 신뢰성을 결정하는 중요한 부분인데, 본의 아니게 이슈를 마주치게 되어서 그 덕에 어떤 구조에서 통신이 이루어지며, 데이터 누락이 발생하는지 트래킹하는 과정을 겪을 수 있어 꽤나 의미있는 시간이었다.
도저히 파악하기 어려웠던 이유를 찾고 문제를 해결한 덕에 시야가 좀 더 넓어진 느낌이다.
참고자료
https://kafka.apache.org/documentation/#consumerconfigs
https://docs.spring.io/spring-kafka/docs/current/reference/html/