Kafka 프로듀서 애플리케이션 만들기 (w. Spring boot)

Kai·2024년 5월 31일
0

Kafka

목록 보기
5/6

✍️ 이 글에서 사용한 코드: 깃헙

☕ 개요


이번 글에선는 Kafka 프로듀서에 대해서 알아보고, Java와 Spring으로 Kafka 프로듀서 애플리케이션을 만들어보도록 하겠다.

바로 ㄱㄱ 🔥

이 글은 Dev원영님의 강의를 듣고 정리한 내용이다. 🙏



🧐 프로듀서의 구조


Java 프로듀서 애플리케이션은 내부적으로 이런 구조를 갖고 있다.
하나씩 좀 더 자세히 알아보자.

1) ProductRecord

말 그대로 Kafka에 전송할 "레코드"를 의미한다. .send() 메서드를 통해서 Kafka에 전송 요청을할 수 있다.

2) Partitioner

Kafka에 존재하는 파티션 중, 어떤 파티션에 전송할지 지정한다.
기본적으로 제공되는 파티셔너 클래스는 아래 2가지가 있고, 커스텀 파티셔너가 필요하다면, Partitioner 인터페이스의 구현체를 만들어서 사용하면 된다.

  1. UniformStickyPartitioner
  2. RoundRobinPartitioner

✍️ 파티셔너의 동작: 메세지 키가 있는 경우

위 2개의 파티셔너 모두, 메세지 키가 있는 경우는 동일하게 동작한다. 메세지 키의 해시값과 Kafka에 생성되어 있는 파티션을 매칭해서 레코드를 분류한다.
즉, 동일한 키값을 갖고 있다면 무조건 동일한 파티션에 할당되게 된다.
또, 파티션의 개수가 변경되게 되면

✍️ 파티셔너의 동작: 메세지 키가 없는 경우

메세지 키가 없는 경우엔, 파티션들에 골고루 레코드를 분배해야한다. 위 2개의 파티셔너 모두 동일한 작업을 하지만, UniformStickyPartitionerRoundRobinPartitioner의 단점을 개선한 클래스이다.

RoundRobinPartitioner는 레코드가 들어오는 대로 파티션을 순회하면서 전송한다. 이렇게 되면, Accumulator에서 배치로 묶이는 정도가 적어서 전송 성능이 비교적 떨어지게 된다.
반면에 UniformStickyPartitioner는 Accumulator에서 레코드들이 배치로 묶일 때까지 기다렸다가 전송하도록 해서 비교적 높은 전송 성능을 보장한다.

3) Accumulator

파티셔너를 거쳐서 넘어온 레코드들을 토픽별로 데이터를 모으는 역할을 한다.

4) Sender

Accumulator에 모여진 데이터를 정해진 기준이 되면, Kafka에 전송한다.



📒 프로듀서의 주요 옵션


1) 필수 옵션

  1. bootstrap.servers: 프로듀서가 데이터를 전송할 대상인 브로커를 정의한다. Kafka브로커의 호스트 이름:포트를 입력하고, 2개 이상의 브로커 정보를 입력하는 것이 안전하다.
  2. key.serializer: 레코드의 메세지 키를 직렬화하는 클래스를 지정한다.
  3. value.serializer: 레코드의 메세지 값을 직렬화하는 클래를 지정한다.

2) 선택 옵션

  1. acks: 프로듀서가 전송한 데이터가 브로커에 잘 전송됐는지 확인하는 데에 사용하는 옵션이다. 0, 1, -1(또는 'all') 중에 하나로 설정할 수 있다. Default: 1
  2. linger.ms: 배치를 전송하기 전까지 기다리는 최소 시간이다. Default: 0
  3. retries: 브로커로부터 에러를 받고 난 후, 재전송을 시도하는 횟수이다. Default. 2147483647
  4. max.in.flight.requestes.per.connection: 데이터 전송 시, 브로커와 맺을 최대 커넥션 개수이다. 이 값 만큼 병렬적으로 전송이 가능하다. Default. 5
  5. partitioner.class: 파티셔너 클래스를 지정한다. Default. DefaultPartitioner
  6. enable.idempotence: 멱등성 프로듀서로 동작시킬지 결정한다. Default. false (3버전 이상부터는 true가 기본값)
  7. transactionl.id: 레코드를 전송할 때, 레코드를 트랜잭션 단위로 묶을지 결정한다. Default. null

3) acks

위에서 이야기한 acks 옵션에 대해서 좀 더 자세히 알아보자.

✍️ acks=0

