kafka auto.offset.reset란?

greenTea·2024년 7월 7일

Kafka auto.offset.reset 옵션

Kafka에서 auto.offset.reset 설정은 컨슈머의 행동 방식을 결정하는 설정값으로 컨슈머가 유효한 오프셋을 가지고 있지 않을 때 어떻게 반응할지를 설정하는 값입니다.

  1. earliest
    이 옵션은 컨슈머가 특정 파티션의 가장 낮은 오프셋부터 읽기를 시작하게 합니다. 해당 설정을 통해 데이터 손실을 방지할 수 있으며 새로운 컨슈머 그룹이 시작되었을때 기존 데이터에 대한 처리를 하고 싶은 경우에 설정하시면 됩니다.

  2. latest
    latest 옵션은 컨슈머가 활성화된 후 발생한 메시지부터 읽기 시작하도록 설정합니다. 기존 데이터에 대한 처리보다는 실시간으로 들어오는 데이터 처리에 사용하시면 됩니다.

  3. none
    none 설정은 컨슈머 그룹에 유효한 오프셋 정보가 없을 경우 오류를 발생시키고 종료합니다.

여기서 주목해야 할 점은 위 옵션은 적당한 offset이 없는 경우에 동작하는 옵션들입니다.

Java 코드 예시

Consumer

@Slf4j
public class SimpleConsumer {
	private final static String TOPIC_NAME = "test";
	private final static String BOOTSTRAP_SERVERS = "localhost:9092";

	@SneakyThrows
	public static void main(String[] args) {
		Properties configs = new Properties();

		configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
		configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
		configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, {option});
		configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs)) {
			consumer.subscribe(Collections.singletonList(TOPIC_NAME));
			while (true) {
				ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

				for (ConsumerRecord<String, String> record : records) {
					log.info("partition: {} ,Received message: (key: {}, value: {}) at offset {}", record.partition(),
						record.key(), record.value(),
						record.offset());

				}
			}
		}
	}
}

위 코드를 실행하기전에 미리 topic에 메시지를 넣어줘야 합니다.

1.earliest

위 코드를 실행하면 가장 처음 offset부터 읽어오게 되는데 이 때 위 consumer의 경우 처음 실행한 것이기에 유효한 offset이 없는 상황입니다.

그렇기에 처음 부터 모든 메시지를 읽어오게 됩니다.(처음 부터라고 작성하였지만 실제로는 가장 오래된 offset부터 읽어 옵니다.)

2. latest

기존의 메시지를 읽지 않고 이 후 도착한 메시지를 읽습니다. 컨슈머를 실행한 상황에서 데이터를 발행하면 로그가 찍히는 것을 확인 할 수 있습니다.

3. none

유효한 offset이 없으므로 에러가 발생합니다.

Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions

참고자료

kafka doc

profile
greenTea입니다.

0개의 댓글