Kafka producer Callback : 이벤트 발행을 비동기적으로 하면 이벤트 발행 실패를 어떻게 인지할까?

김기현·2024년 6월 2일
6

서론

Kafka를 처음 사용할 때, 이벤트 유실을 방지하기 위해 Persistence Layer의 도움을 받아야 할지, 아니면 동기적으로 호출해야 할지에 대해 많은 고민을 했습니다. 이벤트 유실은 치명적일 수 있기 때문에 안정성을 우선시하고 싶었지만, 성능적인 이슈도 무시할 수 없었습니다. 특히, 실시간 데이터 처리가 중요한 시스템에서는 높은 처리량과 낮은 지연 시간이 필수적입니다. 이러한 고민 끝에, 성능을 위해 비동기 방식을 포기할 수 없다는 결론에 도달했습니다.

Kafka 비동기 이벤트 발행의 장점

그렇다면 Kafka에서 이벤트를 비동기적으로 발행하면 장점이 무엇일까요?

  1. 높은 처리량: 비동기 발행을 통해 동시 다발적으로 많은 이벤트를 발행할 수 있습니다.
  2. 낮은 지연 시간: 비동기 방식은 발행 요청을 기다리지 않기 때문에 지연 시간이 줄어듭니다.
  3. 자원 효율성: 비동기 발행은 블로킹이 없기 때문에 시스템 자원을 효율적으로 사용할 수 있습니다.

하지만 비동기 발행을 할 때 이벤트 발행 실패를 감지하고 적절히 대응하는 것이 매우 중요합니다. 그렇지 않으면 데이터 손실이 발생할 수 있습니다.

Callback을 활용한 이벤트 발행 실패 감지

Kafka의 비동기 발행 방식에서는 Callback 인터페이스를 활용하여 이벤트 발행의 성공 또는 실패 여부를 확인할 수 있습니다. Callback은 이벤트 발행 요청이 완료된 후 호출되는 메서드를 정의합니다.

Callback 인터페이스

public interface Callback {
    void onCompletion(RecordMetadata metadata, Exception exception);
}

onCompletion 메서드는 두 개의 매개변수를 받습니다:

  • RecordMetadata metadata: 발행된 이벤트에 대한 메타데이터 정보를 담고 있습니다.
  • Exception exception: 이벤트 발행 중 발생한 예외를 담고 있습니다. 성공적으로 발행된 경우에는 null이 됩니다.

비동기 발행 코드 예제

아래는 Kafka 프로듀서를 사용하여 이벤트를 비동기적으로 발행하고 Callback을 활용하여 발행 실패를 감지하는 예제 코드입니다:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class AsyncKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String topic = "my-topic";
        String key = "my-key";
        String value = "my-value";

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    // 발행 성공
                    System.out.println("Message sent successfully to topic " + metadata.topic() + " partition " + metadata.partition() + " at offset " + metadata.offset());
                } else {
                    // 발행 실패
                    System.err.println("Error sending message: " + exception.getMessage());
                    exception.printStackTrace();
                    // 재시도 로직 또는 대체 처리 로직 작성
                }
            }
        });

        producer.close();
    }
}

예외 처리 및 재시도 로직

위 예제에서 onCompletion 메서드 내의 exceptionnull인 경우 발행이 성공한 것입니다. 예외가 발생한 경우에는 exception 객체를 통해 상세한 에러 메시지를 확인할 수 있습니다. 이를 기반으로 다음과 같은 추가 처리를 할 수 있습니다:

  1. 로그 작성: 발행 실패에 대한 로그를 남겨서 문제를 추적할 수 있도록 합니다.
  2. 재시도 로직: 특정 조건에 따라 이벤트 발행을 재시도할 수 있습니다.
  3. 대체 처리: 발행 실패 시 다른 저장소에 데이터를 저장하거나 알림을 발송하는 등 대체 처리를 수행합니다.

결론

Kafka에서 비동기 이벤트 발행을 할 때 Callback을 활용하면 이벤트 발행 실패를 효과적으로 감지하고 처리할 수 있습니다. 이를 통해 시스템의 안정성과 신뢰성을 높일 수 있습니다. 위 예제를 참고하여 비동기 이벤트 발행 시 적절한 예외 처리와 재시도 로직을 구현해 보세요.


Kafka에서 비동기 이벤트 발행을 할 때 발생할 수 있는 실패를 인지하고 처리하는 방법에 대해 살펴보았습니다. Callback을 활용한 방법을 중심으로 설명하였으며, 이를 통해 비동기 이벤트 발행의 장점을 살리면서도 안정성을 확보할 수 있는 방법을 제시하였습니다.

번외

spring kafka 3.x부터는 비동기 프로그래밍 모델이 많이 개선되었습니다 아래와 같이 콜백을 적용해야합니다.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AsyncKafkaProducer {
    private static final Logger logger = LoggerFactory.getLogger(AsyncKafkaProducer.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String topic = "my-topic";
        String producerDeduplicationId = "my-key";
        String bodyJSONString = "my-value";

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, producerDeduplicationId, bodyJSONString);

        CompletableFuture<RecordMetadata> future = sendAsync(producer, record);

        future.whenCompleteAsync((metadata, exception) -> {
            if (exception != null) {
                logger.error("Failed to send message " + exception + ", message: [key: " + producerDeduplicationId + " , body: " + bodyJSONString + "]");
                // retry.... logic
            } else {
                logger.info("Successfully sent message [key: " + producerDeduplicationId + " , body: " + bodyJSONString + "]");
            }
        });

        producer.close();
    }

    private static CompletableFuture<RecordMetadata> sendAsync(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                future.completeExceptionally(exception);
            } else {
                future.complete(metadata);
            }
        });
        return future;
    }
}

0개의 댓글