Kafka Consumer에서 중복 메시지 처리: Exactly-Once 전략

김기현·2024년 6월 1일
2

TL;DR

1. 금융 서비스와 같이 정확성이 중요한 시스템은 중복처리가 되면 안된다.(ex. 입출금)
2. 해결방법 1. Kafka 키를 이용한 중복 제거 키로서의 Upsert 적용
3. 해결방법 2. 유니크 키 제약 조건을 활용한 중복 소비 문제 해결
4. 그외 방법 : 캐시를 이용한 중복 제거 / Bloom filter 활용한 중복제거
5. 실무 경험 : 별도 인프라 x, 네트워크 부하 x 로 사용할 수 있는, DB 기능을 활용 (upsert, unique key 제약)

서론

Kafka를 사용하여 데이터를 소비할 때 중복된 메시지를 처리하는 것은 매우 중요한 문제입니다. 특히 금융 거래와 같이 정확성이 중요한 시스템에서는 메시지의 중복 처리가 큰 문제를 초래합니다. 이번 블로그에서는 Kafka Consumer에서 중복 제거를 위한 전략, 특히 Exactly-Once 처리에 대해 다루겠습니다.

Exactly-Once 처리란?
Exactly-Once 처리는 메시지가 소비자에게 단 한 번만 전달되고 처리되는 것을 보장하는 방식입니다. Kafka는 기본적으로 At-Least-Once(최소 한 번) 전달을 보장하며, 이 경우 중복 메시지가 발생할 수 있습니다. 이를 방지하기 위해 Exactly-Once 처리를 구현하면 중복된 메시지를 제거할 수 있습니다.

해결방법 1: Kafka 키를 이용한 중복 제거 키로서의 Upsert 적용

Upsert(업서트)란 존재하면 업데이트하고 존재하지 않으면 삽입하는 데이터베이스 작업입니다.
Kafka 메시지의 키를 중복 제거 키로 활용하면, 데이터베이스에 데이터를 삽입할 때 중복된 메시지를 효과적으로 처리할 수 있습니다.

구현 방법

Kafka 메시지의 키 설정:

  • Kafka Producer에서 메시지를 전송할 때 고유한 키를 설정합니다.
    ex), 주문 시스템의 경우 주문 ID를 키로 설정할 수 있습니다.

Kafka Consumer에서 메시지 처리:

  • Consumer는 메시지를 수신하고, 메시지의 키를 기반으로 데이터베이스에 Upsert 작업을 수행합니다.
  • 데이터베이스에 해당 키를 가진 레코드가 이미 존재하면 업데이트하고, 존재하지 않으면 새로운 레코드를 삽입합니다.
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
@Repository
public interface OrderRepository extends JpaRepository<Order, Long> {
@Transactional
 @Query(value = "INSERT INTO orders (order_id, product_id, quantity) VALUES (:orderId, :productId, :quantity) " +
 "ON CONFLICT (order_id) DO UPDATE SET product_id = :product_id, quantity = :quantity", nativeQuery = true)
 void upsertOrder(@Param("orderId") String orderId, 
 @Param("productId") String productId, 
 @Param("quantity") int quantity);
}

장점

  • 간단함: 구현이 비교적 간단하고, 데이터베이스에서 기본 제공하는 기능을 사용할 수 있습니다.
  • 효율성: 데이터베이스 수준에서 중복 제거가 이루어지므로 효율적입니다.

단점

  • 데이터베이스 의존성: 데이터베이스 성능에 크게 의존하며, 데이터베이스가 커질수록 성능 저하가 발생할 수 있습니다.
  • 트랜잭션 처리: 트랜잭션 처리와 관련된 오버헤드가 발생할 수 있습니다.

해결방법 2: 유니크 키 제약 조건을 활용한 중복 소비 문제 해결

데이터베이스에 유니크 키 제약 조건을 설정하여 중복 메시지를 방지할 수 있습니다. 만약 중복된 키로 메시지를 삽입하려고 하면 데이터베이스에서 예외가 발생합니다. 이를 적절히 처리하면 중복 메시지 문제를 해결할 수 있습니다.

구현 방법

유니크 키 제약 조건 설정:
— 데이터베이스 테이블에 고유 키 제약 조건을 설정합니다. 예를 들어, orders 테이블의 order_id 칼럼에 유니크 제약 조건을 추가합니다.

ALTER TABLE orders ADD CONSTRAINT unique_order_id UNIQUE (order_id);
  1. 중복 소비 처리 로직:
  • Kafka Consumer에서 메시지를 처리할 때, 데이터베이스에 삽입 시 중복 키 예외(DataIntegrityViolationException)가 발생하면 이를 catch하여 중복 메시지로 간주하고 적절히 처리합니다.
