Apache Kafka - producer

나르·2024년 10월 8일
1

Kafka

목록 보기
2/3

프로듀서 전송 과정

  • ProducerRecord 객체 생성
  • key & value 바이트 배열로 직렬화
  • 파티셔너로 전송
  • 레코드 배치에 데이터 추가
  • 별도의 스레드가 레코드 배치를 브로커에 전송
  • 브로커는 저장에 성공하면 RecordMetadata 객체 리턴
  • RecordMetadata => 토픽 + 파티션 + 파티션의 레코드 오프셋
  • 실패하면 에러 리턴
  • 프로듀서는 에러를 받으면 사용자에게 보내지 않고 재전송 수행

Ref. KafkaProducer Client Internals

프로듀서 생성하기

필수 속성값

  • bootstrap.servers
    • 브로커의 host:port 목록 (ex) 10.x.x.x:9092,10.x.x.x:9092,...
    • 모든 브로커를 포함하지 않아도 상관없다.
    • 두대 이상 지정 권장 (만약 둘 중에 하나가 정지하면 다른 하나로 연결하기 위함)
  • key.serializer, value.serializer
    • 위의 그림에서 Serializer
    • ByteArraySerializer, StringSerializer, IntegerSerializer 등이 있다.
    • String 보낼거면 StringSerializer 사용
    • key 지정하지 않아도 설정 필요하다. 이런 경우 보통은 VoidSerializer 사용

예제

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

메시지 전달하기

//  정한 타입이 일치해야함
ProducerRecord<String, String> record = new ProducerRecord<>("test1", "test2", "test3");
try {
// send() 메소드는 RecordMetadata를 포함한 자바 Future 객체를 리턴하지만, 여기서는 무시하기 때문에 전송 성공 여부를 알 수 없다.
    producer.send(record);
} catch(Exception e) {
/** 
 전송할 때 혹은 브로커 자체 에러를 무시하더라도 프로듀서가 메시지를 보내기 전 에러가 발생할 경우 예외 발생 가능.
 SerializationException -> 직렬화 실패 경우
 TimeoutException -> 버퍼가 가득 찬 경우
 InterruptException -> 스레드에 인터럽트가 걸리는 경우
**/
    e.printStackTrace();
}

동기적 전송
작업이 몰리면 브로커가 쓰기 요청에 응답하기까지 최소 2ms 최대 몇 초가 지연 될 수 있다.
전송 요청하는 스레드는 위 시간동안 아무 작업도 할 수 없다.
(동기적 전송은 잘 안쓴다.)

ProducerRecord<String, String> record = new ProducerRecord<>("test1", "test2", "test3");
try {
// 카프카로 응답이 올 때 까지 대기하기 위해 .get() 사용
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}

비동기적 전송
카프카는 레코드를 쓴 뒤 해당 레코드의 토픽, 파티션 오프셋을 리턴하는데, 대부분의 애플리케이션은 이런 메타데이터가 필요가 없다.
메시지 전송이 실패했을 경우에는 해당 내용을 알아야 하는데, 이때는 콜백을 사용한다.

// org.apache.kafka.clients.producer.Callback 인터페이스를 구현하는 클래스 필요.
// 해당 인터페이스에는 onCompletion() 단 하나의 메서드만 정의되어 있다.
private class TestProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
// 만약 에러를 리턴하면 onCompletion() 메서드가 null이 아닌 Exception 객체를 받게된다.
            e.printStackTrace();
        }
    }
}
 
ProducerRecord<String, String> record = new ProducerRecord<>("test1", "test2", "test3");
/// Callback 객체 매개변수로 전달
producer.send(record, new TestProducerCallback());

KafkaProducer 에러 종류

  • 재시도 가능한 에러 (메시지를 다시 전송 가능한 에러)
    • 연결 에러
    • 네트워크 타임아웃
    • 리더 선출
    • 파티션 리밸런싱 에러
  • 재시도 불가능 에러
    • 직렬화 실패
    • producer 버퍼 풀
    • 메시지 크기가 너무 클 경우

일반 옵션

아래의 설정들은 프로듀서에서만 설정이 가능하다. (브로커에서 설정 불가능)
하기 옵션을 제외한 나머지 옵션은 기본으로 두고 쓰는걸 권장한다.
kafka 2.22 기준

// 옵션 적용 방법
props.put("옵션 이름", "옵션 값");
props.put(ProducerConfig.옵션 이름, "옵션 값");

client.id

  • 프로듀서 구분하기 위한 논리적 식별자
  • 설정하면 프로듀서 모니터링시 구별하기 쉽다.

acks

  • 프로듀서가 쓰기 작업이 성공했다고 판별하기 위한 값
acks = 0acks = 1acks = all
성공 기준전송 후(브로커의 응답을 기다리지 않음)리더 브로커가 받으면모든 복제가 완료
특징처리량이 매우 중요하고, 유실이 되어도 상관없으면 설정리더가 받고 복제 직전에 죽어버리면 메시지가 유실된다.리더가 그 전에 죽으면 재전송.최소 2개 이상의 브로커가 해당 메시지를 가지고 있고, 크래시가 나도 유실되지 않기 때문에 가장 안전하나 응답까지의 시간이 다른 설정보다 오래 걸린다

buffer.memory

  • 프로듀서가 메시지를 전송하기 전에 메시지를 대기시키는 버퍼의 크기
  • 버퍼가 가득 차면 추가로 호출되는 send()는 max.block.ms 동안 블록되어 버퍼 메모리에 공간이 생기기를 기다리게 되는데, 해당 시간 동안 대기하고 공간 확보가 안되면 예외 발생

