Kafka Producer Application 구현하기 1

망7H·2021년 4월 27일
1

이번 포스팅에서는 기본적인 형태의 프로듀서 애플리케이션과 조금씩의 변형을 가미한 프로듀서 애플리케이션을 직접 작성해보려 한다.

0. 작업 환경 및 서버 정보

NameMain Info기타
Local OSmacOS Big Sur-
JDKVer: 1.8.0_282AWS EC2 인스턴스 A
KafkaIP: 13.209.97.233
PORT: 9092
AWS EC2 인스턴스 A
ZookeeperIP: 13.209.97.233
PORT: 2181
AWS EC2 인스턴스 A
GitGithub작업 관련 소스

1. Basic Producer Application

여기서는 가장 기본적인 프로듀서의 기능만을 하는 프로듀서 애플리케이션을 직접 작성해볼 것이다.

1) EC2에 Kafka를 설치하기

이전에 작성한 포스팅인 AWS EC2에 Kafka 설치하기를 참고하여 EC2에 카프카 설치를 마치도록 한다.

2) EC2 인스턴스 실행

테스트의 편의를 위해 EC2 인스턴스 실행 후 발급된 ip를 로컬 환경의 /etc/hosts에 추가한다.

※ 인스턴스를 새로 실행했다면 브로커의 설정을 정의하고 있는 server.propertiesadvertised.listeners에 발급 받은 인스턴스의 ip로 다시 설정해주는 것을 잊지말자.

3) 주키퍼 & 카프카(브로커) 실행

주키퍼의 실행 명령은 아래와 같다.

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

jps -vm

위와 같이 실행되었다면 정상이다.
브로커에 대한 메타 정보를 가진 주키퍼가 실행되었으니, 이제 브로커를 실행시켜보자.

bin/kafka-server-start.sh -daemon config/server.properties

jps -m

위와 같이 실행된 프로세스가 확인된다면 정상이다.

4) 토픽 (basic_topic) 생성

우리가 만들 프로듀서 애플리케이션에서 생산(produce)할 데이터(record)를 저장할 토픽을 생성해보도록 하자.
Local 환경에서 kafka-client 라이브러리를 사용하여 EC2에 토픽을 생성하는 명령을 실행해보자.

bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--create \
--topic basic_test \
--partitions 3

위와 같이 토픽 생성 메시지가 나왔다면 정상이다.
혹시 모르니, 토픽이 정상적으로 생성되었는지 아래의 명령으로 확인해보자.

bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--list

5) 프로듀서 애플리케이션 프로젝트 생성

이제 IntelliJ를 사용하여 gradle 프로젝트를 생성하자.
프로젝트 명은 kafka-producer-application으로 하였다.

6) build.gradle에 라이브러리 추가

아래와 같이 라이브러리 추가하기.

-- build.gradle 내부 소스
dependencies {
  ...
  compile 'org.apache.kafka:kafka-clients:2.5.0'
  compile 'org.slf4j:slf4j-simple:1.7.30'
}

gradle reload를 통해 /kafka-producer-application/External Libraries 경로에 라이브러리가 추가되어 있으면 정상이다.

7) 패키지 생성

src/main/java 아래에 com/producer 패키지를 추가한다.

8) BasicProducer.class 추가하고 실행

src/main/java/com/producer/BasicProducer.class 를 추가하도록 하자.
이제 진짜 최소한의 프로듀서 기능을 하는 프로듀서 애플리케이션을 만들어 보자.
소스 코드는 아래와 같다.

package com.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

public class BasicProducer {
    private final static Logger logger = LoggerFactory.getLogger(BasicProducer.class);

    /**
     * 프로듀서 애플리케이션에서는 데이터를 전달할 브로커의 호스트 ip와 토픽명을 알고 있어야 한다.
     * [참고] 브로커에 해당 토픽명이 없는 경우에는 기본 설정에 따라서는 토픽을 생성하고 데이터를 넣어준다.
     */
    private final static String BOOTSTRAP_SERVER = "my-kafka:9092";
    private final static String TOPIC_NAME = "basic_topic";


    public static void main(String[] args) {
        /**
         * 프로듀서의 인스턴스에 사용할 '필수 옵션'을 설정한다.
         * [참고] 선언하지 않은 '선택 옵션'은 기본 옵션값으로 설정되어 동작한다.
         */
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);

        /**
         * 메시지 키, 값을 직렬화 하기 위해 StringSerializer를 사용한다.
         * StringSerializer는 String을 직렬화하는 카프카의 라이브러리이다.
         * (org.apache.kafka.common.serialization)
         */
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        /**
         * 프로듀서 인스턴스를 생성하며, 위에서 설정한 설정을 파라미터로 사용한다.
         */
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        /**
         * 전달할 메시지 값을 생성한다.
         * (여기서는 애플리케이션 실행 시점의 날짜와 시간을 조합하여서 메시지 값으로 생성한다.)
         */
        Date todayDate = new Date();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String messageValue = "m1 [" + dateFormat.format(todayDate) + "]";

