Kafka Producer With Java

김파란·2024년 12월 10일

Kafka

목록 보기
6/8

기초

public class ProducerDemo {
    private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class.getSimpleName());

    public static void main(String[] args) {
        log.info("hello world");

        // 프로듀서 환경설정
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        // 프로듀서 생성
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 프로듀서 레코드 생성
        // 토픽, 키, 밸류, 파티션 등을 설정할 수 있다
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_java", "hello world");

        // 데이터 보내기 (비동기)
        producer.send(producerRecord);

        // 프로듀서에게 모든 데이터를 보내고 완료될 때까지 기다리기 (동기)
        producer.flush();
        // 프로듀서 종료, close호출 시에는 close 전에 flush가 호출되지만 필요할 경우 flush 작성
        producer.close();
    }
}

콜백

  • 프로듀서가 어느 파티션의 어느 오프셋으로 전송되지는지 알 수 있다
  • 데이터를 보내고 성공적으로 반환받는지, 에러를 반환받는지 알 수 있다
public class ProducerDemoWithCallback {
    private static final Logger log = LoggerFactory.getLogger(ProducerDemoWithCallback.class.getSimpleName());

    public static void main(String[] args) {
        log.info("I am a Kafka Producer");

        // 프로듀서 환경설정
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        // 프로듀서 생성
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 프로듀서 레코드 생성
        // 토픽, 키, 밸류, 파티션 등을 설정할 수 있다
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_java", "hello world");

        // 데이터 보내기 (비동기)
        // 추가
        producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                // 성공적으로 보냈거나 예외가 발생했을 때 실행된다
                if (e == null) {
                    // 성공적으로 데이터를 보냄
                    log.info("Received new metadata \n" +
                            "Topic: " + recordMetadata.topic() + "\n" +
                            "Partition: " + recordMetadata.partition() + "\n" +
                            "Offset: " + recordMetadata.offset() + "\n" +
                            "Timestamp: " + recordMetadata.timestamp() + "\n"
                            );
                } else{
                    log.error("Error while producing", e);
                }

            }
        });

        // 프로듀서에게 모든 데이터를 보내고 완료될 때까지 기다리기 (동기)
        producer.flush();
        // 프로듀서 종료, close호출 시에는 close 전에 flush가 호출되지만 필요할 경우 flush 작성
        producer.close();
    }
}

프로듀서 동작 방식

  • 라운드로빈 방식
    • 데이터가 하나씩 하나의 파티션으로 번갈아가면서 동작한다
  • Sticky Prititioner
    • 빠르게 데이터가 들어오면 배치처리를 한다
    • 일괄처리로 하나의 파티션에 데이터가 들어가도록 한다
    • 성능면에서 라운드로빈 방식에 비해서 낫고 Default이다
	// 배치사이즈 설정
    // 배치사이즈는 줄이면 안된다 기본 설정인 16KB 그대로 써야 한다
    properties.setProperty("batch.size", "400");
    // 파티셔너 설정
    properties.setProperty("partitioner.class", RoundRobinPartitioner.class.getName());

프로듀서 키

  • 키를 지정하면 같은 파티션으로 간다
  • 동일한 키는 동일한 파티션
public class ProducerDemoWithKeys {
    private static final Logger log = LoggerFactory.getLogger(ProducerDemoWithKeys.class.getSimpleName());

    public static void main(String[] args) {
        log.info("I am a Kafka Producer");

        // 프로듀서 환경설정
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        // 프로듀서 생성
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int j = 0; j < 2; j++) {
            for (int i = 0; i < 10; i++) {
                String topic = "demo_java";
                String key = "id_" + i;
                String value = "hello world " + i;

                // 프로듀서 레코드 생성
                // 토픽, 키, 밸류, 파티션 등을 설정할 수 있다
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, value);

                producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        // 성공적으로 보냈거나 예외가 발생했을 때 실행된다
                        if (e == null) {
                            // 성공적으로 데이터를 보냄
                            log.info("Key: " + key + " | Partition: " + recordMetadata.partition()
                            );
                        } else {
                            log.error("Error while producing", e);
                        }
                    }
                });
            }
        }



        // 프로듀서에게 모든 데이터를 보내고 완료될 때까지 기다리기 (동기)
        producer.flush();
        // 프로듀서 종료, close호출 시에는 close 전에 flush가 호출되지만 필요할 경우 flush 작성
        producer.close();
    }
}