compression.type

  • 기본적으로 압축 비활성
  • snappy, gzip, lz4, zstd 가 있으며, 사용 시 메시지를 압축하여 브로커로 전송
    • Snappy 압축 알고리즘이 CPU 부하도 적고 압축 성능도 적당히 좋다.
    • 만약 높은 압축률이 필요하다면 Gzip 사용
  • 압축 사용하면 CPU 사용률이 올라감

batch.size

  • 같은 파티션에 다수의 레코드가 전송되면 프로듀서는 이를 배치로 모아서 한꺼번에 전송한다.
    각 배치에 사용될 메모리의 양을 결정(바이트 단위임)
  • 배치가 가득 차면 해당 배치의 모든 메시지가 전송된다.
    - 배치가 가득 찰 때까지 기다리는 것은 아니고, 아래 linger.ms 에 설정된 제한 시간이 되면 배치를 전송한다.
  • 너무 작게 설정하면 지나치게 자주 메시지를 전송하기 때문에 약간의 오버헤드 발생 가능

max.in.flight.requests.per.connection

  • 프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지의 수
  • 값을 올리면 메모리 사용량이 늘지만 처리량도 증가함
  • 문서에 따르면 2일 때 처리량이 최대를 기록하지만, 기본값인 5를 사용해도 비슷한 성능을 보여준다고 한다.

max.request.size

  • 프로듀서가 전송하는 쓰기 요청이 크기를 결정
  • 메시지 최대 크기를 제한하기도 하지만, 한 번의 요청에 보낼 수 있는 메시지 최대 개수 역시 제한
  • 브로커의 message.max.bytes 설정과 동일하게 설정하는게 좋음.

receive.buffer.bytes, send.buffer.bytes

  • 데이터를 읽거나 쓸 때 소켓이 사용하는 TCP 송수신 버퍼의 크기를 결정
  • -1일 경우 운영체제의 기본값 사용
  • 프로듀서나 컨슈머가 다른 데이터센터에 위치한 브로커와 통신할 경우 값을 올려주는게 좋다.

enable.idempotence

  • 정확히 한 번 옵션 사용(카프카 버전 0.11부터 가능)
  • 해당 옵션을 활성화 하면 메시지에 번호를 붙여서 발행한다.

메시지 전달 시간 관련 옵션

ProducerRecord를 보낼 때 걸리는 시간이 두 구간으로 나뉘어 있다.

  • send()에 대한 비동기 호출이 이뤄진 시각부터 결과를 리턴할 때까지 걸리는 시간
    • 이 시간 동안 send()를 호출한 스레드는 블록
  • send()에 대한 비동기 호출이 성공적으로 리턴한 시각부터 콜백이 호출될 때까지 걸리는 시간
    • ProducerRecord가 전송을 위해 배치에 추가된 시점부터 전송을 위해 할당된 시간이 소진될 때까지의 시간과 동일

max.block.ms

  • 프로듀서가 아래의 경우 얼마나 오랫동안 블록되는지 결정
    • send()를 호출했을 때
    • partitionsFor를 호출해서 명시적으로 메타데이터를 요청했을 때
  • 프로듀서의 전송 버퍼가 가득 차거나 메타데이터가 아직 사용 가능하지 않을 때 블록된다.
    • 이 상태에서 max.block.ms만큼 시간이 흐르면 예외가 발생

delivery.timeout.ms

  • 레코드 전송 준비가 완료된 시점(send()가 정상적으로 리턴되고 레코드가 배치에 저장된 시점)부터 브로커의 응답을 받거나 아니면 전송을 포기하게 되는 시점까지의 제한시간을 결정
  • linger.ms, request.timeout.ms보다 커야함.
  • 만약 프로듀서가 재시도를 하는 도중에 delivery.timeout.ms가 넘어가면, 마지막으로 재시도 하기 전에 브로커가 리턴한 예외와 함께 콜백이 호출
  • 레코드 배치가 전송을 기다리는 와중에 delivery.timeout.ms가 넘어가버리면 타임아웃 예외와 함께 콜백이 호출

request.timeout.ms

  • 프로듀서가 데이터를 전송할 때 서버로부터 응답을 받기 위해 얼마나 기다릴지 결정.

retries, retry.backoff.ms

  • 프로듀서가 서버로부터 에러 메시지를 받았을 때 retries는 재전송하는 횟수를 결정.
  • 기본적으로 프로듀서는 각 재시도 사이에 100ms 대기한다. 이 값은 retry.backoff.ms 설정을 통해 조정 가능
  • 임의로 브로커 죽이고 다시 돌아오는데 걸리는 시간을 측정하고 delivery.timeout.ms로 잡아주는걸 권장.

linger.ms

  • 현재 배치를 전송하기 전까지 대기하는 시간을 결정
  • producer는 현재 배치가 가득 차거나 linger.ms에 설정된 제한 시간이 되었을 때 메시지 배치를 전송.
  • 기본은 메시지 전송에 사용 가능한 스레드가 있으면 바로 전송함
  • 해당 값을 증가 시키면, 지연이 조금 증가하는 대신 처리율을 크게 증가시킬 수 있다.
  • 단위 메시지당 추가적으로 드는 시간은 매우 작지만 압축이 설정되어 있거나 하면 효율적이다.
profile
💻 + ☕ = </>

0개의 댓글