Kafka Consumer API - Java

Andrew·2022년 1월 19일
0

kafka-programming

목록 보기
5/6

저번에 작성했던 Producer API에 이어 이번에는 파티션으로부터 메세지를 받는 Consumer API를 작성해보려고 한다.

[Producer API 글]
https://velog.io/@statco19/Kafka-Producer-API

아마 Kafka를 처음 실습하면서 CLI(Command Line Interface)를 통해 터미널에서 producer, consumer를 실습해봤을 것이다. 이번에는 저번 Producer와 마찬가지로 Java programming을 통해 CLI와 같은 결과를 내는 Java code를 작성해보려고 한다.

One consumer within a group

단 하나의 컨슈머를 그룹에 할당하여 3개의 파티션을 가지는 브로커에서 메세지를 받아오는 코드를 작성한다.

package com.github.simple.kafka.tutorial1;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;

public class ConsumerDemo {

    private final static Logger logger = LoggerFactory.getLogger(ProducerDemo.class.getName());
    private final static String TOPIC_NAME = "first-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "my-fourth-application";

    public static void main(String[] args) {

        //create properties
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        //create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);


        //subscribe consumer to topics
//        consumer.subscribe(Arrays.asList("first-topic", "second-topic")); 여러 개의 토픽을 Arrays.asList()를 사용하여 subscribe
//        consumer.subscribe(Collections.singleton(TOPIC_NAME)); 한 개의 토픽 subscribe 
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        //poll for new data
        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  // 100ms동안 하나의 record를 파티션에서 받아온다
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key: " + record.key() + ", Value: " + record.value());
                logger.info("Partition " + record.partition() + ", Offset: " + record.offset());

            }
        }
    }
}

저번 프로듀서와 크게 다르지 않지만, group id 설정과 subscribe 방법에 주의하면서 코드를 작성한다.

Multiple consumers

위의 코드를 실행시키고 한 번 더 실행시키면 한 번에 두 개의 컨슈머가 동일한 group id 안에서 실행되는 것을 확인할 수 있다.

만약 두 개의 컨슈머가 동시에 실행되지 않고 계속 재시작한다면 설정을 바꿔줘야 한다.

위의 코드의 파일 이름인 ConsumerDemo를 예로 들자면, IntelliJ에서는

파일 우클릭 > More Run/Debug > Modify Run Configuration... > Modify options > Allow multiple instances 체크

이렇게 해주면 한 번에 여러 개의 컨슈머를 동시에 실행시킬 수 있다.

Assign & Seek

위의 코드는 특정 토픽을 subscribe해서 메세지를 받아왔다. 이번에는 TopicPartition이라는 객체를 통해 특정 토픽과 파티션을 지정하고, 해당 파티션에서 특정 offset부터 메세지를 받아오는 코드를 작성한다.

package com.github.simple.kafka.tutorial1;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemoAssignSeek {

    private final static Logger logger = LoggerFactory.getLogger(ProducerDemo.class.getName());
    private final static String TOPIC_NAME = "first-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {

        //create properties
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        //create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);

        //assign
        TopicPartition partition0 = new TopicPartition(TOPIC_NAME, 0);  // partition 0
        TopicPartition partition1 = new TopicPartition(TOPIC_NAME, 1);  // partition 1
        TopicPartition partition2 = new TopicPartition(TOPIC_NAME, 2);  // partition 2

        long offsetToReadFrom = 0L;
        consumer.assign(Arrays.asList(partition0, partition1, partition2));

        //seek
        consumer.seek(partition0, offsetToReadFrom);
        consumer.seek(partition1, offsetToReadFrom);
        consumer.seek(partition2, offsetToReadFrom);

        //how many messages you want to read
        int numOfMessagesToRead = 15;
        int numOfMessagesRead = 0;
        boolean keepOnReading = true;

        //poll for new data
        while(keepOnReading) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                numOfMessagesRead++;
                logger.info("Key: " + record.key() + ", Value: " + record.value());
                logger.info("Partition " + record.partition() + ", Offset: " + record.offset());

                if(numOfMessagesRead >= numOfMessagesToRead) {
                    keepOnReading = false;
                    break;
                }
            }
        }

        logger.info("Exiting the application");
    }
}
profile
조금씩 나아지는 중입니다!

0개의 댓글