1. 금융 서비스와 같이 정확성이 중요한 시스템은 중복처리가 되면 안된다.(ex. 입출금)
2. 해결방법 1. Kafka 키를 이용한 중복 제거 키로서의 Upsert 적용
3. 해결방법 2. 유니크 키 제약 조건을 활용한 중복 소비 문제 해결
4. 그외 방법 : 캐시를 이용한 중복 제거 / Bloom filter 활용한 중복제거
5. 실무 경험 : 별도 인프라 x, 네트워크 부하 x 로 사용할 수 있는, DB 기능을 활용 (upsert, unique key 제약)
Kafka를 사용하여 데이터를 소비할 때 중복된 메시지를 처리하는 것은 매우 중요한 문제입니다. 특히 금융 거래와 같이 정확성이 중요한 시스템에서는 메시지의 중복 처리가 큰 문제를 초래합니다. 이번 블로그에서는 Kafka Consumer에서 중복 제거를 위한 전략, 특히 Exactly-Once 처리에 대해 다루겠습니다.
Exactly-Once 처리란?
Exactly-Once 처리는 메시지가 소비자에게 단 한 번만 전달되고 처리되는 것을 보장하는 방식입니다. Kafka는 기본적으로 At-Least-Once(최소 한 번) 전달을 보장하며, 이 경우 중복 메시지가 발생할 수 있습니다. 이를 방지하기 위해 Exactly-Once 처리를 구현하면 중복된 메시지를 제거할 수 있습니다.
Upsert(업서트)란 존재하면 업데이트하고 존재하지 않으면 삽입하는 데이터베이스 작업입니다.
Kafka 메시지의 키를 중복 제거 키로 활용하면, 데이터베이스에 데이터를 삽입할 때 중복된 메시지를 효과적으로 처리할 수 있습니다.
Kafka 메시지의 키 설정:
Kafka Consumer에서 메시지 처리:
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
@Repository
public interface OrderRepository extends JpaRepository<Order, Long> {
@Transactional
@Query(value = "INSERT INTO orders (order_id, product_id, quantity) VALUES (:orderId, :productId, :quantity) " +
"ON CONFLICT (order_id) DO UPDATE SET product_id = :product_id, quantity = :quantity", nativeQuery = true)
void upsertOrder(@Param("orderId") String orderId,
@Param("productId") String productId,
@Param("quantity") int quantity);
}
데이터베이스에 유니크 키 제약 조건을 설정하여 중복 메시지를 방지할 수 있습니다. 만약 중복된 키로 메시지를 삽입하려고 하면 데이터베이스에서 예외가 발생합니다. 이를 적절히 처리하면 중복 메시지 문제를 해결할 수 있습니다.
유니크 키 제약 조건 설정:
— 데이터베이스 테이블에 고유 키 제약 조건을 설정합니다. 예를 들어, orders
테이블의 order_id
칼럼에 유니크 제약 조건을 추가합니다.
ALTER TABLE orders ADD CONSTRAINT unique_order_id UNIQUE (order_id);
import org.springframework.dao.DataIntegrityViolationException;
public void consumeMessage(Order order) {
try {
// 데이터베이스에 Upsert 작업 수행
orderRepository.save(order);
} catch (DataIntegrityViolationException e) {
// +SUDO : 대충 데이터 중복 저장으로 인한 예외만 catch하는 로직
System.out.println("중복된 메시지 처리: " + order.getOrderId());
// 필요에 따라 로깅 또는 다른 처리 작업 수행
}
}
Kafka Consumer 애플리케이션 내에 중복 메시지를 감지하기 위한 캐시를 사용할 수 있습니다. 메시지의 키를 캐시에 저장하고, 이미 처리된 키는 무시하는 방식입니다.
Stateless한 서버 구현을 위해 Redis를 이용할 수도 있습니다.
캐시 설정:
캐시를 이용한 중복 제거:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class MessageProcessor {
private static final int CACHE_SIZE = 10000;
private final ConcurrentMap<String, Boolean> messageCache = new ConcurrentHashMap<>();
public void consumeMessage(Order order) {
if (messageCache.putIfAbsent(order.getOrderId(), Boolean.TRUE) == null) {
// 메시지 처리 로직
processOrder(order);
} else {
// 중복 메시지 처리 로직
System.out.println("중복된 메시지 처리: " + order.getOrderId());
}
}
private void processOrder(Order order) {
// 주문 처리 로직
orderRepository.save(order);
}
}
Bloom filter는 키 값을 해시해서 저장하는 방식으로 캐싱을 하는 방식입니다. 이 방식은 메모리 효율적인 방식이며, 대규모 트래픽에도 효율적입니다.
False Positive: Bloom Filter는 거짓 양성(False Positive)을 허용합니다. 즉, 실제로는 존재하지 않는 키를 존재한다고 잘못 판단할 수 있습니다.
별도의 리소스 필요: 추가 인프라가 필요합니다.
상황에 따라 다르지만, 개인적으로는 캐시 또는 Bloom Filter를 사용하는 방법은 별도의 리소스를 필요로 하여 관리 포인트가 증가합니다.
또한 네트워크 통신을 요구하므로 Upsert
나 Unique Key
방식이 효율적이라고 생각합니다.
이러한 방식들은 데이터베이스 자체의 기능을 활용하기 때문에 하기 장점이 있습니다.
- 별도의 리소스 관리가 필요 없고,
- 네트워크 오버헤드를 줄일 수 있습니다.
- 구현이 간단
따라서 실무에서도 상황에 맞게 해결 방법 1, 2를 사용했습니다.
Kafka Consumer에서 중복 메시지를 제거하기 위해 다양한 전략을 사용할 수 있습니다. Kafka 메시지의 키를 중복 제거 키로 활용하여 Upsert 작업을 수행하고, 데이터베이스의 유니크 키 제약 조건을 활용하며, 캐시를 사용하는 방법 등을 통해 중복 메시지 문제를 효과적으로 해결할 수 있습니다. 각 방법의 장단점을 고려하여 시스템의 요구사항에 맞는 최적의 방법을 선택하는 것이 중요합니다.
이 블로그 포스트가 Kafka를 사용한 중복 제거 전략을 이해하고 구현하는 데 도움이 되기를 바랍니다.