        /**
         * 레코드를 생성하고 전달한다.
         * 이때, 레코드를 전달할 토픽과 레코드의 메시지 값을 지정한다.
         */
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
        producer.send(record);

        logger.info("{}", record);

        /**
         * 애플리케이션을 안전하게 종료한다.
         */
        producer.flush();
        producer.close();
    }
}

주석이 잘 달려 있기 때문에, 별도의 설명은 생략한다.

9) 결과 확인

아래와 같이 basic_topic에 대해 consume 명령을 실행한다.

bin/kafka-console-consumer.sh \
--bootstrap-server my-kafka:9092 \
--topic basic_topic \
--from-beginning

위와 같이 메시지 하나가 출력되었다면 정상이다.

2. 메시지 키를 사용하는 Producer Application

처음에 작성한 프로듀서는 단순히 메시지 값 만을 전달한다.
이제, 메시지 키를 사용하는 프로듀서 애플리케이션을 작성해보자.

1) 토픽 (key_topic) 생성

이번에는 토픽명을 key_topic으로 생성하여 보자.

bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--create \
--topic key_topic \
--partitions 3

혹시 모르니, 토픽이 정상적으로 생성되었는지 확인하고 싶다면 아래의 명령을 사용하라.

bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--list

2) KeyProducer.class 추가하고 실행

src/main/java/com/producer/KeyProducer.class 를 추가하도록 하자.
메시지의 키가 추가로 전달되도록 할 것이므로,
BasicProducer.class의 소스를 복사해서 데이터를 저장할 토픽명을 key_topic으로 변경하고, 메시지 키를 jwpark06으로 추가한다.
소스 코드는 아래와 같다.

package com.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

public class KeyProducer {
    private final static Logger logger = LoggerFactory.getLogger(BasicProducer.class);

    /**
     * 프로듀서 애플리케이션에서는 데이터를 전달할 브로커의 호스트 ip와 토픽명을 알고 있어야 한다.
     * [참고] 브로커에 해당 토픽명이 없는 경우에는 기본 설정에 따라서는 토픽을 생성하고 데이터를 넣어준다.
     */
    private final static String BOOTSTRAP_SERVER = "my-kafka:9092";
    private final static String TOPIC_NAME = "key_topic";


    public static void main(String[] args) {
        /**
         * 프로듀서의 인스턴스에 사용할 '필수 옵션'을 설정한다.
         * [참고] 선언하지 않은 '선택 옵션'은 기본 옵션값으로 설정되어 동작한다.
         */
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);

        /**
         * 메시지 키, 값을 직렬화 하기 위해 StringSerializer를 사용한다.
         * StringSerializer는 String을 직렬화하는 카프카의 라이브러리이다.
         * (org.apache.kafka.common.serialization)
         */
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        /**
         * 프로듀서 인스턴스를 생성하며, 위에서 설정한 설정을 파라미터로 사용한다.
         */
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        /**
         * 전달할 메시지 값을 생성한다.
         * (여기서는 애플리케이션 실행 시점의 날짜와 시간을 조합하여서 메시지 값으로 생성한다.)
         */
        Date todayDate = new Date();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String messageValue = "m1 [" + dateFormat.format(todayDate) + "]";

        /**
         * 레코드를 생성하고 전달한다.
         * 이때, 레코드를 전달할 토픽과 레코드의 메시지 키와 값을 지정한다.
         */
        String messageKey = "jwpark06";
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageKey, messageValue);
        producer.send(record);

        logger.info("{}", record);

        /**
         * 애플리케이션을 안전하게 종료한다.
         */
        producer.flush();
        producer.close();
    }
}

3) 결과 확인

bin/kafka-console-consumer.sh \
--bootstrap-server my-kafka:9092 \
--topic key_topic \
--property print.key=true \
--property key.separator="-" \
--from-beginning



3. 커스텀 파티셔너를 사용하는 Producer Application

프로듀서 애플리케이션은 메시지를 브로커로 전달할 때, 직접 토픽과 파티션 번호를 지정할 수도 있지만
시스템 상에서 기본적으로 메시지를 파티션들에게 분배해주는 파티셔너가 존재한다.
파티셔너(Partitioner)는 프로듀서 애플리케이션 내에서 생성된 메시지를 브로커로 전달할 때, 이 메시지가 토픽의 어떤 파티션에 전달될 지를 정하는 역할을 한다.
포스팅5. Producer & Partitioner를 참고하면 RoundRobinPartitionerUniformStickyPartitioner에 대해 이해할 수 있을 것이다.

여기서는 카프카 버전별 기본 파티셔너로 제공되는 것 이외에 사용자가 직접 기준을 설계하여 메시지가 들어갈 파티션이 정해지도록 커스텀 파티셔너(CustomPartitioner)를 직접 만들고, 커스텀 파티셔너를 사용하는 프로듀서 애플리케이션을 만들어 볼 것이다.

jwpark06 이라는 메시지 키를 가진 메시지는 파티션 번호가 0번인 파티션에 전달되고,
wychoi01 이라는 메시지 키를 가진 메시지는 파티션 번호가 1번인 파티션에 전달되고,
smlee02 이라는 메시지 키를 가진 메시지는 파티션 번호가 2번인 파티션에 전달되고,
이 밖의 다른 메시지 키인 경우에는 해시값을 지정하여 특정 파티션에 매칭되도록 하고,
메시지 키가 없는 메시지는 비정상적인 데이터로 간주하고 파티션에 전달되지 않도록 해보자.

1) 토픽 (custom_topic) 생성

이번에는 토픽명을 custom_topic으로 생성하여 보자.

bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--create \
--topic custom_topic \
--partitions 3

혹시 모르니, 토픽이 정상적으로 생성되었는지 확인하고 싶다면 아래의 명령을 사용하라.

bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--list

2) 파티셔너 패키지 생성

src/main/java/com/partitioner

3) CustomPartitioner.class 추가

src/main/java/com/partitioner/CustomPartitioner.class 를 추가하도록 하자.
소스 코드는 아래와 같다.

package com.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {

    /**
     * partition 메서드에는 레코드를 기반으로 파티션을 정하는 로직을 포함한다.
     * 리턴값은 주어진 레코드가 들어갈 파티션 번호이다.
     *
     * @param topic
     * @param key
     * @param keyBytes
     * @param value
     * @param valueBytes
     * @param cluster
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /**
         * 레코드에 메시지 키를 지정하지 않은 경우 -> 비정상적인 데이터로 간주하고 InvalidRecordException 발생 처리.
         */
        if (keyBytes == null) {
            throw new InvalidRecordException("Need message key");
        }

        /**
         * 메시지 키에 따라 레코드가 전달될 파티션을 지정한다.
         * jwpark06 -> partition 0
         * wychoi01 -> partition 1
         * smlee02 -> partition 2
         */
        String keyName = (String) key;
        if (keyName.equals("jwpark06")) {
            return 0;
        } else if (keyName.equals("wychoi01")) {
            return 1;
        } else if (keyName.equals("smlee02")) {
            return 2;
        }

        /**
         * 메시지 키가 존재하지만 "jwpark06", "wychoi01", "smlee02"가 아닌 경우
         * 해시값을 지정하여 특정 파티션에 매칭되도록 한다.
         */
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

위와 같이 커스텀 파티셔너를 만들었다면, 이 커스텀 파티셔너를 활용하는 프로듀서 애플리케이션을 만들어보자.

4) KeyProducerUseCustomPartitioner.class 추가하고 실행

src/main/java/com/producer/KeyProducerUserCusomPartitioner.class 를 추가하도록 하자.
기본적인 소스는 KeyProducer.class의 것과 비슷하므로 소스를 복사해오자.
custom_topic으로 변경하고, 메시지 키만 각각 설정해주는 객체를 생성해서 적용하였다.

package com.producer;

