Kafka 프로듀서

땡글이·2023년 4월 2일
0

Kafka 에 대하여

목록 보기
5/5

이전까지는 카프카의 특징들 혹은 그 특징들을 위해 설계된 디자인들에 대해 살펴봤습니다. 이제는 카프카 프로듀서가 어떻게 메시지를 생산해서 카프카 토픽으로 메시지를 보내는지 등을 알아보겠습니다.

자바를 활용한 메시지 보내기

자바 언어를 활용해서 카프카 서버로 메시지를 보내는 프로듀서를 구현해보고자 합니다. 근데 프로듀서에서 카프카 서버로 메시지를 보내는 방식은 총 3가지입니다. 하나하나씩 알아보겠습니다.

메시지를 보내고 확인하지 않기

이 방식은 프로듀서에서 메시지를 보내고 난 후에 성공적으로 도착했는지까지 확인하지는 않는 방법입니다. 해당방식으로 구현하게 되면 메시지 유실의 가능성이 있습니다. 만약, 프로듀서에서 메시지를 보냈는데 보낸 시점에는 정상이었다가 메시지를 저장하기 직전에 카프카 브로커에 장애가 발생했다면 보낸 메시지는 유실됩니다.

그렇기에, 일반적인 서비스 환경에서는 이렇게 사용하지 않습니다. 아래가 메시지를 보내고 확인하지 않는 카프카 프로듀서 코드입니다.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaBookProducer1 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092, localhost:9093, localhost:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            producer.send(new ProducerRecord<>("book-topic", "Hello"));
        } catch (Exception exception) {
            exception.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

동기 전송

자바로 구현된 카프카 프로듀서는 send() 함수로 메시지를 보낸 뒤, Future 객체를 리턴합니다. Future 객체에서 get() 함수를 이용해 send()가 성공했는지 실패했는지를 확인합니다. 이러한 방법을 통해 메시지마다 브로커에게 전송한 메시지가 성공했는지 실패했는지 확인하여 신뢰성 있는 전송을 할 수 있습니다. 아래는 해당 방식을 자바로 구현한 코드입니다.

public class KafkaBookProducer1 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092, localhost:9093, localhost:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            RecordMetadata metadata = producer.send(new ProducerRecord<>("book-topic", "Hello")).get();
            
            System.out.printf("Partition : %d, Offset : %d", metadata.partition(), metadata.offset());
        } catch (Exception exception) {
            exception.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

비동기 전송

위의 동기 전송 방식처럼 카프카 프로듀서가 모든 메시지에 대한 응답을 기다린다면 응답을 기다리는 시간이 많이 소모되고, 단일 스레드 환경에서는 다른 기능들이 동작하지 못하는 환경에 놓일 수 있습니다.

그렇기에, 비동기적으로 메시지를 전송한다면 더 빠른 메시지 전송이 가능해집니다. 카프카 서버로부터 메시지가 저장되었는지에 대한 응답이 오면 콜백 함수가 호출되도록 Callback 인터페이스를 구현해줍니다.

  • 만약, 카프카 서버에서 오류가 발생하면, onCompletion() 메서드는 exception 객체에 오류 내용이 저장되어 콜백함수가 실행됩니다.

아래는 비동기 전송 방식을 구현한 코드입니다.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BookCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (metadata != null) {
            System.out.printf("Partition : %d, Offset : %d", metadata.partition(), metadata.offset());
        } else {
            exception.printStackTrace();
        }
    }
}
public class KafkaBookProducer1 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092, localhost:9093, localhost:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            producer.send(new ProducerRecord<>("book-topic", "Hello"), new BookCallback()).get();

        } catch (Exception exception) {
            exception.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

이렇게 3가지 전송 방식에 대해 알아봤는데, 전송 방식에 따라 메시지를 보내는 속도에 차이가 생길 수 있다는 것을 알 수 있습니다.

프로듀서 주요 옵션

카프카 프로듀서에는 많은 옵션들이 존재하고, 해당 옵션들을 잘 이해하고 사용하면 카프카를 100% 활용할 수 있을 것입니다. 카프카 프로듀서에는 어떤 옵션들이 있고 어떤 기능들을 하는지 알아보겠습니다.

bootstrap.servers

위의 구현된 코드에서도 알 수 있다싶이 bootstrap.servers 라는 옵션을 항상 만들었습니다. 해당 옵션에는 카프카 클러스터 내 모든 브로커를 입력해주었습니다.

카프카 클러스터는 클러스터 마스터라는 개념이 없기 때문에 클러스터 내 모든 서버가 클라이언트의 요청을 받을 수 있습니다. 만약 주어진 리스트의 서버 중 하나에서 장애가 발생할 경우 클라이언트는 자동으로 다른 서버로 재접속을 시도하기 때문에 사용자 프로그램에서 문제없이 사용할 수 있게 됩니다.

즉, 카프카 클러스터 내의 모든 브로커를 입력해줘야 카프카 프로듀서에서도 카프카의 고가용성 기능을 적극 활용할 수 있게 되는 것입니다.

acks

프로듀서가 카프카 토픽의 리더에게 메시지를 보낸 후 요청을 완료하기 전 ack(승인)의 수입니다. 쉽게 얘기하자면, 프로듀서에서 메시지를 보내고 난 후 받을 응답의 개수를 의미합니다. 프로듀서 옵션 중 아주 중요한 개념이니 옵션 값이 가지는 의미가 무엇이고 값에 따라 어떻게 동작하는지 확인해보겠습니다.

acks 옵션 : acks=0

acks=0 옵션의 의미는 카프카 서버로부터 응답을 받지 않습니다. 이렇게 설정한 경우, 서버가 데이터를 받았는지 보장하지 않고, 클라이언트는 전송 실패에 대한 결과를 알지 못하기 때문에 재요청 설정도 적용되지 않습니다. 이렇게 구현될 경우 매우 빠르게 메시지를 보낼 수 있어서 높은 처리량을 가지지만, 메시지 유실 가능성이 높다는 단점이 있어서 서비스 환경에서는 일반적으로 활용하지 않습니다.

acks 옵션 : acks=1

acks=0 옵션의 의미는 카프카 서버로부터 응답을 하나만 받습니다. 즉, 리더만 데이터를 기록하지만, 모든 팔로워는 확인하지 않습니다.

왜 리더로부터만 응답을 받는가? 팔로워한테 응답을 받을수도 있는거 아닌가?

팔로워는 리더 파티션을 계속 팔로우하면서 리더의 데이터를 복제해 저장합니다. 데이터를 읽고 데이터를 저장하는 작업은 리더 파티션이 담당하기에 카프카 클라이언트와 통신하는 파티션은 리더 파티션이 되기에 acks=1 옵션은 리더 파티션이 내는 응답을 기다린다는 것을 의미합니다.

acks 옵션 : acks=all 또는 -1

acks=all 또는 acks=-1 옵션의 의미는 리더를 포함해 모든 팔로워까지 데이터에 대한 ack를 기다리는 것을 의미합니다. 그렇기에 데이터 무손실에 대해 강력하게 보장하는 옵션 값입니다.

하지만 주의해야할 점이 있습니다. acks=all 옵션을 100% 활용하기 위해선, 프로듀서의 설정 뿐만이 아니라, 브로커의 설정도 같이 조정해줘야 합니다. 얘기하고자 하는 브로커의 옵션은 min.insync.replicas옵션입니다.

acks=all 과 min.insync.replicas 의 관계

만약 카프카 브로커에서의 옵션 값이 acks=all이고 min.insync.replicas=1라면, 프로듀서가 리더 파티션이 있는 카프카 서버로 메시지를 보내고, 카프카 브로커에서는 min.insync.replicas=1으로 되어있기에 리더 파티션에 메시지가 저장되면 팔로워 파티션들이 복제하기까지 기다리지 않고 바로 acks 를 보내게 됩니다.

뭔가 예상했던 동작방식과는 다르게 동작합니다. 그럼 min.insync.replicas 옵션을 리더를 포함한 팔로워 파티션들의 개수를 적어주는 것이 가장 좋을까요?? 그렇지 않습니다!!

  • 이유를 예시로 들며 설명하겠습니다.
  • 만약 min.insync.replicas=3이고 replica=3 인 상황이라고 가정해보겠습니다.
  • 여기서 하나의 브로커가 장애가 생겨서 다운됐다면, 메시지를 저장할 때 3개의 replica가 성공적으로 저장이 되어야 acks 메시지를 보낼 수 있게 되는데 브로커 하나가 다운됨으로써 acks 메시지를 보내지 못하게 되는 상황이 발생합니다.
  • 이는 카프카 클러스터 전체로 장애가 이어질 수 있기 때문에 이렇게 설정하면 위험합니다!

acks=all 옵션 자체는 리더와 팔로워 파티션들이 복제하기까지 기다린다는 것을 의미합니다. 하지만, 공식문서에서는 일부 팔로워 파티션만 복제되더라도 메시지 손실의 가능성을 충분히 줄여줄 수 있고, 카프카의 고가용성 특징을 살리기 위해 min.insync.replicas 옵션 값은 2로 주는 것을 권장합니다.

카프카가 디스크 방식으로 동작하고, 복제, ISR 개념들을 활용해 고가용성을 구현했지만 데이터 무손실을 완전히 보장하지는 않습니다. 이전 글에서도 다뤘듯이, 카프카는 디스크로부터 직접 읽기/쓰기 작업이 아닌 운영체제의 페이지 캐시를 이용해서 빠른 읽기/쓰기 작업을 수행하기에 페이지 캐시의 내용이 디스크에 적재되지 못하면 옵션이 acks=all이어도 메시지 유실이 발생할 수 있습니다.


프로듀서 옵션들

  • buffer.memory

    • 프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기(배치 전송이나 딜레이 등)할 수 있는 전체 메모리 바이트를 의미합니다.
  • compression.type

    • 프로듀서는 데이터를 압축해서 보낼 수 있는데, 어떤 타입으로 압축할지를 정하는 옵션입니다. none, gzip, snappy, lz4 와 같은 포맷 중 하나를 선택할 수 있습니다.
  • retries

    • 일시적인 오류로 인해 전송에 실패한 데이터를 다시 보내는 횟수를 의미합니다.
  • batch.size

    • 프로듀서는 같은 파티션으로 보내는 여러 데이터는 배치 처리를 함으로써 빠르게 동작합니다. 해당 옵션은 배치 크기 바이트 단위를 조정할 수 있습니다.
    • 배치를 보내기 전 클라이언트 장애가 발생하면 배치 내에 있던 메시지는 전달되지 않습니다. 만약 고가용성이 필요한 메시지의 경우라면 배치 사이즈를 주지 않는 것도 하나의 방법일 수 있습니다.
  • linger.ms

    • 배치 형태의 메시지를 보내기 전에 추가적인 메시지들을 위해 기다리는 시간을 조정합니다. 카프카 프로듀서는 지정해둔 배치 사이즈(batch.size)에 도달하면 즉시 메시지를 전송하고, 배치 사이즈에 도달하지 못해도 제한시간(linger.ms)에 도달하게 되면 메시지들을 전송합니다.
    • 0이 기본값이지만 0보다 큰 값을 설정하면 지연 시간은 조금 발생하지만 처리량은 높아집니다.
  • max.request.size

    • 프로듀서가 보낼 수 있는 최대 메시지 바이트 사이즈 입니다. 기본값은 1MB입니다.

파티셔너(Partitioner)

카프카 프로듀서에서 카프카로 메시지를 보낼 때, 파티션(키 값으로 지정)을 지정해주지 않으면 메시지는 라운드 로빈 방식으로 파티션에 균등하게 배분됩니다. 이렇게 동작하는 이유는 카프카 서버에 메시지를 보내기 전에 파티셔너가 요청을 받고, 어느 파티션에 보내야할지를 결정하기 때문입니다.

키 값이 없으면, 파티셔너가 라운드로빈 방식으로 파티션에 메시지를 적재합니다. 만약 메시지 키를 가지는 레코드는 파티셔너에 의해서 특정한 해쉬값이 생성되는데, 이 해쉬값을 기준으로 어느 파티션에 들어갈지 결정됩니다.
동일한 메시지 키를 가진 레코드는 동일한 해시값을 생성해내서, 동일한 파티션에 들어가는 것을 보장하기에 순서를 지켜서 처리할 수 있다는 장점을 가집니다.

아래의 그림은 프로듀서가 키 값을 지정하지 않고 메시지를 보내서 파티셔너가 라운드로빈(UniformStickyPartitioner) 로 동작하는 그림을 나타낸다.

아래의 그림은 프로듀서가 키 값을 지정해서 메시지를 보냄으로써 메시지의 키 값을 토대로 해싱 값을 얻어내 특정 파티션에 메시지를 보내는 과정을 나타낸 것이다.

Reference

카프카, 데이터 플랫폼의 최강자
https://sungjk.github.io/2021/01/23/kafka-producer.html
https://velog.io/@ddangle/Kafka%EC%9D%98-%ED%8A%B9%EC%A7%95%EA%B3%BC-%EB%AA%A9%EC%A0%81#%ED%8E%98%EC%9D%B4%EC%A7%80-%EC%BA%90%EC%8B%9C
https://ko.wikipedia.org/wiki/%EC%BD%9C%EB%B0%B1

profile
꾸벅 🙇‍♂️ 매일매일 한발씩 나아가자잇!

0개의 댓글