import org.springframework.dao.DataIntegrityViolationException;
public void consumeMessage(Order order) {
 try {
 // 데이터베이스에 Upsert 작업 수행
 orderRepository.save(order);
 } catch (DataIntegrityViolationException e) {
 // +SUDO : 대충 데이터 중복 저장으로 인한 예외만 catch하는 로직
 System.out.println("중복된 메시지 처리: " + order.getOrderId());
 // 필요에 따라 로깅 또는 다른 처리 작업 수행
 }
}

장점

  • 데이터 무결성: 유니크 제약 조건을 통해 데이터 무결성을 강제로 유지할 수 있습니다.
  • 자동 처리: 데이터베이스에서 자동으로 중복 메시지를 감지하여 처리할 수 있습니다.

단점

  • 오버헤드: 유니크 제약 조건을 설정하면 삽입/업데이트 작업 시 오버헤드가 발생할 수 있습니다.
  • 코드 복잡도 증가: 예외 처리를 위한 추가 로직이 필요합니다.

추가적으로 고려할 수 있는 해결 방법

Cache

Kafka Consumer 애플리케이션 내에 중복 메시지를 감지하기 위한 캐시를 사용할 수 있습니다. 메시지의 키를 캐시에 저장하고, 이미 처리된 키는 무시하는 방식입니다.
Stateless한 서버 구현을 위해 Redis를 이용할 수도 있습니다.

구현 방법

캐시 설정:

  • 메시지 키를 저장할 캐시를 설정합니다.
    예를 들어, LRU(Least Recently Used) 캐시를 사용할 수 있습니다.

캐시를 이용한 중복 제거:

  • 메시지 수신 시 캐시에서 키를 확인하고, 키가 존재하면 중복 메시지로 간주하고 무시합니다.
    키가 존재하지 않으면 처리 후 캐시에 저장합니다.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class MessageProcessor {
 private static final int CACHE_SIZE = 10000;
 private final ConcurrentMap<String, Boolean> messageCache = new ConcurrentHashMap<>();
public void consumeMessage(Order order) {
 if (messageCache.putIfAbsent(order.getOrderId(), Boolean.TRUE) == null) {
 // 메시지 처리 로직
   processOrder(order);
 } else {
     // 중복 메시지 처리 로직
     System.out.println("중복된 메시지 처리: " + order.getOrderId());
   }
 }
private void processOrder(Order order) {
 // 주문 처리 로직
 orderRepository.save(order);
 }
}

장점

  • 속도: 캐시를 사용하여 중복을 감지하므로 빠릅니다.
  • 애플리케이션 레벨 처리: 데이터베이스에 접근하기 전에 중복 메시지를 걸러낼 수 있습니다.

단점

  • 메모리 사용량: 캐시 크기에 따라 메모리 사용량이 증가할 수 있습니다.
  • 데이터 손실 가능성: 캐시가 만료되면 중복 메시지를 다시 처리할 가능성이 있습니다.

Bloom Filter 이용하기 w/ Redis

Bloom filter는 키 값을 해시해서 저장하는 방식으로 캐싱을 하는 방식입니다. 이 방식은 메모리 효율적인 방식이며, 대규모 트래픽에도 효율적입니다.

장점

  • 메모리 효율성: 적은 메모리로 대량의 키를 관리할 수 있습니다.
  • 속도: 매우 빠르게 중복을 감지할 수 있습니다.

단점

False Positive: Bloom Filter는 거짓 양성(False Positive)을 허용합니다. 즉, 실제로는 존재하지 않는 키를 존재한다고 잘못 판단할 수 있습니다.
별도의 리소스 필요: 추가 인프라가 필요합니다.

개인적인 견해 — 실무에서 사용한 방법

상황에 따라 다르지만, 개인적으로는 캐시 또는 Bloom Filter를 사용하는 방법은 별도의 리소스를 필요로 하여 관리 포인트가 증가합니다.

또한 네트워크 통신을 요구하므로 UpsertUnique Key 방식이 효율적이라고 생각합니다.

이러한 방식들은 데이터베이스 자체의 기능을 활용하기 때문에 하기 장점이 있습니다.

  • 별도의 리소스 관리가 필요 없고,
  • 네트워크 오버헤드를 줄일 수 있습니다.
  • 구현이 간단

따라서 실무에서도 상황에 맞게 해결 방법 1, 2를 사용했습니다.

결론

Kafka Consumer에서 중복 메시지를 제거하기 위해 다양한 전략을 사용할 수 있습니다. Kafka 메시지의 키를 중복 제거 키로 활용하여 Upsert 작업을 수행하고, 데이터베이스의 유니크 키 제약 조건을 활용하며, 캐시를 사용하는 방법 등을 통해 중복 메시지 문제를 효과적으로 해결할 수 있습니다. 각 방법의 장단점을 고려하여 시스템의 요구사항에 맞는 최적의 방법을 선택하는 것이 중요합니다.

이 블로그 포스트가 Kafka를 사용한 중복 제거 전략을 이해하고 구현하는 데 도움이 되기를 바랍니다.

0개의 댓글