Kafka에서 auto.offset.reset 설정은 컨슈머의 행동 방식을 결정하는 설정값으로 컨슈머가 유효한 오프셋을 가지고 있지 않을 때 어떻게 반응할지를 설정하는 값입니다.
earliest
이 옵션은 컨슈머가 특정 파티션의 가장 낮은 오프셋부터 읽기를 시작하게 합니다. 해당 설정을 통해 데이터 손실을 방지할 수 있으며 새로운 컨슈머 그룹이 시작되었을때 기존 데이터에 대한 처리를 하고 싶은 경우에 설정하시면 됩니다.
latest
latest 옵션은 컨슈머가 활성화된 후 발생한 메시지부터 읽기 시작하도록 설정합니다. 기존 데이터에 대한 처리보다는 실시간으로 들어오는 데이터 처리에 사용하시면 됩니다.
none
none 설정은 컨슈머 그룹에 유효한 오프셋 정보가 없을 경우 오류를 발생시키고 종료합니다.
여기서 주목해야 할 점은 위 옵션은 적당한 offset이 없는 경우에 동작하는 옵션들입니다.
@Slf4j
public class SimpleConsumer {
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
@SneakyThrows
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, {option});
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs)) {
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
log.info("partition: {} ,Received message: (key: {}, value: {}) at offset {}", record.partition(),
record.key(), record.value(),
record.offset());
}
}
}
}
}
위 코드를 실행하기전에 미리 topic에 메시지를 넣어줘야 합니다.
위 코드를 실행하면 가장 처음 offset부터 읽어오게 되는데 이 때 위 consumer의 경우 처음 실행한 것이기에 유효한 offset이 없는 상황입니다.
그렇기에 처음 부터 모든 메시지를 읽어오게 됩니다.(처음 부터라고 작성하였지만 실제로는 가장 오래된 offset부터 읽어 옵니다.)
기존의 메시지를 읽지 않고 이 후 도착한 메시지를 읽습니다. 컨슈머를 실행한 상황에서 데이터를 발행하면 로그가 찍히는 것을 확인 할 수 있습니다.
유효한 offset이 없으므로 에러가 발생합니다.
Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions