[Kafka] Java Kafka Consumer 튜토리얼

이재민·2024년 2월 24일
0

Kafka

목록 보기
16/17

Kafka Consumer 만드는 법

Producer properties와 비슷하게 Kafka는 다양한 Consumer Properties를 제공합니다.
공식문서를 참고하려 Consumer에 필요한 속성을 확인할 수 있습니다.
Kafka > Documentation > Configuration > Consumer Configs

1. Properties 설정

String bootstrapServers = "127.0.0.1:9092";
String groupId = "my-fourth-application";
String topic = "demo_java";

// consumer configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer .class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

key, value, server에 대한 설명은 생략하겠습니다.

  • group.id : consumerGroup의 consumer를 식별하는 고유한 문자열입니다.

  • auto.offset.reset : 초기에 오프셋이 없거나, 현재 오프셋이 서버에 더 이상 존재하지 않는 경우 해당 속성이 필요합니다.
    오프셋 값을 재 설정하는데 사용되는 값은 아래와 같습니다.

    • earliest: 이 오프셋 변수는 값을 가장 빠른 오프셋으로 자동 재설정합니다.(애플리케이션을 처음 실행할때 topic의 기록된 모든 데이터를 읽는 것을 의미합니다.)
      앞선 게시글에서 설명드렸던 CLI 명령어에서 --from-beginning 옵션에 해당합니다.

    • latest: 이 오프셋 변수는 오프셋 값을 최신 오프셋으로 재설정합니다.
      애플리케이션 실행 후부터 발생한 메시지를 읽는 것을 의미합니다.

    • none: 이전 그룹에 대한 이전 오프셋이 없으면 소비자에게 예외가 발생합니다.
      또한, 설정된 consumer group이 존재하지 않을 경우 동작하지 않습니다. 즉, 애플리케이션 실행전에 consumer group을 설정해야한다는 것을 의미합니다.


2. Kafka Consumer 생성

// create kafka consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

3. subscribe consumer

// subscribe consumer
consumer.subscribe(Arrays.asList(topic));

4. poll

// poll for new data
while (true) {
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

	for (ConsumerRecord<String, String> record : records) {
		log.info("Key: " + record.key() + ", Value: " + record.value());
		log.info("Partition: " + record.partition() + ", Offset:" + record.offset());
	}
}

5. consumer application 실행

consumer는 polling 후 데이터를 읽게됩니다. 처음부터 데이터를 읽지 못했던 이유는 커밋된 offset이 없기 때문에 auto.commit.offset=earliest설정에 따라 파티션들은 0번 offset으로 설정된 후 데이터를 읽게 됩니다.

Consumer 재실행

consumer를 종료하고, producer에서 새로운 메시지를 발행 후 consumer를 실행시키면 아래 오프셋을 찾는데 꽤 오랜시간이 걸리게 됩니다. consumer를 종료할때 깔끔하게 종료하지 않았기 때문이죠.

Consumer 우아한 종료(Graceful shutdown)

consumer를 깔끔하게 종료하기 위해 shutdown hook를 추가하도록 하겠습니다.
코드를 작성 후 실행한 결과는 아래와 같습니다.

[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-fourth-application-1, groupId=my-fourth-application] Revoke previously assigned partitions demo_java-0, demo_java-1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-fourth-application-1, groupId=my-fourth-application] Member consumer-my-fourth-application-1-cbdc2723-e61b-43a1-bb00-ec4adc3be0c8 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer is being closed
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-fourth-application-1, groupId=my-fourth-application] Resetting generation due to: consumer pro-actively leaving the group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-fourth-application-1, groupId=my-fourth-application] Request joining group due to: consumer pro-actively leaving the group

새로운 스레드가 종료를 감지하고, 작성한 로그 메시지들이 출력되는 것이 확인할 수 있습니다.
위 메시지를 확인해보면 이전에 할당된 파티션을 취소하고 있고, 컨슈머 그룹을 나가고 있습니다.
카프카의 할 일이 모두 조료되면 마지막 컨슈머가 우아하게 종료되었다는 로그 메시지를 볼 수 있습니다.

코드 확인

애플리케이션을 종료하면, shutdown 신호를 보내게 됩니다. 메인 스레드가 아닌 새로운 스레드가 shutdown을 감지합니다.
consumer.wakeup() 메소드를 호출하면, consumer.poll() 메소드를 호출할때, WakeupException 을 발생하게 됩니다.
그래서 아래와 같이 poll() 하는 전체를 try, catch 로 감싸줍니다.

consumer.wakeup() 메소드 호출 후 poll() 메소드에서 Wakeupexception 이 발생하기 때문에, 해당 예외 처리를 진행합니다.
그리고 마지막으로 consumer.close() 를 통해 consumer를 종료, 오프셋을 커밋 그리고 컨슈머 그룹도 리밸런싱을 진행합니다.

profile
문제 해결과 개선 과제를 수행하며 성장을 추구하는 것을 좋아합니다.

0개의 댓글