저번에 작성했던 Producer API에 이어 이번에는 파티션으로부터 메세지를 받는 Consumer API를 작성해보려고 한다.
[Producer API 글]
https://velog.io/@statco19/Kafka-Producer-API
아마 Kafka를 처음 실습하면서 CLI(Command Line Interface)를 통해 터미널에서 producer, consumer를 실습해봤을 것이다. 이번에는 저번 Producer와 마찬가지로 Java programming을 통해 CLI와 같은 결과를 내는 Java code를 작성해보려고 한다.
단 하나의 컨슈머를 그룹에 할당하여 3개의 파티션을 가지는 브로커에서 메세지를 받아오는 코드를 작성한다.
package com.github.simple.kafka.tutorial1;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
public class ConsumerDemo {
private final static Logger logger = LoggerFactory.getLogger(ProducerDemo.class.getName());
private final static String TOPIC_NAME = "first-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "my-fourth-application";
public static void main(String[] args) {
//create properties
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
//subscribe consumer to topics
// consumer.subscribe(Arrays.asList("first-topic", "second-topic")); 여러 개의 토픽을 Arrays.asList()를 사용하여 subscribe
// consumer.subscribe(Collections.singleton(TOPIC_NAME)); 한 개의 토픽 subscribe
consumer.subscribe(Arrays.asList(TOPIC_NAME));
//poll for new data
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 100ms동안 하나의 record를 파티션에서 받아온다
for (ConsumerRecord<String, String> record : records) {
logger.info("Key: " + record.key() + ", Value: " + record.value());
logger.info("Partition " + record.partition() + ", Offset: " + record.offset());
}
}
}
}
저번 프로듀서와 크게 다르지 않지만, group id 설정과 subscribe 방법에 주의하면서 코드를 작성한다.
위의 코드를 실행시키고 한 번 더 실행시키면 한 번에 두 개의 컨슈머가 동일한 group id 안에서 실행되는 것을 확인할 수 있다.
만약 두 개의 컨슈머가 동시에 실행되지 않고 계속 재시작한다면 설정을 바꿔줘야 한다.
위의 코드의 파일 이름인 ConsumerDemo를 예로 들자면, IntelliJ에서는
파일 우클릭 > More Run/Debug > Modify Run Configuration... > Modify options > Allow multiple instances 체크
이렇게 해주면 한 번에 여러 개의 컨슈머를 동시에 실행시킬 수 있다.
위의 코드는 특정 토픽을 subscribe해서 메세지를 받아왔다. 이번에는 TopicPartition이라는 객체를 통해 특정 토픽과 파티션을 지정하고, 해당 파티션에서 특정 offset부터 메세지를 받아오는 코드를 작성한다.
package com.github.simple.kafka.tutorial1;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerDemoAssignSeek {
private final static Logger logger = LoggerFactory.getLogger(ProducerDemo.class.getName());
private final static String TOPIC_NAME = "first-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
//create properties
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
//assign
TopicPartition partition0 = new TopicPartition(TOPIC_NAME, 0); // partition 0
TopicPartition partition1 = new TopicPartition(TOPIC_NAME, 1); // partition 1
TopicPartition partition2 = new TopicPartition(TOPIC_NAME, 2); // partition 2
long offsetToReadFrom = 0L;
consumer.assign(Arrays.asList(partition0, partition1, partition2));
//seek
consumer.seek(partition0, offsetToReadFrom);
consumer.seek(partition1, offsetToReadFrom);
consumer.seek(partition2, offsetToReadFrom);
//how many messages you want to read
int numOfMessagesToRead = 15;
int numOfMessagesRead = 0;
boolean keepOnReading = true;
//poll for new data
while(keepOnReading) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
numOfMessagesRead++;
logger.info("Key: " + record.key() + ", Value: " + record.value());
logger.info("Partition " + record.partition() + ", Offset: " + record.offset());
if(numOfMessagesRead >= numOfMessagesToRead) {
keepOnReading = false;
break;
}
}
}
logger.info("Exiting the application");
}
}