[KAFKA] 프로듀서 애플리케이션 개발 - 값 지정

.·2024년 7월 28일

KAFKA

목록 보기
13/21

프로듀서 애플리케이션을 개발해보자.

1. 의존성 추가

먼저 build.gradle에 디펜던시를 추가해줘야 한다.

build.gradle

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.5.0'
    implementation 'org.slf4j:slf4j-simple:1.7.30'
}

아파치 카프카를 배포할 때 사용하는 공식 라이브러리인 kafka-clients와 로그를 남기기 위한 slf4j 디펜던시를 추가한다.

2. 프로듀서 개발

이제 프로듀서용 클래스를 생성 후 프로듀서를 개발해보자.


ProducerTest.java 전체코드

package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class ProducerTest {

    /**
     * 1. 전역 설정
     */
    private final static Logger logger = LoggerFactory.getLogger(ProducerTest.class);
    private final static String TOPIC_NAME = "test-by-app";
    private final static String BOOTSTRAP_SERVERS = "127.0.0.1:9092";

    public static void main(String[] args) {

        /**
         * 2. 필수 옵션 3가지에 대한 Properties 설정
         *  1) 서버
         *  2) 메시지 키 직렬화 옵션
         *  3) 메시지 값 직렬화 옵션
         **/
        Properties configs = new Properties();
        // 서버
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 메시지 키 직렬화 옵션
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 메시지 값 직렬화 옵션
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        /**
         * 3. 카프카 프로듀서 인스턴스 생성
         *  미리 정의한 property를 바탕으로
         *  지정한 서버와 통신하여
         *  지정한 타입으로 메시지 키와 값을 직렬화하는
         *  카프카 프로듀서 인스턴스 생성
         **/
        // KafkaProducer<키 직렬화 타입, 값 직렬화 타입> 인스턴스명 = new KafkaProducer<>(프로퍼티 변수);
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        String msgVal = "testByApplicationMessage";

        /**
         * 4. 프로듀서 레코드 인스턴스 생성
         *  필수값으로 토픽명과 메시지값 필요
         */
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, msgVal);

        /**
         * 5. 파라미터로 프로듀서레코드 전달하여 KafkaProducer의 send() 메소드 호출
         *  내부적으로 Partitioner와 Accumulator (배치로 모음) 동작
         *  -> 카프카 프로듀서에 의해 정의한 레코드 전송
         */
        producer.send(record);
        logger.info("{}", record);

        /**
         * flush() : 데이터 전송 강제하고 싶을 때 사용
         *  Accumulator에 있는 모든 데이터를 날린 후 데이터 전송을 강제화 함
         */
        producer.flush();

        /**
         * 6. 프로듀서 종료
         */
        producer.close();
    }
}

1) 전역 설정

    private final static Logger logger = LoggerFactory.getLogger(ProducerTest.class);
    private final static String TOPIC_NAME = "test-by-app";
    private final static String BOOTSTRAP_SERVERS = "127.0.0.1:9092";

먼저 전역 상수 설정을 해준다.
토픽명과 부트스트랩 서버 설정을 해준다.

2) Properties

        /**
         * 1. 필수 옵션 3가지에 대한 Properties 설정
         *  1) 서버
         *  2) 메시지 키 직렬화 옵션
         *  3) 메시지 값 직렬화 옵션
         **/
        Properties configs = new Properties();
        // 서버
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 메시지 키 직렬화 옵션
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 메시지 값 직렬화 옵션
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

코드 상에서는 가장 먼저 Properties를 생성한다.
필수 옵션인 서버, 메시지 키 직렬화 옵션, 메시지 값 직렬화 옵션을 설정한다.

3) KafkaProducer

        /**
         * 3. 카프카 프로듀서 인스턴스 생성
         *  미리 정의한 property를 바탕으로
         *  지정한 서버와 통신하여
         *  지정한 타입으로 메시지 키와 값을 직렬화하는
         *  카프카 프로듀서 인스턴스 생성
         **/
        // KafkaProducer<키 직렬화 타입, 값 직렬화 타입> 인스턴스명 = new KafkaProducer<>(프로퍼티 변수);
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

다음으로는 properties 변수를 파라미터로 넣어준 카프카프로듀서 인스턴스를 생성해준다.

4) ProducerRecord

        String msgVal = "testByApplicationMessage";

        /**
         * 4. 프로듀서 레코드 인스턴스 생성
         *  필수값으로 토픽명과 메시지값 필요
         */
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, msgVal);

이제 프로듀서레코드를 정의할 차례이다.
토픽명과 메시지값을 가지는 프로듀서레코드 인스턴스를 생성해둔다.

5) send()

        /**
         * 5. 파라미터로 프로듀서레코드 전달하여 KafkaProducer의 send() 메소드 호출
         *  내부적으로 Partitioner와 Accumulator (배치로 모음) 동작
         *  -> 카프카 프로듀서에 의해 정의한 레코드 전송
         */
        producer.send(record);
        logger.info("{}", record);

정의해둔 레코드를 카프카프로듀서의 send() 메소드를 이용해 전송한다.

6) flush(), close()

        /**
         * flush() : 데이터 전송 강제하고 싶을 때 사용
         *  Accumulator에 있는 모든 데이터를 날린 후 데이터 전송을 강제화 함
         */
        producer.flush();
        /**
         * 6. 프로듀서 종료
         */
        producer.close();

Accumulator에 있는 모든 데이터를 날린 후 데이터 전송을 강제화하고 싶을 때 flush() 메소드를 사용할 수도 있다.

프로듀스를 완료한 후에는 close() 메소드로 안전하게 프로듀서를 종료하고 리소스를 해제한다.
이 메소드를 사용하면 Accumulator에 저장되어 있는 모든 데이터를 카프카 클러스터로 전송해준다.

안전한 프로듀싱을 위해서는 flush()와 close()를 함께 사용해주는 것이 좋다.

3. 결과 확인


실행되면 다음과 같이 카프카 설정을 로그로 확인할 수 있다.

로그로 찍어준 레코드 정보도 확인할 수 있다.

이제 실제로 해당 토픽이 만들어졌고, 메시지도 잘 들어갔음을 확인해볼 수 있다.

0개의 댓글