데이터를 가져가는 polling 주체.
커밋을 통해 읽은 컨슈머 offset을 카프카에 기록.
데이터 저장 위치.
package com.example.kafkatester;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
@SpringBootApplication
public class KafkaTesterApplication {
private static String TOPIC_NAME = "test";
private static String GROUP_ID = "testgroup";
private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
프로듀서에서 레코드 주입
$ kafka-console-producer.bat --bootstrap-server localhost:9092 --topic hello-kafka
>Hello
>What is
>123qawe
>
컨슈머에서 레코드를 가져옴
...
This is record 0
This is record 2
This is record 3
Hello
What is
123qawe
This is record 1
This is record 5
This is record 7
...
/*
ENABLE_AUTO_COMMIT_CONFIG: 기본 옵션 true. polling 할때 어디까지 읽었는지 브로커에 알려준다. 자동 커밋이 되기 전에 컨슈머가 강제 종료 되면 컴슈머는 다음 실행시 커밋 이전까지 오프셋부터 레코드를 읽어온다(중복 처리)
*/
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 자동 커밋 여부
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);// 자동 커밋 주기
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
enable.auto.commit=true : 일정 간격(auto.commit.interval.ms), poll() 메서드 호출시 자동 commit
commit 관련 코드를 작성할 필요 없기에 편리함.
속도가 가장 빠름. 그러나 중복 또는 유실이 발생할 수 있음(중복/유실을 허용하지 않는 상황에서는 사용하면 안됨)
일부 데이터가 중복/유실되도 상관 없는 곳(센서, GPS 등)에서 사용
중복 처리 되는 메시지들
유실되는 메시지들
리밸런싱 뿐만 아니라 컨슈머가 갑자기 종료 되어도 문제 발생
오토 커밋을 사용하되, 컨슈머가 죽지 않도록 기도 ?
오토 커밋을 사용하지 않는다
enable.auto.commit=false
commitSync() : 동기 커밋
commitAsync() :
Ex) commitSync
...
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// sync commit 위해 필요
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
consumer.commitSync();// 커밋
record.offset();
}
}