import com.partitioner.CustomPartitioner;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KeyProducerUseCustomPartitioner {
    private final static Logger logger = LoggerFactory.getLogger(BasicProducer.class);

    /**
     * 프로듀서 애플리케이션에서는 데이터를 전달할 브로커의 호스트 ip와 토픽명을 알고 있어야 한다.
     * [참고] 브로커에 해당 토픽명이 없는 경우에는 기본 설정에 따라서는 토픽을 생성하고 데이터를 넣어준다.
     */
    private final static String BOOTSTRAP_SERVER = "my-kafka:9092";
    private final static String TOPIC_NAME = "custom_topic";


    public static void main(String[] args) {
        /**
         * 프로듀서의 인스턴스에 사용할 '필수 옵션'을 설정한다.
         * [참고] 선언하지 않은 '선택 옵션'은 기본 옵션값으로 설정되어 동작한다.
         */
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);

        /**
         * 메시지 키, 값을 직렬화 하기 위해 StringSerializer를 사용한다.
         * StringSerializer는 String을 직렬화하는 카프카의 라이브러리이다.
         * (org.apache.kafka.common.serialization)
         *
         * CustomPartitioner는 프로듀서 애플리케이션이 메시지를 전달할 때, 파티션을 고르는데 사용할 파티셔너이다.
         */
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);

        /**
         * 프로듀서 인스턴스를 생성하며, 위에서 설정한 설정을 파라미터로 사용한다.
         */
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        /**
         * 전달할 메시지 값을 생성한다.
         * (여기서는 애플리케이션 실행 시점의 날짜와 시간을 조합하여서 메시지 값으로 생성한다.)
         */
        Date todayDate = new Date();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String messageValue = "[" + dateFormat.format(todayDate) + "]";

        /**
         * 레코드들를 생성하고 전달한다.
         * 이때, 레코드를 전달할 토픽과 레코드의 메시지 키와 값을 지정한다.
         */
        Map<String, String> records = new HashMap<>();
        records.put("jwpark06", "m1 " + messageValue);
        records.put("smlee02", "m2 " + messageValue);
        records.put("wychoi01", "m3 " + messageValue);
        records.put("abcd", "m4 " + messageValue);
        records.put(null, "m5 " + messageValue);

        ProducerRecord<String, String> record;

        for (String messageKey : records.keySet()) {
            record = new ProducerRecord<>(TOPIC_NAME, messageKey, records.get(messageKey));
            producer.send(record);
            logger.info("{}", record);
        }

        /**
         * 애플리케이션을 안전하게 종료한다.
         */
        producer.flush();
        producer.close();
    }
}

사실 위의 소스 코드에서는 Map을 사용했기 때문이 그리 좋은 소스 코드가 아니다.
Java의 Collection 인터페이스에서 Map은 키와 값으로 이루어져있지만, 키의 중복을 허용하지 않기 때문에 records를 아래와 같이 동일 key로 메시지를 쌓으면 문제가 발생한다.

Map<String, String> records = new HashMap<>();
records.put("jwpark06", "m1 " + messageValue);
records.put("smlee02", "m2 " + messageValue);
records.put("wychoi01", "m3 " + messageValue);
records.put("abcd", "m4 " + messageValue);
records.put("jwpark06", "m5 " + messageValue);
records.put("smlee02", "m6 " + messageValue);
records.put("wychoi01", "m7 " + messageValue);
records.put("abcd", "m8 " + messageValue);
records.put(null, "m9 " + messageValue);

위와 같이 데이터를 넣게 되면 m1 ~ m4에 해당하는 데이터는 키 중복에 의해 Map 타입의 records에 존재하지 않게 된다.

5) 결과 확인

먼저, 전달한 5개의 데이터가 custom_topic에 잘 전달되었는지 확인해보았다.
위 결과에 따르면 전달된 레코드는 4개임을 확인할 수 있다.
커스텀 파티셔너에서 메시지 키가 없는 경우에는 예외처리하였기 때문에,
토픽에 데이터가 없는 것이 정상이므로 총 4개의 레코드만 저장되어 있으면 정상이다.

그럼 이제 파티션 0에 jwpark06을 메시지 키로 갖는 데이터가 있는지,
파티션 1에는 wychoi01을 메시지 키로 갖는 데이터가 있는지,
파티션 2에는 smlee02를 메시지 키로 갖는 데이터가 있는지 확인한다.
그리고, 위 3개의 키가 아닌 경우에는 어디느 파티션으로 들어갔는지 확인한다.
위 결과를 보면 특정 메시지 키(jwpark06, wychoi01, smlee02)를 가지는 경우에는 지정했던 파티션에 맞게 데이터가 들어갔음을 확인할 수 있다.

이때 지정했던 3개의 메시지 키가 아닌 abcd 라는 메시지 키는 커스텀 파티셔너의 해시값 처리에 따라 지정되도록 처리한 결과 파티션 2가 지정되었다.
이 상황에서 파티션의 수가 변하지 않는다면 이 프로듀서 애플리케이션을 사용하여 메시지 키가 abcd인 레코드를 전달할 경우 동일한 파티션 2에만 지속적으로 들어갈 것이다.

하지만 파티션의 수가 변하게 되면 파티션의 수가 변한 이후의 해시값 연산 결과가 달라지므로 abcd라는 메시지 키를 갖더라도 동일한 파티션 2에 들어간다고 보장할 수 없다.



오랜만에 직접 프로듀서 애플리케이션을 만드는 작업을 하였다.
가능하다면 다음 포스팅에서는 전달한 메시지의 정상 전달을 확인하기 위해 기다리는 동기 프로듀서와 정상 전달이 오면 비동기 적으로 알려주는 비동기 프로듀서를 작성해보도록 해보도록 한다.

해당 글 작성에 참고한 링크

[도서] 아파치 카프카 애플리케이션 프로그래밍 with 자바

profile
망한 개발자의 개발 기록입니다. 저를 타산지석으로 삼으시고 공부하세요.

0개의 댓글