프로듀서가 리더 파티션으로 데이터를 전송만 하고, 리더 파티션에 데이터가 저장됐는지 확인하지 않는 옵션이다. 그래서 당연하게도 데이터의 전송 속도는 가장 빠르다.
요약하자면, 데이터의 일부 유실을 감수하고라도 빠른 전송 속도가 중요하다면 사용할 수 있는 옵션이다. (Ex. GPS)

✍️ acks=1

프로듀서가 보낸 데이터가 리더 파티션에 정상 적재되었는지 확인하는 옵션이다. 팔로워 파티션에 복제되었는지는 확인하지 않으므로, 복제가 이루어지는 중에 리더 파티션에서 장애가 발생하면 데이터 유실이 발생할 수 있다.

✍️ acks=-1

프로듀서가 보낸 데이터가 리더 파티션에 정상 적재되었고, 팔로워 파티션들에도 정상적으로 복제되었는지 까지 확인하는 옵션이다. 가장 안정적인 방식이지만, 성능은 당연히 떨어지게 된다.
acks를 -1로 설정한 경우 min.insync.replicas옵션에 따라서 데이터의 안정성이 달라진다.
min.insync.replicas는 리더 파티션을 포함해서 몇 개의 파티션에 데이터가 적재된 것을 확인할지 결정하는 옵션이다.



💻 프로듀서 만들기


1) build.gradle

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web'
	implementation 'org.springframework.kafka:spring-kafka'
//	implementation 'org.springframework.kafka:kafka-clients:3.7.0'

	compileOnly 'org.projectlombok:lombok'
	annotationProcessor 'org.projectlombok:lombok'

	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'org.springframework.kafka:spring-kafka-test'
	testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
	testCompileOnly 'org.projectlombok:lombok'
	testAnnotationProcessor 'org.projectlombok:lombok'
}

위와 같이 build.gradle을 작성해주었다.
프로듀서를 만들기 위해서 필수적으로 필요한 라이브러리는 spring-kafka 또는 kafka-clients인데, 나는 spring-kafka를 사용하도록 하겠다.

2) KafkaProducerConfig

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> kafkaProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<>(kafkaProducerFactory());
    }

}

위와 같이 설정 클래스를 하나 만들어준다. KafkaTemplate이 실질적으로 메세지 전송 요청을 담당할 클래스이다.

3) SimpleProducer

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@RequiredArgsConstructor
@Component
public class SimpleProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        ProducerRecord record = new ProducerRecord(topic, message);
        kafkaTemplate.send(record);
    }

    public void sendMessage(String topic, String key, String message) {
        ProducerRecord record = new ProducerRecord(topic, key, message);
        kafkaTemplate.send(record);
    }

    public void sendMessage(String topic, int partition, String key, String message) {
        ProducerRecord record = new ProducerRecord(topic, partition, key, message);
        kafkaTemplate.send(record);
    }

}

간단하게 메세지를 전송할 수 있는 컴포넌트를 하나 만들어주었다.

4) 테스트 코드

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class SimpleProducerTest {

    @Autowired
    private SimpleProducer simpleProducer;

    @Test
    @DisplayName("메세지_전송")
    void 메세지_전송() throws Exception {
        simpleProducer.sendMessage("test", "hello world");
        simpleProducer.sendMessage("test", "key1", "hello world");
        simpleProducer.sendMessage("test", 0, "key1", "hello world");
    }

}

테스트 코드를 위와 같이 작성하고, 메세지가 잘 전송되는지 확인해보자.
위에서 부터 "키가 없는 메세지", "키가 있는 메세지", "키와 파티션을 지정한 메세지"에 대한 메세지 전송 코드이다.

5) 동작 확인

위 코드를 실행하고, 메세지가 Kafka로 잘 전송됐는지 확인해보니, test 토픽 아래에 hello world라는 메세지가 잘 전송된 것을 확인할 수 있다.



🧐 커스텀 파티셔너


1) 커스텀 파티셔너 클래스 만들기

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            throw new InvalidRecordException("메세지 키를 입력해주세요.");
        }

        // 키가 test인 경우 0번 파티션으로 보내기
        if (key.toString().equals("test")) {
            return 0;
        }

        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int partitionSize = partitions.size();
        return Utils.toPositive(Utils.murmur2(keyBytes)) % partitionSize;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> map) {}
}

위 파티셔너는 기본 동작은 동일하지만, test라는 키 값을 갖고 있는 메세지가 들어올 경우 무조건 0번 파티션으로 보내도록 되어 있다.

2) 설정 추가

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);

KafkaProducerConfig 에 위 설정을 추가해준다. 동작은 굳이 확인해보진 않겠다.



🙏 참고


0개의 댓글