implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.1.0'
public class Consumer {
public static void main(String[] args) {
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("group.id", "click_log_group");
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
//어느 토픽에서 데이터를 가져올지 결정한다.
consumer.subscribe(Array.asList("click_log"));
//데이터를 실질적으로 가져오는 폴링 루프: poll 메서드가 포함된 무한 루프
while(true) {
ConsumerRecords<String, String> records = consumer.poll(500);
for(ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
public class Consumer {
public static void main(String[] args) {
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("group.id", "click_log_group");
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
//어느 토픽에서 데이터를 가져올지 결정한다.
consumer.subscribe(Array.asList("click_log"));
// === 추가된 코드 === //
TopicPartition partition0 = new TopicPartition(topicName, 0);
TopicPartition partition1 = new TopicPartition(topicName, 1);
consumer.assign(Arrays.asList(partition0, partition1));
//데이터를 실질적으로 가져오는 폴링 루프: poll 메서드가 포함된 무한 루프
while(true) {
ConsumerRecords<String, String> records = consumer.poll(500);
for(ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
__consumer_offsets
토픽에 저장된다.auto.commit.interval.ms
), poll() 메서드 호출시 자동 commit오토 커밋을 사용하지 않는다. Kafka Consumer의 commitSync()
, commitAsync()
사용
commitSync()
: 동기 커밋
commitAsync()
: 비동기 커밋
리밸런스: 컨슈머 그룹의 파티션 소유권이 변경될 때 일어나는 현상
리밸런스를 하는 동안 일시적으로 메시지를 가져올 수 없다.
리밸런스 발생히 데이터 유실 / 중복 발생 가능성이 있다.
→ commitSync()
또는 추가적인 방법으로 데이터 유실 / 중복 방지
언제 리밸런스가 발생하는가?
→ consumer.close()
호출시 또는 consumer 세선이 끊어졌을 때