[Confluent Kafka] Java Kafka-Client 통신하기

식빵·2025년 8월 3일

kafka

목록 보기
2/2

이 글은 Confluent Kafka (ver 7.9) 를 기반으로 작성됐습니다.
다른 버전을 사용하시는 분들은 호환성과 관련된 내용을 유의해서 읽어주시기 바랍니다.

Java, Spring Boot 프로젝트를 사용할 줄 안다고 가정하고 글을 작성했습니다.
잘 모르시는 부분 있으면 댓글 달아주세요.


🍞 Java 프로젝트 생성

프로젝트는 Spring Boot (gradle builder) 기반으로 생성합니다.
프로젝트 생성은 이 링크를 클릭하면 제가 미리 세팅한
프로젝트 설정값들로 채워진 Spring Initializr 화면이 나옵니다.

혹시 저랑 다른 버전의 Confluent Kafka 를 쓰신다면 JDK 호환 버전을
아래 링크를 통해서 확인하고, 프로젝트 설정에서 Java 버전을 바꿔주세요.
Confluent Platform 와 Java Version 호환성 확인 링크


설정이 본인한테 잘 맞으면 화면 하단에 Generate 버튼 클릭해서
프로젝트를 zip 파일로 다운로드 받습니다.


마지막으로 다운로드 받은 프로젝트 파일(zip)을 압축해제하고,
압축해제 디렉토리를 IDE 로 열면 끝입니다.




🍞 Kafka-Client 호환 버전

Confluent 와 Apache Kafka 호환 버전

먼저 현재 설치된 Confluent-Kafka 의 버전에 호환되는 Apache Kafka 버전을
알아내야 합니다. 이를 위해서는 아래 링크를 클릭해주세요.

https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-ak-compatibility

저는 Confluent Platform 버전이 7.9 이고,
이와 호환되는 Apache Kafka 버전이 3.9.x 임을 알 수 있습니다.
이건 Kafka-Client 라이브러리도 3.9.x 버전을 사용하면 된다는 의미입니다.



Maven Repo : 라이브러리 검색

Maven Repository: kafka-client 링크에 들어가서
kafka-client 라이브러리에 3.9.x 가 있는지 확인합니다.

3.9.0 은 Vulnerabilities 가 있으므로, 3.9.1 버전을 사용하도록 하겠습니다.
버전 번호를 클릭합니다.


저는 현재 gradle 기반의 java 프로젝트를 생성했으므로 위와 같이
gradle 탭을 클릭하고, dependency 스크립트를 복사해갑니다.

그리고 나서 프로젝트 build.gradle 파일의 dependenies 영역에 2줄을 추가합니다.

dependencies {
  
  	//... 기존 의존성들은 관심없으니 생략 ...
  
    // Kafka 는 로깅 라이브러리가 필요. logback-classic 의존성 추가.
    // 버전은 io.spring.dependency-management plugin 이 지정해줌. 
    // 명시적으로 버전 지정안 해도 됨.
    implementation 'ch.qos.logback:logback-classic'

  	// 아까 복사한 dependency 스크립트 복사, 붙여넣기
    implementation 'org.apache.kafka:kafka-clients:3.9.1'
}




🍞 Kafka Server 설정 수정

잠시 Kafka 설치 서버에 들어가서 server.properties 를 수정해줍시다.
(파일 위치: $CONFLUENT_HOME/etc/kafka/server.properties)

server.properties 에서 advertised.listeners 라고 적힌 곳이 있습니다.
아마 처음에는 주석처리 되어 있는데,
주석을 없애고 명시적으로 Kafka 서버의 IP 정보 를 작성해야 합니다.

저는 VirtualBox Host-Only Network 를 통해서 현재 Kafka 서버에
고정 IP(192.168.56.101) 를 할당한 상태입니다.
그래서 아래처럼 수정해줬습니다.

  • PLAINTEXT://[KAFKA_SERVER_IP_OR_DOMAIN]:9092

서버 또는 도메인 정보는 Kafka-Client 입장에서 Kafka Server 에 접근하기
위해 노출된 IP 또는 DOMAIN 을 입력해야 합니다.

예를 들어 AWS EC2Kafka 를 설치했다면, EC2 서버의 외부에 노출된
Public IP 를 작성해야 합니다.


이후에 Kafka Server, ZooKeeper 를 모두 껐다가 다시 켜줍니다.
참고로 이걸 안하면 나중에 javaProducer 코드를 작성/실행하면 아래와 같이
자신의 VirtualBox 의 서버명칭을 사용해서 뭔가 네트워크 통신을 해서 에러가 납니다.

에러내용:
org.apache.kafka.clients.NetworkClient



🍞 Topic 생성해두기

Java Producer 코드와 통신할 Topic 을 미리 생성해두겠습니다.
토픽 명칭은 my-topic 으로 하겠습니다.

kafka-topics --bootstrap-server localhost:9092 \
			 --create --topic my-topic



🍞 Java Producer 작성 및 실행

package coding.toast.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class FirstProducer {
    public static void main(String[] args) {

        // KafkaProducer 설정 지정
        Properties props = new Properties();
        
        // 여러분들의 Kafka Server IP 를 정확히 작성해주세요!
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // KafkaProducer 생성
        // Type Param 은 "KEY_SERIALIZER_CLASS_CONFIG", 
        // "VALUE_SERIALIZER_CLASS_CONFIG" 에 맞춰야 합니다!
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {


            // Record 도 Type Parameter 타입을 
            // KEY_SERIALIZER_CLASS_CONFIG, 
            // VALUE_SERIALIZER_CLASS_CONFIG 에 맞춰서 지정합니다.
            ProducerRecord<String, String> record
                    = new  ProducerRecord<>("my-topic", null, "THIS IS MY FIRST MESSAGE!");

            // send message(=record)
            producer.send(record);

            // flush all buffer to broker
            producer.flush();
        }
    }
}

이렇게 작성하고 한번 실행시켜주세요~




🍞 Topic Subscriber 로 확인

이제 Java 로 Produce 한 메시지를 Kafka Server 에서 확인합니다.

kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic --from-beginning

메시지가 잘 나오네요!

profile
백엔드 개발자로 일하고 있는 식빵(🍞)입니다.

0개의 댓글