확인 응답(Acknowledgements)

  • 프로듀서가 브로커에게 데이터를 보내면 데이터 쓰기 작업이 완료됐다는 확인 응답을 받을 수 있다
  • acks=0: 확인 응답을 기다리지 않겠다는 뜻이다
    • 따라서 데이터 손실 가능성이 있다. 확인 요청조차 하지 않는다
  • acks=1: 리더 브로커의 확인을 기다리겠다는 뜻이다
    • 데이터 손실 가능성이 조금 줄어든다
  • acks=all: 리더와 모든 레플리카한테 확인을 받는다
    • 데이터 손실이 발생하지 않는다. acks=-1과 같은 의미이다

Acks=0

  • 프로듀서는 메시지는 보내는 순간 메시지 쓰기에 성공했다고 간주한다
  • 브로커가 오프라인이 되거나 예외가 발생하더라도 우리는 알 수 없고 데이터 손실로 이어진다
  • 메트릭 정보처럼 메시지를 유실해도 괜찮을 때 유용하다
  • 네트워크 오버헤드가 최소화되기 때문에 성능상 제일 우월하다

Acks=1

  • 리더 브로커에게 확인 응답을 받을 때 메시지 쓰기가 성공했다고 간주한다
  • 2.8까지 기본 값이었으나 현재는 Acks=all로 바뀌었다
  • 데이터 확인을 받은 건 리더뿐이라 데이터가 복제되었는지 여부는 알 수 없다
  • 리더가 오프라인이 됐을 때 레플리카들이 미처 데이터를 복제해두지 못했다면 데이터 손실이 발생한다

Acks=all

  • 요즘은 데이터 안전을 최대한 보장하려는 추세이다
  • 동기화된 레플리카(ISR)들에게 모두 확인을 받으면 메시지 쓰기에 성공했다고 간주한다

min.insync.replicas

  • acks=all과 같이 쓰는 설정이 있다 min.insync.replicas이다
  • acks=all을 할 때 리더가 현재 클러스터 내의 동기화된 레플리카 개수만으로 안전한 쓰기가 가능한지 확인한다
  • min.insync.replicas=1가 기본값인데 이 경우 리더 브로커만 확인 응답을 보내면 된다
  • min.insync.replicas=2은 리더 브로커와 적어도 1개이상의 레플리카가 확인 응답을 보내야한다
  • 충족하지 못하게 되면 예외를 발생시킨다
  • 데이터 안전을 최대로 하려면 복제 계수는 3으로 min.insync.replicas은 2로 설정해야 한다

재시도

  • 예외가 발생했는데 처리하지 않고 재시도 하는 방법이 있다
  • 카프카 2.0에서는 기본값이 0이고 2.1 이상부터는 2^31승이다
  • retry.backoff.ms로 다음 재시도까지 대기 시간을 설정할 수 있다
  • 재시도에도 타임아웃이 걸린다 delivery.timeout.ms=120000으로 즉 2분이다
  • 메시지 배달이 대기시간과 Retry로 시간을 넘지만 않으면 성공적으로 한것으로 본다
  • 재시도를 하게 되면 메시지를 반복적으로 보내게 되어 멱등적인 프로듀서로 메시지를 보내야 한다
  • 그래서 키 기준 정렬 방식을 쓰면 문제가 될 수 있다
    • 이럴 때 사용하는것이 max.in.flight.requests.per.connection으로 기본값은 5이고 옛날 버전이고 재시도 값이 있으면 1로 설정해야 키 기준 정렬 방식을 사용할 수 있다
    • 최신 카프카는 멱등 프로듀서를 사용하면 된다

