Kafka를 처음 사용할 때, 이벤트 유실을 방지하기 위해 Persistence Layer의 도움을 받아야 할지, 아니면 동기적으로 호출해야 할지에 대해 많은 고민을 했습니다. 이벤트 유실은 치명적일 수 있기 때문에 안정성을 우선시하고 싶었지만, 성능적인 이슈도 무시할 수 없었습니다. 특히, 실시간 데이터 처리가 중요한 시스템에서는 높은 처리량과 낮은 지연 시간이 필수적입니다. 이러한 고민 끝에, 성능을 위해 비동기 방식을 포기할 수 없다는 결론에 도달했습니다.
그렇다면 Kafka에서 이벤트를 비동기적으로 발행하면 장점이 무엇일까요?
하지만 비동기 발행을 할 때 이벤트 발행 실패를 감지하고 적절히 대응하는 것이 매우 중요합니다. 그렇지 않으면 데이터 손실이 발생할 수 있습니다.
Kafka의 비동기 발행 방식에서는 Callback
인터페이스를 활용하여 이벤트 발행의 성공 또는 실패 여부를 확인할 수 있습니다. Callback
은 이벤트 발행 요청이 완료된 후 호출되는 메서드를 정의합니다.
public interface Callback {
void onCompletion(RecordMetadata metadata, Exception exception);
}
onCompletion
메서드는 두 개의 매개변수를 받습니다:
RecordMetadata metadata
: 발행된 이벤트에 대한 메타데이터 정보를 담고 있습니다.Exception exception
: 이벤트 발행 중 발생한 예외를 담고 있습니다. 성공적으로 발행된 경우에는 null
이 됩니다.아래는 Kafka 프로듀서를 사용하여 이벤트를 비동기적으로 발행하고 Callback
을 활용하여 발행 실패를 감지하는 예제 코드입니다:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class AsyncKafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String topic = "my-topic";
String key = "my-key";
String value = "my-value";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 발행 성공
System.out.println("Message sent successfully to topic " + metadata.topic() + " partition " + metadata.partition() + " at offset " + metadata.offset());
} else {
// 발행 실패
System.err.println("Error sending message: " + exception.getMessage());
exception.printStackTrace();
// 재시도 로직 또는 대체 처리 로직 작성
}
}
});
producer.close();
}
}
위 예제에서 onCompletion
메서드 내의 exception
이 null
인 경우 발행이 성공한 것입니다. 예외가 발생한 경우에는 exception
객체를 통해 상세한 에러 메시지를 확인할 수 있습니다. 이를 기반으로 다음과 같은 추가 처리를 할 수 있습니다:
Kafka에서 비동기 이벤트 발행을 할 때 Callback
을 활용하면 이벤트 발행 실패를 효과적으로 감지하고 처리할 수 있습니다. 이를 통해 시스템의 안정성과 신뢰성을 높일 수 있습니다. 위 예제를 참고하여 비동기 이벤트 발행 시 적절한 예외 처리와 재시도 로직을 구현해 보세요.
Kafka에서 비동기 이벤트 발행을 할 때 발생할 수 있는 실패를 인지하고 처리하는 방법에 대해 살펴보았습니다. Callback
을 활용한 방법을 중심으로 설명하였으며, 이를 통해 비동기 이벤트 발행의 장점을 살리면서도 안정성을 확보할 수 있는 방법을 제시하였습니다.
spring kafka 3.x부터는 비동기 프로그래밍 모델이 많이 개선되었습니다 아래와 같이 콜백을 적용해야합니다.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class AsyncKafkaProducer {
private static final Logger logger = LoggerFactory.getLogger(AsyncKafkaProducer.class);
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String topic = "my-topic";
String producerDeduplicationId = "my-key";
String bodyJSONString = "my-value";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, producerDeduplicationId, bodyJSONString);
CompletableFuture<RecordMetadata> future = sendAsync(producer, record);
future.whenCompleteAsync((metadata, exception) -> {
if (exception != null) {
logger.error("Failed to send message " + exception + ", message: [key: " + producerDeduplicationId + " , body: " + bodyJSONString + "]");
// retry.... logic
} else {
logger.info("Successfully sent message [key: " + producerDeduplicationId + " , body: " + bodyJSONString + "]");
}
});
producer.close();
}
private static CompletableFuture<RecordMetadata> sendAsync(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
producer.send(record, (metadata, exception) -> {
if (exception != null) {
future.completeExceptionally(exception);
} else {
future.complete(metadata);
}
});
return future;
}
}