프로듀서 애플리케이션을 개발해보자.
먼저 build.gradle에 디펜던시를 추가해줘야 한다.
build.gradle
dependencies {
implementation 'org.apache.kafka:kafka-clients:2.5.0'
implementation 'org.slf4j:slf4j-simple:1.7.30'
}

아파치 카프카를 배포할 때 사용하는 공식 라이브러리인 kafka-clients와 로그를 남기기 위한 slf4j 디펜던시를 추가한다.
이제 프로듀서용 클래스를 생성 후 프로듀서를 개발해보자.
ProducerTest.java 전체코드
package org.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class ProducerTest {
/**
* 1. 전역 설정
*/
private final static Logger logger = LoggerFactory.getLogger(ProducerTest.class);
private final static String TOPIC_NAME = "test-by-app";
private final static String BOOTSTRAP_SERVERS = "127.0.0.1:9092";
public static void main(String[] args) {
/**
* 2. 필수 옵션 3가지에 대한 Properties 설정
* 1) 서버
* 2) 메시지 키 직렬화 옵션
* 3) 메시지 값 직렬화 옵션
**/
Properties configs = new Properties();
// 서버
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// 메시지 키 직렬화 옵션
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 메시지 값 직렬화 옵션
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
/**
* 3. 카프카 프로듀서 인스턴스 생성
* 미리 정의한 property를 바탕으로
* 지정한 서버와 통신하여
* 지정한 타입으로 메시지 키와 값을 직렬화하는
* 카프카 프로듀서 인스턴스 생성
**/
// KafkaProducer<키 직렬화 타입, 값 직렬화 타입> 인스턴스명 = new KafkaProducer<>(프로퍼티 변수);
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
String msgVal = "testByApplicationMessage";
/**
* 4. 프로듀서 레코드 인스턴스 생성
* 필수값으로 토픽명과 메시지값 필요
*/
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, msgVal);
/**
* 5. 파라미터로 프로듀서레코드 전달하여 KafkaProducer의 send() 메소드 호출
* 내부적으로 Partitioner와 Accumulator (배치로 모음) 동작
* -> 카프카 프로듀서에 의해 정의한 레코드 전송
*/
producer.send(record);
logger.info("{}", record);
/**
* flush() : 데이터 전송 강제하고 싶을 때 사용
* Accumulator에 있는 모든 데이터를 날린 후 데이터 전송을 강제화 함
*/
producer.flush();
/**
* 6. 프로듀서 종료
*/
producer.close();
}
}
private final static Logger logger = LoggerFactory.getLogger(ProducerTest.class);
private final static String TOPIC_NAME = "test-by-app";
private final static String BOOTSTRAP_SERVERS = "127.0.0.1:9092";
먼저 전역 상수 설정을 해준다.
토픽명과 부트스트랩 서버 설정을 해준다.
/**
* 1. 필수 옵션 3가지에 대한 Properties 설정
* 1) 서버
* 2) 메시지 키 직렬화 옵션
* 3) 메시지 값 직렬화 옵션
**/
Properties configs = new Properties();
// 서버
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// 메시지 키 직렬화 옵션
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 메시지 값 직렬화 옵션
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
코드 상에서는 가장 먼저 Properties를 생성한다.
필수 옵션인 서버, 메시지 키 직렬화 옵션, 메시지 값 직렬화 옵션을 설정한다.
/**
* 3. 카프카 프로듀서 인스턴스 생성
* 미리 정의한 property를 바탕으로
* 지정한 서버와 통신하여
* 지정한 타입으로 메시지 키와 값을 직렬화하는
* 카프카 프로듀서 인스턴스 생성
**/
// KafkaProducer<키 직렬화 타입, 값 직렬화 타입> 인스턴스명 = new KafkaProducer<>(프로퍼티 변수);
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
다음으로는 properties 변수를 파라미터로 넣어준 카프카프로듀서 인스턴스를 생성해준다.
String msgVal = "testByApplicationMessage";
/**
* 4. 프로듀서 레코드 인스턴스 생성
* 필수값으로 토픽명과 메시지값 필요
*/
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, msgVal);
이제 프로듀서레코드를 정의할 차례이다.
토픽명과 메시지값을 가지는 프로듀서레코드 인스턴스를 생성해둔다.
/**
* 5. 파라미터로 프로듀서레코드 전달하여 KafkaProducer의 send() 메소드 호출
* 내부적으로 Partitioner와 Accumulator (배치로 모음) 동작
* -> 카프카 프로듀서에 의해 정의한 레코드 전송
*/
producer.send(record);
logger.info("{}", record);
정의해둔 레코드를 카프카프로듀서의 send() 메소드를 이용해 전송한다.
/**
* flush() : 데이터 전송 강제하고 싶을 때 사용
* Accumulator에 있는 모든 데이터를 날린 후 데이터 전송을 강제화 함
*/
producer.flush();
/**
* 6. 프로듀서 종료
*/
producer.close();
Accumulator에 있는 모든 데이터를 날린 후 데이터 전송을 강제화하고 싶을 때 flush() 메소드를 사용할 수도 있다.
프로듀스를 완료한 후에는 close() 메소드로 안전하게 프로듀서를 종료하고 리소스를 해제한다.
이 메소드를 사용하면 Accumulator에 저장되어 있는 모든 데이터를 카프카 클러스터로 전송해준다.
안전한 프로듀싱을 위해서는 flush()와 close()를 함께 사용해주는 것이 좋다.

실행되면 다음과 같이 카프카 설정을 로그로 확인할 수 있다.

로그로 찍어준 레코드 정보도 확인할 수 있다.

이제 실제로 해당 토픽이 만들어졌고, 메시지도 잘 들어갔음을 확인해볼 수 있다.