멱등 프로듀서

  • 네트워크 오류로 인해 중복된 메시지가 전송될 수 있다
  • 예를 들어 프로듀서가 메시지를 보내고 카프카가 응답확인을 보냈는데 네트워크 오류로 인해 응답확인을 보냈지만 프로듀서에게 전달되지 못한다
    • 그러면 프로듀서는 메시지 전달을 못받은 걸로 간주하고 다시 보내게 된다
    • 이를 카프카는 새로운 요청으로 여기고 중복된 데이터를 커밋하게 된다
    • 이를 해결하기 위해 멱등 프로듀서를 사용한다

멱등 프로듀서

  • 네트워크 오류에도 중복을 허용하지 않는 멱등 프로듀서가 있다
  • 오류로 같은 메시지를 보내더라도 카프카가 중복 생성 요청임을 알아채고 두 번 커밋하지 않는다
  • 3.0에서 이상에서는 디폴트로 사용한다
  • 사용하려면 enable.idempotence=true로 설정하면 된다
	// 프로듀서 안전 (2.8버전 이하)
    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
    properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));

메시지 압축

  • 보통은 프로듀서 데이터를 전송할 때 텍스트로 된 데이터를 전송한다
  • 압축을 하게 되면 메시지 크기가 줄어들어 전송 속도가 줄어들어 디스크 내 저장공간을 적게 차지한다
  • 압축은 프로듀서 레벨, 브로커 레벨, 토픽 레벨에서 압축을 할 수 있다
  • 압축 타입compression.type은 none, gzip, lz4, snappy, zstd 등 다양한 타입을 지원한다
  • 메시지 배치의 크기가 클수록 압축률이 올라가 효율이 증가한다
  • 먼저 snappy 또는 lz4를 사용하는 걸 권장한다 메시지가 텍스트라면 snappy가 좋다

압축 레벨

  • 브로커 레벨에서 진행되면 모든 토픽이 압축이 된다
  • 토픽 수준에서 활성화되면 해당 토픽에만 적용된다
  • compression.type=producer가 기본값으로 브로커는 프로듀서가 압축한 데이터를 재압축하지 않고 토픽의 로그 파일에 바로 저장한다
    • 최적의 방법이지만 프로듀서가 압축하는 책임을 가진다
  • compression.type=none 압축한 모든 데이터를 브로커가 압축을 해제한다
    • 매우 비효율적이다
  • 예를들어 compression.type=lz4 이렇게 설정하면 특정한 유형의 압축을 사용하도록 설정할 수 있다
    • 토픽에 설정된 압축 유형이 프로듀서 설정의 유형과 일치하면 데이터가 다시 압축되지 않는다
    • 압축 방법이 다르다면 브로커가 배치를 압축 해지하고 명시된 압축방법으로 다시 압축한다
    • 추가 CPU 사이클을 소비하므로 기본값 설정하는 것이 좋다
  • 프로듀서 설정을 건드릴 수 없어 압축을 활성화하고 싶은 경우 브로커에서 압축을 활성화해야 한다

linger.ms & batch.size

  • max.in.flight.requests.per.connection은 프로듀서와 브로커 사이에 최대 5개의 메시지 배치가 전송될 수 있다는 것을 의미한다 (병렬처리)
    • 메시지 배치를 처리할 수 없을 때는 메시지 배치를 모은다
  • 배칭 매커니즘에 영향을 주는 두 개의 설정이 있다
  • linger.ms: 기본값은 0으로 배치를 전송할 때까지 대기하는 시간이다
    • 5ms로 설정하면 메시지를 전송하기 5ms 동안 배치에 메시지를 추가한다
  • batch.size: batch.size가 다 차면 linger.ms이 되기 전에 배치를 전송한다
    • 기본값은 16KB로 그 이상으로 늘리게 되면 압축률, 처리량, 요청의 효율이 늘어난다
    • 배치크기보다 큰 메시지가 있는 경우 바로 전송한다
    • 배치는 전송하는 파티션마다 하나씩 할당되므로 너무 큰 값을 할당하게 되면 메모리가 낭비된다
// 처리량을 늘리기 위한 설정
  properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
  properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024));
  properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

Default Partitioner

0개의 댓글